# Work stages are the individual steps in a hyper/race pipeline. They are
# chained into a linked list through the source attribute. Roles for the
# different kinds of stages follow.
  4. my role Rakudo::Internals::HyperWorkStage {
  5. has Rakudo::Internals::HyperWorkStage $.source;
  6. }
# A HyperBatcher stage produces batches of work to do. It will typically be
# created with an Iterable of some kind, and divides the work up into batches
# of the appropriate size. Such a stage always lives at the start of a piece
# of a parallel processing pipeline.
  11. my role Rakudo::Internals::HyperBatcher does Rakudo::Internals::HyperWorkStage {
  12. method produce-batch(int $batch-size --> Rakudo::Internals::HyperWorkBatch) { ... }
  13. }
# A HyperProcessor performs some operation on a work batch, updating it to
# reflect the results of the operation.
  16. my role Rakudo::Internals::HyperProcessor does Rakudo::Internals::HyperWorkStage {
  17. method process-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
  18. }
# A HyperRebatcher is given batches, and may produce zero or more batches as a
# result. The produced batches will be passed on to the next pipeline stages.
# This is intended only for steps that need to look across multiple batches,
# but that work in a "streaming" way rather than being a full bottleneck in
# the pipeline. Overall, a HyperRebatcher should produce one output batch for
# each input batch it gets, though individual calls need not balance (it may
# produce no batches on one call and two on the next, for example).
  26. my role Rakudo::Internals::HyperRebatcher does Rakudo::Internals::HyperWorkStage {
  27. method rebatch(Rakudo::Internals::HyperWorkBatch $batch --> List) { ... }
  28. }
# A HyperJoiner comes at the end of a pipeline, or at the end of a stage in a
# multi-stage pipeline (that is, one with a step in it where all results are
# needed). The batch-used method should be called whenever a batch passed to
# consume-batch has been used. This allows for backpressure control: a
# sequential iterator at the end of a parallel pipeline can choose to call
# batch-used only at the point when the downstream iterator has actually
# eaten all the values in a batch.
  35. my role Rakudo::Internals::HyperJoiner does Rakudo::Internals::HyperWorkStage {
  36. has $!batch-used-channel;
  37. method consume-batch(Rakudo::Internals::HyperWorkBatch $batch --> Nil) { ... }
  38. method consume-error(Exception \e) { ... }
  39. method batch-used(--> Nil) {
  40. $!batch-used-channel.send(True);
  41. }
  42. method SET-BATCH-USED-CHANNEL($!batch-used-channel) {}
  43. }