1. my class Backtrace { ... }
  2. my role X::HyperRace::Died {
  3. has $.start-backtrace;
  4. multi method gist(::?CLASS:D:) {
  5. "A worker in a parallel iteration (hyper or race) initiated here:\n" ~
  6. ((try $!start-backtrace ~ "\n") // '<unknown location>') ~
  7. "Died at:\n" ~
  8. callsame().indent(4)
  9. }
  10. }
  11. my class Rakudo::Internals::HyperToIterator does Rakudo::Internals::HyperJoiner does Iterator {
  12. has int $!seen-last;
  13. has int $!offset;
  14. has $!batches;
  15. has $!waiting;
  16. has $!current-items;
  17. my constant EMPTY_BUFFER = nqp::create(IterationBuffer);
  18. submethod TWEAK() {
  19. $!batches := Channel.new;
  20. $!waiting := nqp::list;
  21. $!current-items := EMPTY_BUFFER;
  22. }
  23. method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
  24. nqp::stmts(
  25. nqp::bindpos( # store the batch at its place
  26. $!waiting,
  27. nqp::sub_i($batch.sequence-number,$!offset),
  28. $batch
  29. ),
  30. nqp::until( # feed valid batches in order
  31. nqp::isnull(nqp::atpos($!waiting,0)),
  32. nqp::stmts(
  33. $!batches.send(nqp::shift($!waiting)),
  34. ($!offset = nqp::add_i($!offset,1))
  35. )
  36. ),
  37. nqp::if( # set flag we've seen last one
  38. $batch.last,
  39. ($!seen-last = 1)
  40. ),
  41. nqp::if( # close channel if we're done
  42. $!seen-last && nqp::not_i(nqp::elems($!waiting)),
  43. $!batches.close
  44. )
  45. )
  46. }
  47. method consume-error(Exception $e --> Nil) {
  48. $!batches.fail($e);
  49. }
  50. method pull-one() is raw {
  51. until nqp::elems($!current-items) { # handles empty batches
  52. $!current-items := $!batches.receive.items;
  53. self.batch-used();
  54. CATCH {
  55. when X::Channel::ReceiveOnClosed {
  56. return IterationEnd;
  57. }
  58. default {
  59. ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
  60. unless nqp::istype($_, X::HyperRace::Died);
  61. }
  62. }
  63. }
  64. nqp::shift($!current-items)
  65. }
  66. method skip-at-least(int $skipping) {
  67. my int $toskip = $skipping;
  68. loop {
  69. if nqp::isge_i(nqp::elems($!current-items),$toskip) {
  70. nqp::splice($!current-items,EMPTY_BUFFER,0,$toskip);
  71. return 1;
  72. }
  73. $toskip = nqp::sub_i($toskip,nqp::elems($!current-items));
  74. $!current-items := $!batches.receive.items;
  75. self.batch-used();
  76. CATCH {
  77. when X::Channel::ReceiveOnClosed {
  78. return 0;
  79. }
  80. default {
  81. ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
  82. unless nqp::istype($_, X::HyperRace::Died);
  83. }
  84. }
  85. }
  86. }
  87. method push-exactly($target, int $pushing) {
  88. my int $topush = $pushing;
  89. my int $pushed = 0;
  90. loop {
  91. if nqp::isge_i(nqp::elems($!current-items),$topush) {
  92. $target.append(
  93. nqp::clone(nqp::setelems($!current-items,$topush))
  94. );
  95. nqp::splice($!current-items,EMPTY_BUFFER,0,$topush);
  96. return $pushed + $topush;
  97. }
  98. $target.append($!current-items);
  99. $pushed = $pushed + nqp::elems($!current-items);
  100. $!current-items := $!batches.receive.items;
  101. self.batch-used();
  102. CATCH {
  103. when X::Channel::ReceiveOnClosed {
  104. return IterationEnd;
  105. }
  106. default {
  107. ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
  108. unless nqp::istype($_, X::HyperRace::Died);
  109. }
  110. }
  111. }
  112. }
  113. }