1. # When we tap a Supply, we get back a Tap object. We close the tap in order
  2. # to turn off the flow of values.
  3. my class Tap {
  4. has &!on-close;
  5. submethod BUILD(:&!on-close --> Nil) { } # for subclasses of Tap
  6. multi method new(Tap: --> Tap:D) {
  7. nqp::create(self)
  8. }
  9. multi method new(Tap: &on-close --> Tap:D) {
  10. nqp::if(
  11. nqp::eqaddr(self.WHAT,Tap),
  12. nqp::p6bindattrinvres( # we're a real Tap, fast path
  13. nqp::create(self),Tap,'&!on-close',&on-close
  14. ),
  15. self.bless(:&on-close) # subclass, use slow path
  16. )
  17. }
  18. method close(--> True) {
  19. nqp::if(
  20. nqp::isconcrete(&!on-close),
  21. nqp::if(
  22. nqp::istype((my \close-result := &!on-close()),Promise),
  23. (await close-result)
  24. )
  25. )
  26. }
  27. }
  28. # The asynchronous dual of the Iterator role; goes inside of a Supply, which
  29. # is the asynchronous dual of the Seq class. So just as a Seq wraps around an
  30. # Iterator so we don't expose all the internal iterator types to the world, a
# Supply wraps around a Tappable so we don't expose all of those. (It may
  32. # surprise you that it's a Tappable, not a Tap, given Seq wraps an Iterator,
  33. # not an Iterable. Guess that's part of the duality too. Ask your local
  34. # category theorist. :-))
