1. my class Rakudo::Internals::RaceToIterator does Rakudo::Internals::HyperJoiner does Iterator {
  2. has Channel $.batches .= new;
  3. has int $!last-target = -1;
  4. has int $!batches-seen = 0;
  5. method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
  6. $!batches.send($batch);
  7. ++$!batches-seen;
  8. if $batch.last {
  9. $!last-target = $batch.sequence-number;
  10. }
  11. if $!last-target >= 0 && $!batches-seen == $!last-target + 1 {
  12. $!batches.close;
  13. }
  14. }
  15. method consume-error(Exception $e --> Nil) {
  16. $!batches.fail($e);
  17. }
  18. my constant EMPTY_BUFFER = IterationBuffer.CREATE;
  19. has IterationBuffer $!current-items = EMPTY_BUFFER;
  20. method pull-one() {
  21. until nqp::elems(nqp::decont($!current-items)) { # Handles empty batches
  22. my $batch = $!batches.receive;
  23. self.batch-used();
  24. $!current-items = $batch.items;
  25. CATCH {
  26. when X::Channel::ReceiveOnClosed {
  27. return IterationEnd;
  28. }
  29. default {
  30. unless nqp::istype($_, X::HyperRace::Died) {
  31. ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
  32. }
  33. }
  34. }
  35. }
  36. nqp::shift(nqp::decont($!current-items))
  37. }
  38. }