1. # Takes a linked list of pipeline stages and assembles them into a pipeline.
  2. # Given a pipeline must end with a HyperJoiner, it expects to be passed
  3. # something of this type.
  4. my class Rakudo::Internals::HyperPipeline {
  5. method start(Rakudo::Internals::HyperJoiner $stage, HyperConfiguration $config) {
  6. # Create channel that the last non-join operation in the pipeline will
  7. # put its results into, and start a worker to handle the channel.
  8. my $cur-dest-channel = Channel.new;
  9. self!join-worker($stage, $cur-dest-channel);
  10. # Create a channel that will signal we're ready for more batches,
  11. # and set join stage to send on it when batch-used is called.
  12. my $ready-channel = Channel.new;
  13. $stage.SET-BATCH-USED-CHANNEL($ready-channel);
  14. # Go through the rest of the stages.
  15. my $cur-stage = $stage.source;
  16. my @processors;
  17. while $cur-stage {
  18. my $next-stage = $cur-stage.source;
  19. given $cur-stage {
  20. when Rakudo::Internals::HyperProcessor {
  21. # Unshift them so a sequence will be in application order.
  22. unshift @processors, $_;
  23. }
  24. when Rakudo::Internals::HyperBatcher {
  25. if $next-stage {
  26. die "A HyperBatcher may only be at the pipeline start";
  27. }
  28. $cur-dest-channel = self!maybe-processor-workers:
  29. [@processors], $cur-dest-channel, $config.degree;
  30. @processors = ();
  31. self!batch-worker($cur-stage, $cur-dest-channel, $ready-channel,
  32. $config.batch);
  33. }
  34. default {
  35. die "Unrecognized hyper pipeline stage " ~ .^name();
  36. }
  37. }
  38. $cur-stage = $next-stage;
  39. }
  40. # Set off $degree batches.
  41. $ready-channel.send(True) for ^$config.degree;
  42. }
  43. method !batch-worker(Rakudo::Internals::HyperBatcher $stage, Channel $dest-channel,
  44. Channel $ready-channel, int $size) {
  45. start {
  46. my $AWAITER := $*AWAITER;
  47. loop {
  48. $AWAITER.await($ready-channel);
  49. my $batch := $stage.produce-batch($size);
  50. $dest-channel.send($batch);
  51. last if $batch.last;
  52. CATCH {
  53. default {
  54. $dest-channel.fail($_);
  55. }
  56. }
  57. }
  58. }
  59. }
  60. method !maybe-processor-workers(@processors, Channel $dest-channel, Int:D $degree) {
  61. return $dest-channel unless @processors;
  62. my $source-channel := Channel.new;
  63. for ^$degree {
  64. start {
  65. my $AWAITER := $*AWAITER;
  66. loop {
  67. my $batch := $AWAITER.await($source-channel);
  68. for @processors {
  69. .process-batch($batch);
  70. }
  71. $dest-channel.send($batch);
  72. }
  73. CATCH {
  74. when X::Channel::ReceiveOnClosed {
  75. $dest-channel.close;
  76. }
  77. default {
  78. $dest-channel.fail($_);
  79. }
  80. }
  81. }
  82. }
  83. return $source-channel;
  84. }
  85. method !join-worker(Rakudo::Internals::HyperJoiner $stage, Channel $source) {
  86. start {
  87. my $AWAITER := $*AWAITER;
  88. loop {
  89. $stage.consume-batch($AWAITER.await($source));
  90. }
  91. CATCH {
  92. when X::Channel::ReceiveOnClosed {
  93. # We got everything; quietly exit the start block.
  94. }
  95. default {
  96. $stage.consume-error($_);
  97. CATCH {
  98. default {
  99. # Error handling code blew up; let the scheduler's
  100. # error handler do it, which will typically bring
  101. # the program down. Should never get here unless
  102. # we've some bug in a joiner implementation.
  103. $*SCHEDULER.handle_uncaught($_);
  104. }
  105. }
  106. }
  107. }
  108. }
  109. }
  110. }