# Interface for the object a Supply wraps. Every method body is the `...`
# stub, so each class doing this role has to provide its own implementation.
my role Tappable {
    # Subscribe with the given callbacks. The implementations in this file
    # call &tap with the Tap object they create and also return that Tap.
    method tap(&emit, &done, &quit, &tap) { ... }
    method live() { ... } # Taps into a live data source
    method serial() { ... } # Promises no concurrent emits
    method sane() { ... } # Matches emit* [done|quit]? grammar
}
  41. # A few Supply-related exception types.
  42. my class X::Supply::Combinator is Exception {
  43. has $.combinator;
  44. method message() { "Can only use $!combinator to combine defined Supply objects" }
  45. }
  46. my class X::Supply::Migrate::Needs is Exception {
  47. method message() {
  48. ".migrate needs Supplies to be emitted"
  49. }
  50. }
  51. my class X::Supply::New is Exception {
  52. method message() {
  53. "Cannot directly create a Supply. You might want:\n" ~
  54. " - To use a Supplier in order to get a live supply\n" ~
  55. " - To use Supply.on-demand to create an on-demand supply\n" ~
  56. " - To create a Supply using a supply block"
  57. }
  58. }
  59. # A Supply is like an asynchronous Seq. All the methods that you can do on
  60. # a Supply go in here.
  61. my class Supplier { ... }
  62. my class Supplier::Preserving { ... }
  63. my class Supply does Awaitable {
  64. has Tappable $!tappable;
  65. proto method new(|) {*}
  66. multi method new() {
  67. X::Supply::New.new.throw
  68. }
  69. multi method new(Tappable $tappable) {
  70. self.WHAT =:= Supply
  71. ?? nqp::create(self)!SET-SELF($tappable)
  72. !! self.bless(:$tappable)
  73. }
  74. submethod BUILD(Tappable :$!tappable! --> Nil) { }
  75. method !SET-SELF(Tappable $tappable) {
  76. $!tappable := $tappable;
  77. self
  78. }
  79. method Capture(Supply:D:) { self.List.Capture }
  80. method live(Supply:D:) { $!tappable.live }
  81. method serial(Supply:D:) { $!tappable.serial }
  82. method Tappable(--> Tappable) { $!tappable }
# Default callbacks for .tap.
my \DISCARD = -> $ {};              # accept one argument and ignore it
my \NOP = -> {};                    # accept nothing, do nothing
my \DEATH = -> $ex { $ex.throw };   # rethrow the exception it is given

# Tap this Supply by forwarding the four callbacks to the wrapped Tappable.
#   &emit  - called per value (default: ignore it)
#   :&done - called on normal completion (default: do nothing)
#   :&quit - called with the exception on failure (default: throw it)
#   :&tap  - called with the Tap object (default: ignore it)
# Returns whatever the Tappable's tap method returns.
method tap(Supply:D: &emit = DISCARD, :&done = NOP, :&quit = DEATH, :&tap = DISCARD) {
    $!tappable.tap(&emit, &done, &quit, &tap)
}
  89. method act(Supply:D: &actor, *%others) {
  90. self.sanitize.tap(&actor, |%others)
  91. }
  92. ##
  93. ## Supply factories
  94. ##
  95. my class OnDemand does Tappable {
  96. has &!producer;
  97. has &!closing;
  98. has $!scheduler;
  99. submethod BUILD(:&!producer!, :&!closing!, :$!scheduler! --> Nil) {}
  100. method tap(&emit, &done, &quit, &tap) {
  101. my int $closed = 0;
  102. my $t = Tap.new: {
  103. if &!closing {
  104. &!closing() unless $closed++;
  105. }
  106. }
  107. tap($t);
  108. my $p = Supplier.new;
  109. $p.Supply.tap(&emit,
  110. done => {
  111. done();
  112. $t.close();
  113. },
  114. quit => -> \ex {
  115. quit(ex);
  116. $t.close();
  117. });
  118. $!scheduler.cue({ &!producer($p) },
  119. catch => -> \ex { $p.quit(ex) });
  120. $t
  121. }
  122. method live(--> False) { }
  123. method sane(--> False) { }
  124. method serial(--> False) { }
  125. }
  126. method on-demand(Supply:U: &producer, :&closing, :$scheduler = CurrentThreadScheduler) {
  127. Supply.new(OnDemand.new(:&producer, :&closing, :$scheduler)).sanitize
  128. }
  129. method from-list(Supply:U: +@values, :$scheduler = CurrentThreadScheduler) {
  130. self.on-demand(-> $p {
  131. $p.emit($_) for @values;
  132. $p.done();
  133. }, :$scheduler);
  134. }
# Tappable behind Supply.interval: emits an incrementing counter, driven by
# a repeating cue on the scheduler.
my class Interval does Tappable {
    has $!scheduler;
    has $!interval;
    has $!delay;
    submethod BUILD(:$!scheduler, :$!interval, :$!delay --> Nil) { }
    # Only the emit and tap callbacks are used; done and quit are ignored.
    method tap(&emit, &, &, &tap) {
        my $i = 0;
        my $lock = Lock::Async.new;
        # The setup runs under the same lock that each tick takes to read
        # and bump the counter.
        $lock.protect: {
            my $cancellation = $!scheduler.cue(
                {
                    emit($lock.protect: { $i++ });
                    # If emitting throws, stop the repeating cue.
                    CATCH { $cancellation.cancel if $cancellation }
                },
                :every($!interval), :in($!delay)
            );
            # Closing the Tap cancels the repeating cue.
            my $t = Tap.new({ $cancellation.cancel });
            tap($t);
            $t
        }
    }
    method live(--> False) { }
    method sane(--> True) { }
    method serial(--> False) { }
}
  160. method interval(Supply:U: $interval, $delay = 0, :$scheduler = $*SCHEDULER) {
  161. Supply.new(Interval.new(:$interval, :$delay, :$scheduler));
  162. }
  163. ##
  164. ## Simple operations are those that operate on a single Supply, carry its
## liveness, and are always serial. We implement them directly as they are
  166. ## common and fairly "hot path".
  167. ##
  168. my role SimpleOpTappable does Tappable {
  169. has $!source;
  170. method live() { $!source.live }
  171. method sane(--> True) { }
  172. method serial(--> True) { }
  173. method !cleanup(int $cleaned-up is rw, $source-tap) {
  174. if $source-tap && !$cleaned-up {
  175. $cleaned-up = 1;
  176. $source-tap.close;
  177. }
  178. }
  179. }
  180. my class Serialize does SimpleOpTappable {
  181. submethod BUILD(:$!source! --> Nil) { }
  182. method tap(&emit, &done, &quit, &tap) {
  183. my $lock = Lock::Async.new;
  184. my int $cleaned-up = 0;
  185. my $source-tap;
  186. my $t;
  187. $!source.tap(
  188. tap => {
  189. $source-tap = $_;
  190. $t = Tap.new({ self!cleanup($cleaned-up, $source-tap) });
  191. tap($t);
  192. },
  193. -> \value{
  194. $lock.protect-or-queue-on-recursion: { emit(value); }
  195. },
  196. done => -> {
  197. $lock.protect-or-queue-on-recursion: {
  198. done();
  199. self!cleanup($cleaned-up, $source-tap);
  200. }
  201. },
  202. quit => -> $ex {
  203. $lock.protect-or-queue-on-recursion: {
  204. quit($ex);
  205. self!cleanup($cleaned-up, $source-tap);
  206. }
  207. });
  208. $t
  209. }
  210. }
  211. method serialize(Supply:D:) {
  212. $!tappable.serial ?? self !! Supply.new(Serialize.new(source => self))
  213. }
  214. my class Sanitize does SimpleOpTappable {
  215. submethod BUILD(:$!source! --> Nil) { }
  216. method tap(&emit, &done, &quit, &tap) {
  217. my int $cleaned-up = 0;
  218. my int $finished = 0;
  219. my $source-tap;
  220. my $t;
  221. $!source.tap(
  222. tap => {
  223. $source-tap = $_;
  224. $t = Tap.new({ self!cleanup($cleaned-up, $source-tap) });
  225. tap($t);
  226. },
  227. -> \value{
  228. emit(value) unless $finished;
  229. },
  230. done => -> {
  231. unless $finished {
  232. $finished = 1;
  233. done();
  234. self!cleanup($cleaned-up, $source-tap);
  235. }
  236. },
  237. quit => -> $ex {
  238. unless $finished {
  239. $finished = 1;
  240. quit($ex);
  241. self!cleanup($cleaned-up, $source-tap);
  242. }
  243. });
  244. $t
  245. }
  246. }
  247. method sanitize() {
  248. $!tappable.sane ?? self !! Supply.new(Sanitize.new(source => self.serialize))
  249. }
# Passes emit/done/quit straight through to the source, and runs an extra
# callback when the resulting Tap is closed.
my class OnClose does SimpleOpTappable {
    has &!on-close;
    submethod BUILD(:$!source!, :&!on-close! --> Nil) { }
    method tap(&emit, &done, &quit, &tap) {
        my int $cleaned-up = 0;
        my $t;
        $!source.tap: &emit, :&done, :&quit, tap => -> $source-tap {
            # On close: first the user callback, then close the source tap.
            $t = Tap.new({
                &!on-close();
                self!cleanup($cleaned-up, $source-tap)
            });
            tap($t);
        }
        $t
    }
}
  266. method on-close(Supply:D: &on-close) {
  267. return Supply.new(OnClose.new(source => self, :&on-close))
  268. }
# Tappable behind Supply.map: emits the mapper's result for every value of
# the source.
my class MapSupply does SimpleOpTappable {
    has &!mapper;
    submethod BUILD(:$!source!, :&!mapper! --> Nil) { }
    method tap(&emit, &done, &quit, &tap) {
        my int $cleaned-up = 0;
        my $source-tap;
        my $t;
        $!source.tap(
            tap => {
                # Remember the source tap; closing our Tap closes it.
                $source-tap = $_;
                $t = Tap.new({ self!cleanup($cleaned-up, $source-tap) });
                tap($t);
            },
            -> \value {
                # An exception from the mapper is turned into a quit and
                # the source tap is closed.
                my \result = try &!mapper(value);
                if $! {
                    quit($!);
                    self!cleanup($cleaned-up, $source-tap);
                }
                else {
                    emit(result)
                }
            },
            done => -> {
                done();
                self!cleanup($cleaned-up, $source-tap);
            },
            quit => -> $ex {
                quit($ex);
                self!cleanup($cleaned-up, $source-tap);
            });
        $t
    }
}
  303. method map(Supply:D: &mapper) {
  304. Supply.new(MapSupply.new(source => self.sanitize, :&mapper))
  305. }
# Tappable behind Supply.grep: passes on only the values that the test
# accepts.
my class Grep does SimpleOpTappable {
    has Mu $!test;
    submethod BUILD(:$!source!, Mu :$!test! --> Nil) { }
    method tap(&emit, &done, &quit, &tap) {
        my int $cleaned-up = 0;
        my $source-tap;
        my $t;
        $!source.tap(
            tap => {
                # Remember the source tap; closing our Tap closes it.
                $source-tap = $_;
                $t = Tap.new({ self!cleanup($cleaned-up, $source-tap) });
                tap($t);
            },
            -> \value {
                # Smartmatch by calling ACCEPTS directly; an exception from
                # the test becomes a quit, a plain rejection drops the value.
                my \accepted = try $!test.ACCEPTS(value);
                if accepted {
                    emit(value);
                }
                elsif $! {
                    quit($!);
                    self!cleanup($cleaned-up, $source-tap);
                }
            },
            done => -> {
                done();
                self!cleanup($cleaned-up, $source-tap);
            },
            quit => -> $ex {
                quit($ex);
                self!cleanup($cleaned-up, $source-tap);
            });
        $t
    }
}
  340. method grep(Supply:D: Mu $test) {
  341. Supply.new(Grep.new(source => self.sanitize, :$test))
  342. }
  343. my class ScheduleOn does SimpleOpTappable {
  344. has $!scheduler;
  345. submethod BUILD(:$!source!, :$!scheduler! --> Nil) { }
  346. method tap(&emit, &done, &quit, &tap) {
  347. my int $cleaned-up = 0;
  348. my $source-tap;
  349. my $t;
  350. $!source.tap(
  351. tap => {
  352. $source-tap = $_;
  353. $t = Tap.new({ self!cleanup($cleaned-up, $source-tap) });
  354. tap($t);
  355. },
  356. -> \value {
  357. $!scheduler.cue: { emit(value) }
  358. },
  359. done => -> {
  360. $!scheduler.cue: { done(); self!cleanup($cleaned-up, $source-tap); }
  361. },
  362. quit => -> $ex {
  363. $!scheduler.cue: { quit($ex); self!cleanup($cleaned-up, $source-tap); }
  364. });
  365. $t
  366. }
  367. }
  368. method schedule-on(Supply:D: Scheduler $scheduler) {
  369. Supply.new(ScheduleOn.new(source => self.sanitize, :$scheduler))
  370. }
# Tappable for the inner supplies produced by Supply.start: runs the startee
# on one value via Promise.start and reports the outcome.
# NOTE(review): BUILD never sets $!source, which the role's live method
# reads - confirm that live is never called on these.
my class Start does SimpleOpTappable {
    has $!value;
    has &!startee;
    submethod BUILD(:$!value, :&!startee --> Nil) { }
    method tap(&emit, &done, &quit, &tap) {
        # Closing the Tap only sets a flag; the started code is not
        # interrupted, but its outcome is no longer reported.
        my int $closed = 0;
        my $t = Tap.new({ $closed = 1 });
        tap($t);
        Promise.start({ &!startee($!value) }).then({
            unless $closed {
                # Kept: emit the result, then done. Otherwise quit with the
                # cause.
                if .status == Kept {
                    emit(.result);
                    done();
                }
                else {
                    quit(.cause);
                }
            }
        });
        $t
    }
}
  393. method start(Supply:D: &startee) {
  394. self.map: -> \value {
  395. Supply.new(Start.new(:value(value), :&startee))
  396. }
  397. }
# Tappable behind Supply.stable: a value is only passed on once $!time has
# gone by without a newer value arriving.
my class Stable does SimpleOpTappable {
    has $!time;
    has $!scheduler;
    submethod BUILD(:$!source!, :$!time!, :$!scheduler! --> Nil) { }
    method tap(&emit, &done, &quit, &tap) {
        my int $cleaned-up = 0;
        my $lock = Lock::Async.new;
        # Cancellation handle of the pending delayed emit, if any.
        my $last_cancellation;
        my $source-tap;
        my $t;
        $!source.tap(
            tap => {
                # Remember the source tap; closing our Tap closes it.
                $source-tap = $_;
                $t = Tap.new({ self!cleanup($cleaned-up, $source-tap) });
                tap($t);
            },
            -> \value {
                $lock.protect: {
                    # A newer value supersedes the one still waiting.
                    if $last_cancellation {
                        $last_cancellation.cancel;
                    }
                    $last_cancellation = $!scheduler.cue(
                        :in($!time),
                        {
                            # This cue is running now, so there is nothing
                            # left to cancel.
                            $lock.protect: { $last_cancellation = Nil; }
                            try {
                                emit(value);
                                # An exception from emit becomes a quit.
                                CATCH {
                                    default {
                                        quit($_);
                                        self!cleanup($cleaned-up, $source-tap);
                                    }
                                }
                            }
                        });
                }
            },
            done => -> {
                done();
                self!cleanup($cleaned-up, $source-tap);
            },
            quit => -> $ex {
                quit($ex);
                self!cleanup($cleaned-up, $source-tap);
            });
        $t
    }
}
  446. method stable(Supply:D: $time, :$scheduler = $*SCHEDULER) {
  447. return self unless $time;
  448. Supply.new(Stable.new(source => self.sanitize, :$time, :$scheduler))
  449. }
  450. my class Delayed does SimpleOpTappable {
  451. has $!time;
  452. has $!scheduler;
  453. submethod BUILD(:$!source!, :$!time, :$!scheduler! --> Nil) { }
  454. method tap(&emit, &done, &quit, &tap) {
  455. my int $cleaned-up = 0;
  456. my $source-tap;
  457. my $t;
  458. $!source.tap(
  459. tap => {
  460. $source-tap = $_;
  461. my $t = Tap.new({ self!cleanup($cleaned-up, $source-tap) });
  462. tap($t);
  463. },
  464. -> \value {
  465. $!scheduler.cue: { emit(value) }, :in($!time)
  466. },
  467. done => -> {
  468. $!scheduler.cue:
  469. { done(); self!cleanup($cleaned-up, $source-tap); },
  470. :in($!time)
  471. },
  472. quit => -> $ex {
  473. $!scheduler.cue:
  474. { quit($ex); self!cleanup($cleaned-up, $source-tap); },
  475. :in($!time)
  476. });
  477. $t
  478. }
  479. }
  480. method delayed(Supply:D: $time, :$scheduler = $*SCHEDULER) {
  481. return self unless $time; # nothing to do
  482. Supply.new(Delayed.new(source => self.sanitize, :$time, :$scheduler))
  483. }
  484. ##
  485. ## A bunch of the more complex combinators, implemented as supply blocks
  486. ##
  487. method do(Supply:D $self: &side-effect) {
  488. supply {
  489. whenever self -> \value {
  490. side-effect(value);
  491. emit(value);
  492. }
  493. }
  494. }
  495. method flat(Supply:D:) {
  496. supply {
  497. whenever self -> \inner {
  498. whenever inner -> \value {
  499. emit value;
  500. }
  501. }
  502. }
  503. }
  504. method merge(*@s) {
  505. @s.unshift(self) if self.DEFINITE; # add if instance method
  506. return supply { } unless +@s; # nothing to be done
  507. X::Supply::Combinator.new(
  508. combinator => 'merge'
  509. ).throw unless Rakudo::Internals.ALL_DEFINED_TYPE(@s,Supply);
  510. return @s[0].sanitize if +@s == 1; # nothing to be done
  511. supply {
  512. for @s {
  513. whenever $_ -> \value { emit(value) }
  514. }
  515. }
  516. }
  517. method reduce(Supply:D $self: &with) {
  518. supply {
  519. my $first := True;
  520. my $reduced := Nil;
  521. whenever self -> \value {
  522. if $first {
  523. $reduced := value;
  524. $first := False;
  525. }
  526. else {
  527. $reduced := with($reduced, value);
  528. }
  529. LAST {
  530. emit $reduced;
  531. }
  532. }
  533. }
  534. }
  535. method produce(Supply:D $self: &with) {
  536. supply {
  537. my $first := True;
  538. my $reduced := Nil;
  539. whenever self -> \value {
  540. if $first {
  541. $reduced := value;
  542. $first := False;
  543. }
  544. else {
  545. $reduced := with($reduced, value);
  546. }
  547. emit $reduced;
  548. }
  549. }
  550. }
# Given a Supply of Supplies, emit the values of the most recently received
# inner Supply only. Throws X::Supply::Migrate::Needs if something other
# than a Supply is received.
method migrate(Supply:D:) {
    supply {
        # Tap on the inner Supply currently being followed, if any.
        my $current;
        whenever self -> \inner {
            X::Supply::Migrate::Needs.new.throw
                unless nqp::istype(inner, Supply);
            # Drop the previous inner Supply before following the new one.
            $current.close if $current;
            $current = do whenever inner -> \value {
                emit(value);
            }
        }
    }
}
  564. proto method classify(|) {*}
  565. multi method classify(Supply:D: &mapper ) {
  566. self!classify(&mapper);
  567. }
  568. multi method classify(Supply:D: %mapper ) {
  569. self!classify({ %mapper{$^a} });
  570. }
  571. multi method classify(Supply:D: @mapper ) {
  572. self!classify({ @mapper[$^a] });
  573. }
  574. proto method categorize (|) {*}
  575. multi method categorize(Supply:D: &mapper ) {
  576. self!classify(&mapper, :multi);
  577. }
  578. multi method categorize(Supply:D: %mapper ) {
  579. self!classify({ %mapper{$^a} }, :multi);
  580. }
  581. multi method categorize(Supply:D: @mapper ) {
  582. self!classify({ @mapper[$^a] }, :multi);
  583. }
# Shared implementation of classify and categorize. With :multi the mapper
# result is treated as a list of keys, otherwise as a single key.
method !classify(&mapper, :$multi) {
    supply {
        # One Supplier::Preserving per key, indexed by the key's .WHICH.
        my %mapping;
        # Look up the Supplier for a key; on first sight of a key, create it
        # and emit "key => its Supply" on the outer supply.
        sub find-target($key) {
            %mapping{ $key.WHICH } //= do {
                my $p = Supplier::Preserving.new;
                emit($key => $p.Supply);
                $p
            };
        }
        whenever self -> \value {
            if $multi {
                for @(mapper(value)) -> $key {
                    find-target($key).emit(value);
                }
            }
            else {
                find-target(mapper(value)).emit(value);
            }
            # When the source is done, so are all the per-key supplies.
            LAST {
                %mapping.values>>.done;
            }
        }
    }
}
  609. ##
  610. ## Coercions
  611. ##
