# Interface for objects that know how to wait on Awaitables. Both methods
# are stubbed with `...`, so a class composing this role has to supply its
# own implementation of each.
my role Awaiter {

    # Wait on one defined Awaitable.
    method await(Awaitable:D $a) { ... }

    # Wait on every element of a defined Iterable.
    method await-all(Iterable:D $i) { ... }
}
  # Awaiter implementation that blocks the caller until the awaited results
  # are available: a Semaphore is used for a single Awaitable, a Lock plus
  # condition variable for a group of them.
  my class Awaiter::Blocking does Awaiter {

      # Wait for a single Awaitable.
      #
      # Evaluates to the handle's result on success; on failure the cause
      # (or the failure value delivered to the subscriber) is rethrown.
      method await(Awaitable:D $a) {
          my $handle := $a.get-await-handle;
          if !$handle.already {
              # Nothing available yet: ask the handle to call us back and
              # sleep on a semaphore until that callback releases it.
              my $done = Semaphore.new(0);
              my $ok;
              my $outcome;
              $handle.subscribe-awaiter(-> \success, \result {
                  $ok      := success;
                  $outcome := result;
                  $done.release;
              });
              $done.acquire;
              $ok
                  ?? $outcome
                  !! $outcome.rethrow
          }
          else {
              # The outcome is already known, so no blocking is required.
              $handle.success
                  ?? $handle.result
                  !! $handle.cause.rethrow
          }
      }

      # Wait for every Awaitable produced by iterating `i`.
      #
      # Evaluates to a List holding the results in iteration order; when any
      # result is a Slip the list is passed through `map` and re-listed so
      # the Slip flattens. Dies if an element is not an Awaitable or is an
      # undefined one, and rethrows the first failure that is observed.
      method await-all(Iterable:D \i) {
          # First pass: store results that are already available, and note
          # each unfinished handle along with the position its result must
          # eventually be written to.
          my \collected = nqp::list();
          my \pending   = nqp::list();
          my \slots     = nqp::list_i();
          my int $pos   = 0;
          my $has-slip  = False;

          for i -> $awaitable {
              die "Can only specify Awaitable objects to await (got a $awaitable.^name())"
                  unless nqp::istype($awaitable, Awaitable);
              die "Must specify a defined Awaitable to await (got an undefined $awaitable.^name())"
                  unless nqp::isconcrete($awaitable);

              my $handle := $awaitable.get-await-handle;
              if !$handle.already {
                  nqp::push(pending, $handle);
                  nqp::push_i(slots, $pos);
              }
              elsif $handle.success {
                  my \got = $handle.result;
                  nqp::bindpos(collected, $pos, got);
                  if nqp::istype(got, Slip) {
                      $has-slip = True;
                  }
              }
              else {
                  $handle.cause.rethrow;
              }
              ++$pos;
          }

          # Second pass: only needed if something was not ready. The lock
          # guards every write made by the subscriber callbacks; the
          # condition variable wakes us once nothing is outstanding.
          my int $num-pending = nqp::elems(pending);
          if $num-pending {
              my $failure = Mu;
              my $lock = Lock.new;
              my $all-done = $lock.condition();
              my int $outstanding = $num-pending;

              my int $i = -1;
              while ++$i < $num-pending {
                  my $handle := nqp::atpos(pending, $i);
                  my int $slot = nqp::atpos_i(slots, $i);
                  $handle.subscribe-awaiter(-> \success, \result {
                      $lock.protect({
                          if success && $outstanding {
                              nqp::bindpos(collected, $slot, result);
                              if nqp::istype(result, Slip) {
                                  $has-slip = True;
                              }
                              # Signal once the last outstanding result lands.
                              $all-done.signal if --$outstanding == 0;
                          }
                          elsif !nqp::isconcrete($failure) {
                              # Keep only the first failure and stop waiting
                              # for the remaining handles.
                              $failure := result;
                              $outstanding = 0;
                              $all-done.signal;
                          }
                      });
                  });
              }

              # Sleep until nothing is outstanding; the re-check on every
              # pass copes with spurious wakeups.
              loop {
                  $lock.protect({
                      last unless $outstanding;
                      $all-done.wait;
                  });
              }

              # A recorded failure takes precedence over returning results.
              if nqp::isconcrete($failure) {
                  $failure.rethrow;
              }
          }

          # Wrap the low-level result list in a List without copying it.
          my \reified = nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', collected);
          $has-slip
              ?? reified.map(-> \entry { entry }).List
              !! reified
      }
  }
  # Bind the Awaiter::Blocking type object to the $AWAITER symbol of the
  # PROCESS package.
  # NOTE(review): presumably this makes it the default awaiter found via
  # $*AWAITER lookups — confirm against the callers of await.
  PROCESS::<$AWAITER> := Awaiter::Blocking;