1. # Implementations shared between HyperSeq and RaceSeq.
  2. class Rakudo::Internals::HyperRaceSharedImpl {
  # Work stage that keeps only those items of a batch that a matcher accepts.
  my class Grep does Rakudo::Internals::HyperProcessor {
      has $!matcher;

      # Stores the :matcher named argument in the private attribute.
      submethod TWEAK(:$!matcher) {}

      # Runs every item of $batch past the matcher and replaces the batch's
      # contents with the items that passed. Returns whatever
      # $batch.replace-with returns.
      method process-batch(Rakudo::Internals::HyperWorkBatch $batch) {
          my $kept  := IterationBuffer.new;
          my $input := $batch.items;
          my int $total = $input.elems;
          my int $idx   = -1;

          # A Callable matcher is cloned for each batch (presumably so that
          # batches handled concurrently do not share one code object);
          # anything else is used as is.
          my \predicate := nqp::istype($!matcher, Callable)
              ?? $!matcher.clone
              !! $!matcher;

          if nqp::istype(predicate, Regex) || !nqp::istype(predicate, Callable) {
              # Regexes and non-Callables decide through ACCEPTS.
              while ++$idx < $total {
                  my \candidate := nqp::atpos($input, $idx);
                  $kept.push(candidate) if predicate.ACCEPTS(candidate);
              }
          }
          else {
              # Every other Callable is invoked directly with the item.
              while ++$idx < $total {
                  my \candidate := nqp::atpos($input, $idx);
                  $kept.push(candidate) if predicate.(candidate);
              }
          }

          $batch.replace-with($kept);
      }
  }
  # Builds the grep stage for a hyper/race sequence.
  #
  #   hyper    - the HyperSeq/RaceSeq being extended
  #   $source  - the stage feeding the new Grep stage
  #   matcher  - what each item is tested against
  #   %options - named options given to grep
  #
  # Returns a new object blessed from `hyper` with a Grep work stage at its
  # head, or a re-hypered sequential grep for the unsupported cases.
  # Throws X::NYI when the matcher is a Block carrying phasers.
  multi method grep(\hyper, $source, \matcher, %options) {
      # Fall back to sequential grep for cases we can't yet handle:
      # any options at all, or a Code matcher taking more than one argument.
      return self.rehyper(hyper, hyper.Any::grep(matcher, |%options))
          if %options || nqp::istype(matcher, Code) && matcher.count > 1;

      X::NYI.new(feature => 'Phasers in hyper/race').throw
          if nqp::istype(matcher, Block) && matcher.has-phasers;

      hyper.bless(
          configuration   => hyper.configuration,
          work-stage-head => Grep.new(:$source, :matcher(matcher)),
      )
  }
  # Work stage that replaces every item of a batch with the result of a
  # mapper; non-containerized Slip results are flattened into the output.
  my class Map does Rakudo::Internals::HyperProcessor {
      has &!mapper;

      # Stores the :mapper named argument in the private attribute.
      submethod TWEAK(:&!mapper) {}

      # Applies the mapper to each item of $batch and replaces the batch's
      # contents with the collected results. Returns whatever
      # $batch.replace-with returns.
      method process-batch(Rakudo::Internals::HyperWorkBatch $batch) {
          my $output := IterationBuffer.new;
          my $input  := $batch.items;
          my int $total = $input.elems;
          my int $idx   = -1;

          # Work on a clone of the mapper for this batch.
          my &transform := &!mapper.clone;

          while ++$idx < $total {
              my \produced = transform(nqp::atpos($input, $idx));
              if nqp::istype(produced, Slip) && !nqp::iscont(produced) {
                  # A bare Slip contributes each of its elements.
                  produced.iterator.push-all($output);
              }
              else {
                  $output.push(produced);
              }
          }

          $batch.replace-with($output);
      }
  }
  # Builds the map stage for a hyper/race sequence.
  #
  #   hyper    - the HyperSeq/RaceSeq being extended
  #   $source  - the stage feeding the new Map stage
  #   &mapper  - the code applied to each item
  #   %options - named options given to map
  #
  # Returns a new object blessed from `hyper` with a Map work stage at its
  # head, or a re-hypered sequential map for the unsupported cases.
  # Throws X::NYI when the mapper is a Block carrying phasers; that check
  # happens before anything else.
  multi method map(\hyper, $source, &mapper, %options) {
      if nqp::istype(&mapper, Block) && &mapper.has-phasers {
          X::NYI.new(feature => 'Phasers in hyper/race').throw;
      }

      # Fall back to sequential map for cases we can't yet handle:
      # any options at all, or a mapper taking more than one argument.
      %options || &mapper.count > 1
          ?? self.rehyper(hyper, hyper.Any::map(&mapper, |%options))
          !! hyper.bless(
                 configuration   => hyper.configuration,
                 work-stage-head => Map.new(:$source, :&mapper),
             )
  }
  # Joiner that discards batch contents and only tracks completion.
  my class Sink does Rakudo::Internals::HyperJoiner {
      # Kept with True once every batch has been seen; broken on error.
      has Promise $.complete = Promise.new;
      # Sequence number of the batch flagged as last; -1 until it shows up.
      has int $!last-target  = -1;
      # How many batches have been consumed so far.
      has int $!batches-seen = 0;

      # Counts the batch, reports it as used, and keeps the completion
      # promise once the number of batches seen equals the last batch's
      # sequence number plus one (the last batch may arrive before others).
      method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) {
          ++$!batches-seen;
          self.batch-used();

          $!last-target = $batch.sequence-number if $batch.last;

          $!complete.keep(True)
              if $!last-target >= 0 && $!batches-seen == $!last-target + 1;
      }

      # Breaks the completion promise with the given exception.
      method consume-error(Exception $e --> Nil) {
          $!complete.break($e);
      }
  }
  # Runs the whole pipeline for its side effects and waits until it is done.
  #
  #   hyper   - the HyperSeq/RaceSeq being sunk; nothing happens when it is
  #             a type object
  #   $source - the stage feeding the Sink joiner
  #
  # Any exception that does not already do X::HyperRace::Died is rethrown
  # with that role mixed in.
  method sink(\hyper, $source --> Nil) {
      if hyper.DEFINITE {
          my $joiner = Sink.new(:$source);
          Rakudo::Internals::HyperPipeline.start($joiner, hyper.configuration);
          $*AWAITER.await($joiner.complete);

          # NOTE(review): the block nesting here is kept as is on purpose;
          # the fixed Backtrace offset of 5 presumably depends on it.
          CATCH {
              unless nqp::istype($_, X::HyperRace::Died) {
                  ($_ but X::HyperRace::Died(Backtrace.new(5))).rethrow
              }
          }
      }
  }
  # Turns a sequential result back into the same kind of parallel sequence
  # as `hyper`, reusing the degree and batch size of its configuration.
  proto method rehyper($, $) {*}

  # HyperSeq in, so call .hyper on the sequential result.
  multi method rehyper(HyperSeq \hyper, \seq) {
      my $settings := hyper.configuration;
      seq.hyper(degree => $settings.degree, batch => $settings.batch)
  }

  # RaceSeq in, so call .race on the sequential result.
  multi method rehyper(RaceSeq \hyper, \seq) {
      my $settings := hyper.configuration;
      seq.race(degree => $settings.degree, batch => $settings.batch)
  }
  110. }