# Coercing a Supply instance to Supply is the identity.
multi method Supply(Supply:D:) { self }
  613. method Channel(Supply:D:) {
  614. my $c = Channel.new();
  615. self.sanitize.tap:
  616. -> \val { $c.send(val) },
  617. done => { $c.close },
  618. quit => -> $ex { $c.fail($ex) };
  619. $c
  620. }
# Queue type with the ConcBlockingQueue representation; .Seq uses it to
# hand values from the tap callbacks to the consuming code.
my class ConcQueue is repr('ConcBlockingQueue') { }
# The list form of a Supply is the list of its Seq.
multi method list(Supply:D:) {
    self.Seq.list
}
# Coerce to a Seq: the Supply is tapped once the Seq is iterated, and its
# values are taken in order. A quit is rethrown from the iteration.
method Seq(Supply:D:) {
    gather {
        my Mu \queue = nqp::create(ConcQueue);
        my $exception;
        self.tap(
            -> \val { nqp::push(queue, val) },
            done => -> { nqp::push(queue, ConcQueue) }, # type obj as sentinel
            # Record the exception first, then push the same sentinel.
            quit => -> \ex { $exception := ex; nqp::push(queue, ConcQueue) });
        loop {
            my \got = nqp::shift(queue);
            # Sentinel seen: either rethrow the recorded exception or stop.
            if got =:= ConcQueue {
                $exception.DEFINITE
                    ?? $exception.throw
                    !! last
            }
            else {
                take got;
            }
        }
    }
}
  646. method Promise(Supply:D:) {
  647. my $p = Promise.new;
  648. my $v = $p.vow;
  649. my $final := Nil;
  650. my $t = self.tap:
  651. -> \val { $final := val },
  652. done => { $v.keep($final) },
  653. quit => -> \ex { $v.break(ex) };
  654. $p
  655. }
# Await the Promise form of this Supply and return its result.
method wait(Supply:D:) { await self.Promise }
# Awaitable::Handle for a Supply; it is always created in the "not ready"
# state and delivers its outcome through subscribe-awaiter.
my class SupplyAwaitableHandle does Awaitable::Handle {
    has $!supply;
    # Public constructor: a handle for the given Supply.
    method not-ready(Supply:D \supply) {
        nqp::create(self)!not-ready(supply)
    }
    # Initializer: marks the handle as not already completed and binds the
    # Supply. $!already is not declared in this class, presumably it comes
    # from Awaitable::Handle.
    method !not-ready(\supply) {
        $!already = False;
        $!supply := supply;
        self
    }
    # Tap the Supply and report to &subscriber: (True, last value) on done,
    # (False, exception) on quit.
    method subscribe-awaiter(&subscriber --> Nil) {
        my $final := Nil;
        $!supply.tap:
            -> \val { $final := val },
            done => { subscriber(True, $final) },
            quit => -> \ex { subscriber(False, ex) };
    }
}
  675. method get-await-handle(--> Awaitable::Handle) {
  676. SupplyAwaitableHandle.not-ready(self)
  677. }
# Emit only values that have not been seen before.
#   :&as      - compare the result of &as(value) instead of the value
#   :&with    - comparator to use instead of === (linear search when given)
#   :$expires - how long a seen value is remembered before it counts as new
# The eight code paths below are the combinations of these three options,
# chosen once per tap rather than per value.
method unique(Supply:D $self: :&as, :&with, :$expires) {
    supply {
        if $expires {
            # Custom comparator: linear search through [target, expiry]
            # pairs.
            if &with and !(&with === &[===]) {
                my @seen;  # really Mu, but doesn't work in settings
                my Mu $target;
                if &as {
                    whenever self -> \val {
                        my $now := now;
                        $target = &as(val);
                        my $index =
                          @seen.first({&with($target,$_[0])},:k);
                        with $index {
                            if $now > @seen[$index][1] {  # expired
                                @seen[$index][1] = $now+$expires;
                                emit(val);
                            }
                        }
                        else {
                            @seen.push: [$target, $now+$expires];
                            emit(val);
                        }
                    }
                }
                else {
                    whenever self -> \val {
                        my $now := now;
                        my $index =
                          @seen.first({&with(val,$_[0])},:k);
                        with $index {
                            if $now > @seen[$index][1] {  # expired
                                @seen[$index][1] = $now+$expires;
                                emit(val);
                            }
                        }
                        else {
                            @seen.push: [val, $now+$expires];
                            emit(val);
                        }
                    }
                }
            }
            # Default comparison: hash from .WHICH string to expiry time.
            else {
                my $seen := nqp::hash();
                my str $target;
                if &as {
                    whenever self -> \val {
                        my $now := now;
                        $target = nqp::unbox_s(&as(val).WHICH);
                        if !nqp::existskey($seen,$target) ||
                          $now > nqp::atkey($seen,$target) { #expired
                            emit(val);
                            nqp::bindkey($seen,$target,$now+$expires);
                        }
                    }
                }
                else {
                    whenever self -> \val {
                        my $now := now;
                        $target = nqp::unbox_s(val.WHICH);
                        if !nqp::existskey($seen,$target) ||
                          $now > nqp::atkey($seen,$target) { #expired
                            emit(val);
                            nqp::bindkey($seen,$target,$now+$expires);
                        }
                    }
                }
            }
        }
        else { # no $expires given: seen values are remembered forever
            # Custom comparator: linear search through the seen targets.
            if &with and !(&with === &[===]) {
                my @seen;  # really Mu, but doesn't work in settings
                my Mu $target;
                if &as {
                    whenever self -> \val {
                        $target = &as(val);
                        if @seen.first({ &with($target,$_) } ) =:= Nil {
                            @seen.push($target);
                            emit(val);
                        }
                    }
                }
                else {
                    whenever self -> \val {
                        if @seen.first({ &with(val,$_) } ) =:= Nil {
                            @seen.push(val);
                            emit(val);
                        }
                    }
                }
            }
            # Default comparison: hash keyed on the .WHICH string.
            else {
                my $seen := nqp::hash();
                my str $target;
                if &as {
                    whenever self -> \val {
                        $target = nqp::unbox_s(&as(val).WHICH);
                        unless nqp::existskey($seen, $target) {
                            nqp::bindkey($seen, $target, 1);
                            emit(val);
                        }
                    }
                }
                else {
                    whenever self -> \val {
                        $target = nqp::unbox_s(val.WHICH);
                        unless nqp::existskey($seen, $target) {
                            nqp::bindkey($seen, $target, 1);
                            emit(val);
                        }
                    }
                }
            }
        }
    }
}
  794. method squish(Supply:D $self: :&as, :&with is copy) {
  795. &with //= &[===];
  796. supply {
  797. my int $first = 1;
  798. my Mu $last;
  799. my Mu $target;
  800. if &as {
  801. whenever self -> \val {
  802. $target = &as(val);
  803. if $first || !&with($last,$target) {
  804. $first = 0;
  805. emit(val);
  806. }
  807. $last = $target;
  808. }
  809. }
  810. else {
  811. whenever self -> \val {
  812. if $first || !&with($last, val) {
  813. $first = 0;
  814. emit(val);
  815. }
  816. $last = val;
  817. }
  818. }
  819. }
  820. }
  821. multi method rotor(Supply:D $self: Int:D $batch, :$partial) {
  822. self.rotor(($batch,), :$partial)
  823. }
# Emit the values in batches whose sizes follow @cycle. A cycle element is
# either a size, or a "size => gap" Pair. With :partial an incomplete final
# batch is emitted too.
multi method rotor(Supply:D $self: *@cycle, :$partial) {
    # A non-lazy cycle is repeated indefinitely.
    my @c := @cycle.is-lazy ?? @cycle !! (@cycle xx *).flat.cache;
    supply {
        my Int $elems;      # size of the batch being collected
        my Int $gap;        # gap that goes with it
        my int $to-skip;    # values to skip after it (positive gaps only)
        my int $skip;       # values still to be skipped right now
        my \c = @c.iterator;
        # Fetch the next size/gap from the cycle.
        sub next-batch(--> Nil) {
            given c.pull-one {
                when Pair {
                    $elems   = +.key;
                    $gap     = +.value;
                    $to-skip = $gap > 0 ?? $gap !! 0;
                }
                default {
                    $elems   = +$_;
                    $gap     = 0;
                    $to-skip = 0;
                }
            }
        }
        next-batch;
        my @batched;
        # Emit the collected batch; what stays behind for the next batch is
        # the slice starting at "end + $gap", which is only non-empty for a
        # negative gap (overlap).
        sub flush(--> Nil) {
            emit( @batched.splice(0, +@batched, @batched[* + $gap .. *]) );
            $skip = $to-skip;
        }
        whenever self -> \val {
            # Collect the value unless it falls in a gap.
            @batched.push: val unless $skip && $skip--;
            if @batched.elems == $elems {
                flush;
                next-batch;
            }
            LAST {
                flush if @batched and $partial;
            }
        }
    }
}
# Emit the values in batches, limited by number (:elems), by time window
# (:seconds), or both. Whatever is left is emitted when the source is done.
method batch(Supply:D $self: Int(Cool) :$elems = 0, :$seconds) {
    supply {
        # A negative :elems is treated as 0.
        my int $max = $elems >= 0 ?? $elems !! 0;
        my $batched := nqp::list;
        my $last_time;
        # Emit the current batch and start a fresh one.
        sub flush(--> Nil) {
            emit($batched);
            $batched := nqp::list;
        }
        # Emit the current batch unless it is empty.
        sub final-flush(--> Nil) {
            flush if nqp::elems($batched);
        }
        if $seconds {
            # Time windows are identified by "time div $seconds".
            $last_time = time div $seconds;
            if $elems > 0 { # and $seconds
                whenever self -> \val {
                    my $this_time = time div $seconds;
                    # New window: flush the old batch, start with this value.
                    if $this_time != $last_time {
                        flush if nqp::elems($batched);
                        $last_time = $this_time;
                        nqp::push($batched,val);
                    }
                    # Same window: flush as soon as the batch is full.
                    else {
                        nqp::push($batched,val);
                        flush if nqp::iseq_i(nqp::elems($batched),$max);
                    }
                    LAST { final-flush; }
                }
            }
            else {
                whenever self -> \val {
                    my $this_time = time div $seconds;
                    if $this_time != $last_time {
                        flush if nqp::elems($batched);
                        $last_time = $this_time;
                    }
                    nqp::push($batched,val);
                    LAST { final-flush; }
                }
            }
        }
        else { # just $elems
            whenever self -> \val {
                nqp::push($batched,val);
                flush if nqp::isge_i(nqp::elems($batched),$max);
                LAST { final-flush; }
            }
        }
    }
}
# Re-chunk a Supply of strings into a Supply of lines, independent of how
# the text was split over the incoming chunks. With :chomp (the default)
# the line ending is removed.
method lines(Supply:D $self: :$chomp = True ) {
    supply {
        my str $str;        # text received but not yet emitted
        my int $chars;      # number of characters in $str
        my int $left;       # characters left to look at
        my int $pos;        # start of the line being looked for
        my int $nextpos;    # position of the next newline found
        my int $found;      # length of the line to emit
        whenever self -> \val {
            $str   = $str ~ nqp::unbox_s(val);
            $chars = nqp::chars($str);
            $pos   = 0;
            while ($left = $chars - $pos) > 0 {
                $nextpos = nqp::findcclass(
                  nqp::const::CCLASS_NEWLINE, $str, $pos, $left
                );
                # Stop when there is no complete line left; a "\r" as the
                # very last character is kept back as well, since the rest
                # of its line ending may still arrive in the next chunk.
                last
                  if $nextpos >= $chars     # no line delimiter
                  or $nextpos == $chars - 1 # broken CRLF ?
                    && nqp::eqat($str, "\r", $nextpos); # yes!
                if $chomp {
                    emit( ($found = $nextpos - $pos)
                      ?? nqp::p6box_s(nqp::substr($str,$pos,$found))
                      !! ''
                    );
                    $pos = $nextpos + 1;
                }
                else {
                    $found = $nextpos - $pos + 1;
                    emit(
                      nqp::p6box_s(nqp::substr($str,$pos,$found)));
                    $pos = $pos + $found;
                }
            }
            # Keep whatever was not emitted for the next chunk.
            $str = $pos < $chars
              ?? nqp::substr($str,$pos)
              !! '';
            # On done, emit what is left as the final line.
            LAST {
                if $str {
                    $chars = nqp::chars($str);
                    emit( $chomp && nqp::iscclass(
                      nqp::const::CCLASS_NEWLINE,$str,$chars-1)
                        ?? nqp::p6box_s(nqp::substr($str,0,$chars - 1))
                        !! nqp::p6box_s($str)
                    );
                }
            }
        }
    }
}
  964. method words(Supply:D $self:) {
  965. supply {
  966. my str $str;
  967. my int $chars;
  968. my int $left;
  969. my int $pos;
  970. my int $nextpos;
  971. my int $found;
  972. my int $cr;
  973. my int $crlf;
  974. whenever self -> \val {
  975. $str = $str ~ nqp::unbox_s(val);
  976. $chars = nqp::chars($str);
  977. $pos = nqp::findnotcclass(
  978. nqp::const::CCLASS_WHITESPACE, $str, 0, $chars);
  979. while ($left = $chars - $pos) > 0 {
  980. $nextpos = nqp::findcclass(
  981. nqp::const::CCLASS_WHITESPACE, $str, $pos, $left
  982. );
  983. last unless $left = $chars - $nextpos; # broken word
  984. emit( nqp::box_s(
  985. nqp::substr( $str, $pos, $nextpos - $pos ), Str)
  986. );
  987. $pos = nqp::findnotcclass(
  988. nqp::const::CCLASS_WHITESPACE,$str,$nextpos,$left);
  989. }
  990. $str = $pos < $chars
  991. ?? nqp::substr($str,$pos)
  992. !! '';
  993. LAST {
  994. emit( nqp::box_s($str, Str) ) if $str;
  995. }
  996. }
  997. }
  998. }
  999. method elems(Supply:D $self: $seconds? ) {
  1000. supply {
  1001. my int $elems = 0;
  1002. if $seconds {
  1003. my $last_time = time div $seconds;
  1004. my int $last_elems = $elems;
  1005. whenever self -> \val {
  1006. $last_elems = $elems = $elems + 1;
  1007. my $this_time = time div $seconds;
  1008. if $this_time != $last_time {
  1009. emit $elems;
  1010. $last_time = $this_time;
  1011. }
  1012. LAST emit($elems) if $elems != $last_elems;
  1013. }
  1014. }
  1015. else {
  1016. whenever self -> \val { emit $elems = $elems + 1 }
  1017. }
  1018. }
  1019. }
  1020. method head(Supply:D: Int(Cool) $number = 1) {
  1021. supply {
  1022. my int $todo = $number;
  1023. whenever self -> \val {
  1024. if $todo > 0 {
  1025. emit val;
  1026. $todo = $todo - 1;
  1027. }
  1028. done if $todo <= 0; # nothing left to do
  1029. }
  1030. }
  1031. }
# Emit only the last $number values, once the source is done.
method tail(Supply:D: Int(Cool) $number = 1) {
    my int $size = $number;
    supply {
        # Just the last value: no buffer needed.
        if $size == 1 {
            my $last;
            whenever self -> \val {
                $last := val;
                LAST emit $last;
            }
        }
        # Several values: keep them in a ring buffer of $size slots.
        elsif $size > 1 {
            my $lastn := nqp::list;
            my int $index = 0;
            nqp::setelems($lastn,$number);  # presize list
            nqp::setelems($lastn,0);
            whenever self -> \val {
                # Overwrite the oldest slot, wrapping around at $size.
                nqp::bindpos($lastn,$index,val);
                $index = ($index + 1) % $size;
                LAST {
                    my int $todo = nqp::elems($lastn);
                    # In a full buffer $index already points at the oldest
                    # value.
                    $index = 0           # start from beginning
                      if $todo < $size;  # if not a full set
                    while $todo {
                        emit nqp::atpos($lastn,$index);
                        $index = ($index + 1) % $size;
                        $todo = $todo - 1;
                    }
                }
            }
        }
        else {  # number <= 0, needed to keep tap open
            whenever self -> \val { }
        }
    }
}
# Drop the first $number values and emit all the ones after that.
method skip(Supply:D: Int(Cool) $number = 1) {
    supply {
        # $size counts down from $number + 1; $skipping stays true until
        # that countdown reaches zero.
        my int $size = $number + 1;
        my int $skipping = $size > 1;
        whenever self {
            # While skipping, decrement and test in one go: the value on
            # which the countdown hits zero is the first one emitted.
            .emit unless $skipping && ($skipping = --$size)
        }
    }
}
  1076. method min(Supply:D $self: &by = &infix:<cmp>) {
  1077. my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
  1078. supply {
  1079. my $min;
  1080. whenever self -> \val {
  1081. if val.defined and !$min.defined || cmp(val,$min) < 0 {
  1082. emit( $min := val );
  1083. }
  1084. }
  1085. }
  1086. }
  1087. method max(Supply:D $self: &by = &infix:<cmp>) {
  1088. my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
  1089. supply {
  1090. my $max;
  1091. whenever self -> \val {
  1092. if val.defined and !$max.defined || cmp(val,$max) > 0 {
  1093. emit( $max = val );
  1094. }
  1095. }
  1096. }
  1097. }
# Emit a Range from minimum to maximum each time either of them changes.
# &by is either a two-argument comparator, or a one-argument key extractor
# whose results are compared with cmp.
method minmax(Supply:D $self: &by = &infix:<cmp>) {
    my &cmp = &by.arity == 2 ?? &by !! { by($^a) cmp by($^b) }
    supply {
        my $min;
        my $max;
        whenever self -> \val {
            if nqp::istype(val,Failure) {
                val.throw;  # XXX or just ignore ???
            }
            # Other undefined values are ignored.
            elsif val.defined {
                # First defined value: it is both minimum and maximum.
                if !$min.defined {
                    emit( Range.new($min = val, $max = val) );
                }
                elsif cmp(val,$min) < 0 {
                    emit( Range.new( $min = val, $max ) );
                }
                elsif cmp(val,$max) > 0 {
                    emit( Range.new( $min, $max = val ) );
                }
            }
        }
    }
}
  1121. method grab(Supply:D $self: &when_done) {
  1122. supply {
  1123. my @seen;
  1124. whenever self -> \val {
  1125. @seen.push: val;
  1126. LAST {
  1127. emit($_) for when_done(@seen);
  1128. }
  1129. }
  1130. }
  1131. }
  1132. method reverse(Supply:D:) { self.grab( {.reverse} ) }
  1133. multi method sort(Supply:D:) { self.grab( {.sort} ) }
  1134. multi method sort(Supply:D: &by) { self.grab( {.sort(&by)} ) }
# Combine supplies position by position: a combination is emitted as soon
# as every participant has a value waiting. With :&with the values are
# reduced with that operator, otherwise they are emitted as an itemized
# list. The result is done as soon as any participant is done. Throws
# X::Supply::Combinator unless all arguments are defined Supply objects.
method zip(**@s, :&with) {
    @s.unshift(self) if self.DEFINITE;  # add if instance method
    return supply { } unless +@s;       # nothing to be done
    X::Supply::Combinator.new(
       combinator => 'zip'
    ).throw unless Rakudo::Internals.ALL_DEFINED_TYPE(@s,Supply);
    return @s[0]  if +@s == 1;          # nothing to be done
    supply {
        # One queue of pending values per participant.
        my @values = [] xx +@s;
        for @s.kv -> $index, $supply {
            if &with {
                whenever $supply -> \val {
                    @values[$index].push(val);
                    # All queues non-empty: take one value from each.
                    emit( [[&with]] @values.map(*.shift) ) if all(@values);
                    LAST { done }
                }
            }
            else {
                whenever $supply -> \val {
                    @values[$index].push(val);
                    emit( $(@values.map(*.shift).list.eager) ) if all(@values);
                    LAST { done }
                }
            }
        }
    }
}
  1162. method zip-latest(**@s, :&with, :$initial ) {
  1163. @s.unshift(self) if self.DEFINITE; # add if instance method
  1164. return supply { } unless +@s; # nothing to do.
  1165. X::Supply::Combinator.new(
  1166. combinator => 'zip-latest'
  1167. ).throw unless Rakudo::Internals.ALL_DEFINED_TYPE(@s,Supply);
  1168. return @s[0] if +@s == 1; # nothing to do.
  1169. supply {
  1170. my @values;
  1171. my $uninitialised = +@s; # how many supplies have yet to emit until we
  1172. # can start emitting, too?
  1173. if $initial {
  1174. @values = @$initial;
  1175. $uninitialised = 0 max $uninitialised - @$initial;
  1176. }
  1177. for @s.kv -> $index, $supply {
  1178. if &with {
  1179. whenever $supply -> \val {
  1180. --$uninitialised
  1181. if $uninitialised > 0 && not @values.EXISTS-POS($index);
  1182. @values[$index] = val;
  1183. emit( [[&with]] @values ) unless $uninitialised;
  1184. }
  1185. }
  1186. else {
  1187. whenever $supply -> \val {
  1188. --$uninitialised
  1189. if $uninitialised > 0 && not @values.EXISTS-POS($index);
  1190. @values[$index] = val;
  1191. emit( @values.List.item ) unless $uninitialised;
  1192. }
  1193. }
  1194. }
  1195. }
  1196. }
# Dispatcher for the throttle multi candidates; it accepts any argument list
# and hands it on to the matching candidate.
proto method throttle(|) {*}
# Time-based throttle: lets at most $elems values through per timer tick,
# where the timer is Supply.interval($seconds, $delay, :$scheduler).
#
#   $elems     - initial number of values allowed per tick
#   $seconds   - interval between ticks
#   $delay     - initial delay handed to Supply.interval
#   :$control  - optional Supply of control messages; handled message types
#                are 'limit', 'bleed', 'status' and 'vent-at'
#   :$status   - optional object whose .emit receives status hashes
#   :$bleed    - optional object whose .emit receives values that are
#                dropped from (or never make it into) the buffer
#   :$vent-at  - buffer size at which new values go to $bleed instead of
#                being buffered; only used when $bleed is given
multi method throttle(Supply:D $self:
  Int()  $elems,
  Real() $seconds,
  Real() $delay  = 0,
  :$scheduler    = $*SCHEDULER,
  :$control,
  :$status,
  :$bleed,
  :$vent-at,
) {
    my $timer = Supply.interval($seconds,$delay,:$scheduler);
    my int $limit   = $elems;
    my int $vent = $vent-at if $bleed;
    supply {
        my @buffer;               # values waiting for a future tick
        my int $allowed = $limit; # values that may still pass in this tick
        my int $emitted;          # total number of values emitted
        my int $bled;             # values bled on a 'bleed' control message
        my int $done;             # set once the source supply is done

        # Emit a snapshot of the current counters to $status, tagged with $id.
        sub emit-status($id --> Nil) {
            $status.emit(
              { :$allowed, :$bled, :buffered(+@buffer),
                :$emitted, :$id, :$limit, :$vent-at } );
        }

        # On every tick: flush up to $limit buffered values and reset the
        # allowance for the new period.
        whenever $timer -> \tick {
            if +@buffer -> \buffered {
                my int $todo = buffered > $limit ?? $limit !! buffered;
                emit(@buffer.shift) for ^$todo;
                $emitted = $emitted + $todo;
                $allowed = $limit   - $todo;
            }
            else {
                $allowed = $limit;
            }
            # Source finished and nothing left to flush: we're done too.
            if $done && !@buffer {
                done;
            }
        }

        # Incoming values: pass through while allowed, otherwise vent to
        # $bleed when the buffer has reached $vent, otherwise buffer.
        whenever self -> \val {
            if $allowed {
                emit(val);
                $emitted = $emitted + 1;
                $allowed = $allowed - 1;
            }
            elsif $vent && +@buffer >= $vent {
                $bleed.emit(val);
            }
            else {
                @buffer.push(val);
            }
            LAST {
                if $status {
                    emit-status("done");
                    $status.done;
                }
                # With a bleed target, whatever is still buffered goes there
                # instead of being emitted on later ticks.
                if $bleed && @buffer {
                    $bleed.emit(@buffer.shift) while @buffer;
                    $bleed.done;
                }
                # Lets the timer whenever call done once the buffer is empty.
                $done = 1;
            }
        }

        if $control {
            whenever $control -> \val {
                my str $type;
                my str $value;
                # presumably splits a "type:value" message into $type and
                # $value -- confirm against Rakudo::Internals.KEY_COLON_VALUE
                Rakudo::Internals.KEY_COLON_VALUE(val,$type,$value);

                if $type eq 'limit' {
                    # Apply the change in limit to the current allowance,
                    # never letting the allowance go below zero.
                    my int $extra = $value - $limit;
                    $allowed = $extra > 0 || $allowed + $extra >= 0
                      ?? $allowed + $extra
                      !! 0;
                    $limit = $value;
                }
                elsif $type eq 'bleed' && $bleed {
                    # Move up to $value buffered values to $bleed.
                    my int $todo = $value min +@buffer;
                    $bleed.emit(@buffer.shift) for ^$todo;
                    $bled = $bled + $todo;
                }
                elsif $type eq 'status' && $status {
                    emit-status($value);
                }
                elsif $type eq 'vent-at' && $bleed {
                    # New venting threshold; shrink the buffer down to it
                    # if it is currently larger.
                    $vent = $value;
                    if $vent && +@buffer > $vent {
                        $bleed.emit(@buffer.shift)
                          until !@buffer || +@buffer == $vent;
                    }
                }
            }
        }
    }
}
# Process-based throttle: runs $process for each incoming value via
# Promise.start, with at most $elems of them running at the same time.
# The resulting Supply emits the Promise of each process once it has
# completed.
#
#   $elems     - initial maximum number of simultaneously running processes
#   $process   - Callable started (with the value as argument) per value
#   $delay     - if true, the call sleeps this long before setting up
#   :$control  - optional Supply of control messages; handled message types
#                are 'limit', 'bleed', 'status' and 'vent-at'
#   :$status   - optional object whose .emit receives status hashes
#   :$bleed    - optional object whose .emit receives values that are
#                dropped from (or never make it into) the buffer
#   :$vent-at  - buffer size at which new values go to $bleed instead of
#                being buffered; only used when $bleed is given
#
# Note that the buffer and all counters are declared outside of the supply
# block, in the method body itself.
multi method throttle(Supply:D $self:
  Int()  $elems,
  Callable:D $process,
  Real() $delay = 0,
  :$scheduler   = $*SCHEDULER,
  :$control,
  :$status,
  :$bleed,
  :$vent-at,
) {
    sleep $delay if $delay;
    my @buffer;               # values waiting for a free slot
    my int $limit   = $elems;
    my int $allowed = $limit; # number of processes that may still be started
    my int $running;          # number of processes currently running
    my int $emitted;          # number of completed processes emitted
    my int $bled;             # values bled on a 'bleed' control message
    my int $done;             # set once the source supply is done
    my int $vent = $vent-at if $bleed;
    my $ready = Supplier::Preserving.new; # completed Promises arrive here

    # Start $process on the given value and arrange for its Promise to be
    # sent to $ready once it has completed.
    sub start-process(\val --> Nil) {
        my $p = Promise.start( $process, :$scheduler, val );
        $running = $running + 1;
        $allowed = $allowed - 1;
        $p.then: { $ready.emit($p) };
    }

    # Emit a snapshot of the current counters to $status, tagged with $id.
    sub emit-status($id --> Nil) {
        $status.emit(
          { :$allowed, :$bled, :buffered(+@buffer),
            :$emitted, :$id, :$limit, :$running } );
    }

    supply {
        whenever $ready.Supply -> \val { # when a process is ready
            $running = $running - 1;
            $allowed = $allowed + 1;
            emit(val);
            $emitted = $emitted + 1;
            # A slot came free: start the next buffered value, if any.
            start-process(@buffer.shift) if $allowed > 0 && @buffer;

            # Source finished and the last process completed: wrap up.
            if $done && !$running {
                $control.done if $control;
                if $status {
                    emit-status("done");
                    $status.done;
                }
                if $bleed && @buffer {
                    $bleed.emit(@buffer.shift) while @buffer;
                    $bleed.done;
                }
                done;
            }
        }

        if $control {
            whenever $control -> \val {
                my str $type;
                my str $value;
                # presumably splits a "type:value" message into $type and
                # $value -- confirm against Rakudo::Internals.KEY_COLON_VALUE
                Rakudo::Internals.KEY_COLON_VALUE(val,$type,$value);

                if $type eq 'limit' {
                    # Apply the change in limit to the allowance and start
                    # buffered values for any slots that opened up.
                    $allowed = $allowed + $value - $limit;
                    $limit   = $value;
                    start-process(@buffer.shift)
                      while $allowed > 0 && @buffer;
                }
                elsif $type eq 'bleed' && $bleed {
                    # Move up to $value buffered values to $bleed.
                    my int $todo = $value min +@buffer;
                    $bleed.emit(@buffer.shift) for ^$todo;
                    $bled = $bled + $todo;
                }
                elsif $type eq 'status' && $status {
                    emit-status($value);
                }
                elsif $type eq 'vent-at' && $bleed {
                    # New venting threshold; shrink the buffer down to it
                    # if it is currently larger.
                    $vent = $value;
                    if $vent && +@buffer > $vent {
                        $bleed.emit(@buffer.shift)
                          until !@buffer || +@buffer == $vent;
                    }
                }
            }
        }

        # Incoming values: start a process if a slot is free, otherwise vent
        # to $bleed when the buffer is at $vent, otherwise buffer.
        whenever self -> \val {
            $allowed > 0
              ?? start-process(val)
              !! $vent && $vent == +@buffer
                ?? $bleed.emit(val)
                !! @buffer.push(val);
            # NOTE(review): done is only checked when a process completes,
            # so a source that finishes with nothing running relies on a
            # later completion to finish this supply -- confirm intended.
            LAST { $done = 1 }
        }
    }
}
  1380. method share(Supply:D:) {
  1381. my $sup = Supplier.new;
  1382. self.tap:
  1383. -> \msg { $sup.emit(msg) },
  1384. done => -> { $sup.done() },
  1385. quit => -> \ex { $sup.quit(ex) }
  1386. $sup.Supply
  1387. }
  1388. }
  1389. # A Supplier is a convenient way to create a live Supply. The publisher can
  1390. # be used to emit/done/quit. The Supply objects obtained from it will tap into
  1391. # the same live Supply.
  1392. my class Supplier {
  1393. my class TapList does Tappable {
  1394. my class TapListEntry {
  1395. has &.emit;
  1396. has &.done;
  1397. has &.quit;
  1398. }
  1399. # Lock serializes updates to tappers.
  1400. has Lock $!lock = Lock.new;
  1401. # An immutable list of tappers. Always replaced on change, never
  1402. # mutated in-place ==> thread safe together with lock (and only
  1403. # need lock on modification).
  1404. has Mu $!tappers;
  1405. method tap(&emit, &done, &quit, &tap) {
  1406. my $tle := TapListEntry.new(:&emit, :&done, :&quit);
  1407. my $t = Tap.new({
  1408. $!lock.protect({
  1409. my Mu $update := nqp::list();
  1410. for nqp::hllize($!tappers) -> \entry {
  1411. nqp::push($update, entry) unless entry =:= $tle;
  1412. }
  1413. $!tappers := $update;
  1414. });
  1415. });
  1416. tap($t);
  1417. $!lock.protect({
  1418. my Mu $update := nqp::isconcrete($!tappers)
  1419. ?? nqp::clone($!tappers)
  1420. !! nqp::list();
  1421. nqp::push($update, $tle);
  1422. $!tappers := $update;
  1423. });
  1424. $t
  1425. }
  1426. method emit(\value --> Nil) {
  1427. my $snapshot := $!tappers;
  1428. if nqp::isconcrete($snapshot) {
  1429. my int $n = nqp::elems($snapshot);
  1430. loop (my int $i = 0; $i < $n; $i = $i + 1) {
  1431. nqp::atpos($snapshot, $i).emit()(value);
  1432. }
  1433. }
  1434. }
  1435. method done(--> Nil) {
  1436. my $snapshot := $!tappers;
  1437. if nqp::isconcrete($snapshot) {
  1438. my int $n = nqp::elems($snapshot);
  1439. loop (my int $i = 0; $i < $n; $i = $i + 1) {
  1440. nqp::atpos($snapshot, $i).done()();
  1441. }
  1442. }
  1443. }
  1444. method quit($ex --> Nil) {
  1445. my $snapshot := $!tappers;
  1446. if nqp::isconcrete($snapshot) {
  1447. my int $n = nqp::elems($snapshot);
  1448. loop (my int $i = 0; $i < $n; $i = $i + 1) {
  1449. nqp::atpos($snapshot, $i).quit()($ex);
  1450. }
  1451. }
  1452. }
  1453. method live(--> True) { }
  1454. method serial(--> False) { }
  1455. method sane(--> False) { }
  1456. }
  1457. has $!taplist;
  1458. method new() {
  1459. self.bless(taplist => TapList.new)
  1460. }
  1461. submethod BUILD(:$!taplist! --> Nil) { }
  1462. method emit(Supplier:D: Mu \value --> Nil) {
  1463. $!taplist.emit(value);
  1464. }
  1465. method done(Supplier:D: --> Nil) {
  1466. $!taplist.done();
  1467. }
  1468. proto method quit($) {*}
  1469. multi method quit(Supplier:D: Exception $ex) {
  1470. $!taplist.quit($ex);
  1471. }
  1472. multi method quit(Supplier:D: Str() $message) {
  1473. $!taplist.quit(X::AdHoc.new(payload => $message));
  1474. }
  1475. method Supply(Supplier:D:) {
  1476. Supply.new($!taplist).sanitize
  1477. }
  1478. method unsanitized-supply(Supplier:D:) {
  1479. Supply.new($!taplist)
  1480. }
  1481. }
# A preserving supplier holds on to emitted values and state when nobody is
# tapping. As soon as a tap is made, any preserved events will be
# immediately sent to that tapper.
  1485. my class Supplier::Preserving is Supplier {
  1486. my class PreservingTapList does Tappable {
  1487. my class TapListEntry {
  1488. has &.emit;
  1489. has &.done;
  1490. has &.quit;
  1491. }
  1492. # Lock serializes updates to tappers.
  1493. has Lock $!lock = Lock.new;
  1494. # An immutable list of tappers. Always replaced on change, never
  1495. # mutated in-place ==> thread safe together with lock (and only
  1496. # need lock on modification).
  1497. has Mu $!tappers;
  1498. # Events to reply, whether the replay was done, and a lock to protect
  1499. # updates to these.
  1500. has @!replay;
  1501. has int $!replay-done;
  1502. has $!replay-lock = Lock.new;
  1503. method tap(&emit, &done, &quit, &tap) {
  1504. my $tle := TapListEntry.new(:&emit, :&done, :&quit);
  1505. my int $replay = 0;
  1506. my $t = Tap.new({
  1507. $!lock.protect({
  1508. my Mu $update := nqp::list();
  1509. for nqp::hllize($!tappers) -> \entry {
  1510. nqp::push($update, entry) unless entry =:= $tle;
  1511. }
  1512. $!replay-done = 0 if nqp::elems($update) == 0;
  1513. $!tappers := $update;
  1514. });
  1515. });
  1516. tap($t);
  1517. $!lock.protect({
  1518. my Mu $update := nqp::isconcrete($!tappers)
  1519. ?? nqp::clone($!tappers)
  1520. !! nqp::list();
  1521. nqp::push($update, $tle);
  1522. $replay = 1 if nqp::elems($update) == 1;
  1523. self!replay($tle) if $replay;
  1524. $!tappers := $update;
  1525. });
  1526. $t
  1527. }
  1528. method emit(\value --> Nil) {
  1529. loop {
  1530. my int $sent = 0;
  1531. my $snapshot := $!tappers;
  1532. if nqp::isconcrete($snapshot) {
  1533. $sent = nqp::elems($snapshot);
  1534. loop (my int $i = 0; $i < $sent; $i = $i + 1) {
  1535. nqp::atpos($snapshot, $i).emit()(value);
  1536. }
  1537. }
  1538. return if $sent;
  1539. return if self!add-replay({ $_.emit()(value) });
  1540. }
  1541. }
  1542. method done(--> Nil) {
  1543. loop {
  1544. my int $sent = 0;
  1545. my $snapshot := $!tappers;
  1546. if nqp::isconcrete($snapshot) {
  1547. $sent = nqp::elems($snapshot);
  1548. loop (my int $i = 0; $i < $sent; $i = $i + 1) {
  1549. nqp::atpos($snapshot, $i).done()();
  1550. }
  1551. }
  1552. return if $sent;
  1553. return if self!add-replay({ $_.done()() });
  1554. }
  1555. }
  1556. method quit($ex --> Nil) {
  1557. loop {
  1558. my int $sent = 0;
  1559. my $snapshot := $!tappers;
  1560. if nqp::isconcrete($snapshot) {
  1561. $sent = nqp::elems($snapshot);
  1562. loop (my int $i = 0; $i < $sent; $i = $i + 1) {
  1563. nqp::atpos($snapshot, $i).quit()($ex);
  1564. }
  1565. }
  1566. return if $sent;
  1567. return if self!add-replay({ $_.quit()($ex) });
  1568. }
  1569. }
  1570. method !add-replay(&replay --> Bool) {
  1571. $!replay-lock.protect: {
  1572. if $!replay-done {
  1573. False
  1574. }
  1575. else {
  1576. @!replay.push(&replay);
  1577. True
  1578. }
  1579. }
  1580. }
  1581. method !replay($tle) {
  1582. $!replay-lock.protect: {
  1583. while @!replay.shift -> $rep {
  1584. $rep($tle);
  1585. }
  1586. $!replay-done = 1;
  1587. }
  1588. }
  1589. method live(--> True) { }
  1590. method serial(--> False) { }
  1591. method sane(--> False) { }
  1592. }
  1593. method new() {
  1594. self.bless(taplist => PreservingTapList.new)
  1595. }
  1596. }
  1597. augment class Rakudo::Internals {
  1598. my constant ADD_WHENEVER_PROMPT = Mu.new;
# An Awaitable that simply hands back the await handle it was created with;
# used by SupplyBlockAddWheneverAwaiter to re-await with a handle that was
# already obtained.
class CachedAwaitHandle does Awaitable {
    has $.get-await-handle;
}
# The awaiter installed as $*AWAITER while a whenever is being set up in a
# supply block.  Rather than waiting on the spot, an await that cannot be
# satisfied immediately takes a continuation up to ADD_WHENEVER_PROMPT and
# stores a closure that, when given another awaiter, invokes that
# continuation with the real await performed by that awaiter.  The stored
# closures are collected with take-all.
class SupplyBlockAddWheneverAwaiter does Awaiter {
    # nqp list of closures taking a delegate awaiter; created on demand.
    has $!continuations;

    # Await a single Awaitable.  If its handle is already resolved, produce
    # the result (or rethrow the cause); otherwise defer as described above.
    method await(Awaitable:D $a) {
        my $handle := $a.get-await-handle;
        if $handle.already {
            $handle.success
                ?? $handle.result
                !! $handle.cause.rethrow
        }
        else {
            # Wrap the handle we already have, so the delegate awaiter is
            # given this same handle rather than asking $a for one again.
            my $reawaitable := CachedAwaitHandle.new(get-await-handle => $handle);
            $!continuations := nqp::list() unless nqp::isconcrete($!continuations);
            nqp::continuationcontrol(0, ADD_WHENEVER_PROMPT, -> Mu \c {
                nqp::push($!continuations, -> $delegate-awaiter {
                    nqp::continuationinvoke(c, {
                        $delegate-awaiter.await($reawaitable);
                    });
                });
            });
        }
    }

    # Await several things at once; always deferred to the delegate awaiter.
    method await-all(Iterable:D \i) {
        $!continuations := nqp::list() unless nqp::isconcrete($!continuations);
        nqp::continuationcontrol(0, ADD_WHENEVER_PROMPT, -> Mu \c {
            nqp::push($!continuations, -> $delegate-awaiter {
                nqp::continuationinvoke(c, {
                    $delegate-awaiter.await-all(i);
                });
            });
        });
    }

    # Hand over (and forget) all deferred closures; Empty if there are none.
    method take-all() {
        if nqp::isconcrete($!continuations) {
            my \result = $!continuations;
            $!continuations := Mu;
            result
        }
        else {
            Empty
        }
    }
}
  1644. class SupplyBlockState {
  1645. has &.emit;
  1646. has &.done;
  1647. has &.quit;
  1648. has @.close-phasers;
  1649. has $.active;
  1650. has $!lock;
  1651. has %!active-taps;
  1652. has $.run-async-lock;
  1653. has $.awaiter;
  1654. method new(:&emit!, :&done!, :&quit!) {
  1655. self.CREATE!SET-SELF(&emit, &done, &quit)
  1656. }
  1657. method !SET-SELF(&emit, &done, &quit) {
  1658. &!emit := &emit;
  1659. &!done := &done;
  1660. &!quit := &quit;
  1661. $!active = 1;
  1662. $!lock := Lock.new;
  1663. $!run-async-lock := Lock::Async.new;
  1664. $!awaiter := SupplyBlockAddWheneverAwaiter.CREATE;
  1665. self
  1666. }
  1667. method decrement-active() {
  1668. $!lock.protect: { --$!active }
  1669. }
  1670. method get-and-zero-active() {
  1671. $!lock.protect: {
  1672. my $result = $!active;
  1673. $!active = 0;
  1674. $result
  1675. }
  1676. }
  1677. method add-active-tap($tap --> Nil) {
  1678. $!lock.protect: {
  1679. ++$!active;
  1680. %!active-taps{nqp::objectid($tap)} = $tap;
  1681. }
  1682. }
  1683. method delete-active-tap($tap --> Nil) {
  1684. $!lock.protect: {
  1685. %!active-taps{nqp::objectid($tap)}:delete;
  1686. }
  1687. }
  1688. method teardown(--> Nil) {
  1689. my $to-close := nqp::create(IterationBuffer);
  1690. $!lock.protect: {
  1691. %!active-taps.values.iterator.push-all($to-close);
  1692. %!active-taps = ();
  1693. $!active = 0;
  1694. }
  1695. my int $n = nqp::elems($to-close);
  1696. loop (my int $i = 0; $i < $n; $i++) {
  1697. nqp::atpos($to-close, $i).close();
  1698. }
  1699. my @close-phasers := @!close-phasers;
  1700. while @close-phasers {
  1701. @close-phasers.pop()();
  1702. }
  1703. }
  1704. method run-emit(--> Nil) {
  1705. if $!active {
  1706. my \ex := nqp::exception();
  1707. my $emit-handler := &!emit;
  1708. $emit-handler(nqp::getpayload(ex)) if $emit-handler.DEFINITE;
  1709. nqp::resume(ex)
  1710. }
  1711. }
  1712. method run-done(--> Nil) {
  1713. self.get-and-zero-active();
  1714. self.teardown();
  1715. my $done-handler := &!done;
  1716. $done-handler() if $done-handler.DEFINITE;
  1717. }
  1718. method run-catch(--> Nil) {
  1719. my \ex = EXCEPTION(nqp::exception());
  1720. self.get-and-zero-active();
  1721. self.teardown();
  1722. my $quit-handler = &!quit;
  1723. $quit-handler(ex) if $quit-handler;
  1724. }
  1725. }
# The Tappable behind a supply block (see SUPPLY).  Each tapping gets its own
# SupplyBlockState, runs the block, and sets up a tap for each whenever the
# block reaches.  All code of the block and its whenevers is run through
# !run-supply-code, under the state's run-async-lock.
class SupplyBlockTappable does Tappable {
    has &!block;

    submethod BUILD(:&!block --> Nil) { }

    method tap(&emit, &done, &quit, &tap) {
        # Create state for this tapping.
        my $state := Rakudo::Internals::SupplyBlockState.new(:&emit, :&done, :&quit);

        # Placed here so it can close over $state, but we only need to
        # closure-clone it once per Supply block, not once per whenever.
        sub add-whenever($supply, &whenever-block) {
            my $tap;
            $state.run-async-lock.with-lock-hidden-from-recursion-check: {
                # Awaits done while tapping are deferred by the state's
                # awaiter; they are picked up later via its take-all.
                my $*AWAITER := $state.awaiter;
                nqp::continuationreset(ADD_WHENEVER_PROMPT, {
                    $supply.tap(
                        tap => {
                            $tap := $_;
                            $state.add-active-tap($tap);
                        },
                        -> \value {
                            self!run-supply-code(&whenever-block, value, $state,
                                &add-whenever)
                        },
                        done => {
                            # This whenever is finished: run its LAST
                            # phasers and count one active runner less.
                            $state.delete-active-tap($tap);
                            my @phasers := &whenever-block.phasers('LAST');
                            if @phasers {
                                self!run-supply-code({ .() for @phasers }, Nil, $state,
                                    &add-whenever)
                            }
                            $tap.close;
                            self!deactivate-one($state);
                        },
                        quit => -> \ex {
                            $state.delete-active-tap($tap);
                            my $handled := False;
                            self!run-supply-code({
                                # The exception counts as handled if the
                                # first QUIT phaser evaluates to Nil.
                                my $phaser := &whenever-block.phasers('QUIT')[0];
                                if $phaser.DEFINITE {
                                    $handled := $phaser(ex) === Nil;
                                }
                                # Otherwise pass it on to our tapper (only
                                # if we were still active) and tear down.
                                if !$handled && $state.get-and-zero-active() {
                                    $state.quit().(ex) if $state.quit;
                                    $state.teardown();
                                }
                            }, Nil, $state, &add-whenever);
                            if $handled {
                                $tap.close;
                                self!deactivate-one($state);
                            }
                        });
                });
            }
            $tap
        }

        # Stash away any CLOSE phasers.
        if nqp::istype(&!block, Block) {
            $state.close-phasers.append(&!block.phasers('CLOSE'));
        }

        # Create and pass on tap; when closed, tear down the state and all
        # of our subscriptions.
        my $t := Tap.new(-> { $state.teardown() });
        tap($t);

        # Run the Supply block, then decrease active count afterwards (it
        # counts as an active runner).
        self!run-supply-code:
            { &!block(); self!deactivate-one-internal($state) },
            Nil, $state, &add-whenever;

        # Evaluate to the Tap.
        $t
    }

    # Run &code with value, unless the state is no longer active, holding the
    # state's run-async-lock (or queued on it in case of recursion).  emit,
    # done and exceptions coming out of the code are handled by the state's
    # run-emit, run-done and run-catch.  Any awaits that were deferred while
    # setting up whenevers are run afterwards, outside of the lock.
    method !run-supply-code(&code, \value, SupplyBlockState $state, &add-whenever) {
        my @run-after;
        my $queued := $state.run-async-lock.protect-or-queue-on-recursion: {
            my &*ADD-WHENEVER := &add-whenever;
            $state.active > 0 and nqp::handle(code(value),
                'EMIT', $state.run-emit(),
                'DONE', $state.run-done(),
                'CATCH', $state.run-catch(),
                'NEXT', 0);
            @run-after = $state.awaiter.take-all;
        }
        # If the code got queued, the deferred awaits can only be run once
        # the queued code has actually run.
        if $queued.defined {
            $queued.then({ self!run-add-whenever-awaits(@run-after) });
        }
        else {
            self!run-add-whenever-awaits(@run-after);
        }
    }

    # Run the deferred awaits with the awaiter that is current here as the
    # delegate.  Awaits deferred again while doing so are collected by a
    # nested awaiter and added to the work still to be done.
    method !run-add-whenever-awaits(@run-after --> Nil) {
        if @run-after {
            my $nested-awaiter := SupplyBlockAddWheneverAwaiter.CREATE;
            my $delegate-awaiter := $*AWAITER;
            while @run-after.elems {
                my $*AWAITER := $nested-awaiter;
                nqp::continuationreset(ADD_WHENEVER_PROMPT, {
                    @run-after.shift()($delegate-awaiter);
                });
                @run-after.append($nested-awaiter.take-all);
            }
        }
    }

    # As !deactivate-one-internal, but taking the run-async-lock first.
    method !deactivate-one(SupplyBlockState $state) {
        $state.run-async-lock.protect-or-queue-on-recursion:
            { self!deactivate-one-internal($state) };
    }

    # One active runner less; if that was the last one, signal done to our
    # tapper and tear down.
    method !deactivate-one-internal(SupplyBlockState $state) {
        if $state.decrement-active() == 0 {
            my $done-handler := $state.done;
            $done-handler() if $done-handler;
            $state.teardown();
        }
    }

    method live(--> False) { }
    method sane(--> True) { }
    method serial(--> True) { }
}
  1842. class SupplyOneWheneverState {
  1843. has &.emit;
  1844. has &.done;
  1845. has &.quit;
  1846. has @.close-phasers;
  1847. has $.tap is rw;
  1848. has $.active;
  1849. method new(:&emit!, :&done!, :&quit!) {
  1850. self.CREATE!SET-SELF(&emit, &done, &quit)
  1851. }
  1852. method !SET-SELF(&emit, &done, &quit) {
  1853. &!emit := &emit;
  1854. &!done := &done;
  1855. &!quit := &quit;
  1856. $!active = 1;
  1857. self
  1858. }
  1859. method teardown(--> Nil) {
  1860. $!active = 0;
  1861. $!tap.close if $!tap;
  1862. my @close-phasers := @!close-phasers;
  1863. while @close-phasers {
  1864. @close-phasers.pop()();
  1865. }
  1866. }
  1867. method run-emit(--> Nil) {
  1868. if $!active {
  1869. my \ex := nqp::exception();
  1870. my $emit-handler := &!emit;
  1871. $emit-handler(nqp::getpayload(ex)) if $emit-handler.DEFINITE;
  1872. nqp::resume(ex)
  1873. }
  1874. }
  1875. method run-done(--> Nil) {
  1876. if $!active {
  1877. self.teardown();
  1878. my $done-handler := &!done;
  1879. $done-handler() if $done-handler.DEFINITE;
  1880. }
  1881. }
  1882. method run-catch(--> Nil) {
  1883. if $!active {
  1884. my \ex = EXCEPTION(nqp::exception());
  1885. self.teardown();
  1886. my $quit-handler = &!quit;
  1887. $quit-handler(ex) if $quit-handler;
  1888. }
  1889. }
  1890. }
# The Tappable behind a supply block with exactly one whenever (see
# SUPPLY-ONE-WHENEVER).  The block is run first, with the whenever only being
# recorded; the recorded supply is tapped after the block has completed.
class SupplyOneWheneverTappable does Tappable {
    has &!block;

    submethod BUILD(:&!block --> Nil) { }

    method tap(&emit, &done, &quit, &tap) {
        # Create state for this tapping.
        my $state := Rakudo::Internals::SupplyOneWheneverState.new(:&emit, :&done, :&quit);

        # We only expect one whenever; detect getting a second and complain.
        my $*WHENEVER-SUPPLY-TO-ADD := Nil;
        my &*WHENEVER-BLOCK-TO-ADD := Nil;
        sub add-whenever(\the-supply, \the-whenever-block) {
            if $*WHENEVER-SUPPLY-TO-ADD =:= Nil {
                $*WHENEVER-SUPPLY-TO-ADD := the-supply;
                &*WHENEVER-BLOCK-TO-ADD := the-whenever-block;
            }
            else {
                die "Single whenever block special case tried to add second whenever";
            }
        }

        # Stash away any CLOSE phasers.
        if nqp::istype(&!block, Block) {
            $state.close-phasers.append(&!block.phasers('CLOSE'));
        }

        # Create and pass on tap; when closed, tear down the state and all
        # of our subscriptions.
        my $t := Tap.new(-> { $state.teardown() });
        tap($t);

        # Run the Supply block. Only proceed if it didn't send done/quit.
        self!run-supply-code: { &!block() }, Nil, $state, &add-whenever;
        if $state.active {
            # If we didn't get a whenever, something is badly wrong.
            if $*WHENEVER-SUPPLY-TO-ADD =:= Nil {
                die "Single whenever block special case did not get a whenever block";
            }

            # Otherwise, we can now tap that whenever block. Since it is the
            # only one, and we know from compile-time analysis it is the last
            # thing in the block, then it's safe to do it now the block is
            # completed and without any concurrency control. However, we do
            # call .sanitize just in case, to ensure that we have a serial and
            # protocol-following Supply. That is enough.
            my $supply := $*WHENEVER-SUPPLY-TO-ADD.sanitize;
            my &whenever-block := &*WHENEVER-BLOCK-TO-ADD;
            my $tap;
            $supply.tap(
                tap => {
                    $tap := $_;
                    $state.tap = $tap;
                },
                -> \value {
                    self!run-supply-code(&whenever-block, value, $state,
                        &add-whenever)
                },
                done => {
                    # The only whenever is finished, so we are done as well
                    # after running its LAST phasers.
                    my @phasers := &whenever-block.phasers('LAST');
                    if @phasers {
                        self!run-supply-code({ .() for @phasers }, Nil, $state,
                            &add-whenever)
                    }
                    $tap.close;
                    $state.run-done();
                },
                quit => -> \ex {
                    my $handled := False;
                    self!run-supply-code({
                        # The exception counts as handled if the first QUIT
                        # phaser evaluates to Nil.
                        my $phaser := &whenever-block.phasers('QUIT')[0];
                        if $phaser.DEFINITE {
                            $handled := $phaser(ex) === Nil;
                        }
                        # Otherwise pass it on to our tapper and tear down.
                        if !$handled {
                            $state.quit().(ex) if $state.quit;
                            $state.teardown();
                        }
                    }, Nil, $state, &add-whenever);
                    if $handled {
                        $tap.close;
                        $state.run-done();
                    }
                });
        }

        # Evaluate to the Tap.
        $t
    }

    # Run &code with value, unless the state is no longer active.  emit, done
    # and exceptions coming out of the code are handled by the state's
    # run-emit, run-done and run-catch.
    method !run-supply-code(&code, \value, SupplyOneWheneverState $state, &add-whenever) {
        my &*ADD-WHENEVER := &add-whenever;
        {
            $state.active > 0 and nqp::handle(code(value),
                'EMIT', $state.run-emit(),
                'DONE', $state.run-done(),
                'CATCH', $state.run-catch(),
                'NEXT', 0);
        }(); # XXX Workaround for optimizer bug
    }

    method live(--> False) { }
    method sane(--> True) { }
    method serial(--> True) { }
}
  1986. class OneEmitTappable does Tappable {
  1987. has &!block;
  1988. submethod BUILD(:&!block! --> Nil) {}
  1989. method tap(&emit, &done, &quit, &tap) {
  1990. my $t := Tap.new;
  1991. tap($t);
  1992. try {
  1993. emit(&!block());
  1994. done();
  1995. CATCH {
  1996. default {
  1997. quit($_);
  1998. }
  1999. }
  2000. }
  2001. $t
  2002. }
  2003. method live(--> False) { }
  2004. method sane(--> True) { }
  2005. method serial(--> True) { }
  2006. }
  2007. }
  2008. sub SUPPLY(&block) {
  2009. Supply.new(Rakudo::Internals::SupplyBlockTappable.new(:&block))
  2010. }
  2011. sub WHENEVER(Supply() $supply, &block) {
  2012. my \adder = nqp::getlexdyn('&*ADD-WHENEVER');
  2013. nqp::isnull(adder)
  2014. ?? X::WheneverOutOfScope.new.throw
  2015. !! adder.($supply, &block)
  2016. }
  2017. sub REACT(&block) {
  2018. my $s := SUPPLY(&block);
  2019. my $p := Promise.new;
  2020. $s.tap(
  2021. { warn "Useless use of emit in react" },
  2022. done => { $p.keep(Nil) },
  2023. quit => { $p.break($_) });
  2024. await $p;
  2025. }
  2026. sub SUPPLY-ONE-EMIT(&block) {
  2027. Supply.new(Rakudo::Internals::OneEmitTappable.new(:&block))
  2028. }
  2029. sub SUPPLY-ONE-WHENEVER(&block) {
  2030. Supply.new(Rakudo::Internals::SupplyOneWheneverTappable.new(:&block))
  2031. }
  2032. sub REACT-ONE-WHENEVER(&block) {
  2033. my $s := SUPPLY-ONE-WHENEVER(&block);
  2034. my $p := Promise.new;
  2035. $s.tap(
  2036. { warn "Useless use of emit in react" },
  2037. done => { $p.keep(Nil) },
  2038. quit => { $p.break($_) });
  2039. await $p;
  2040. }