1. # A promise is a synchronization mechanism for a piece of work that will
  2. # produce a single result (keeping the promise) or fail (breaking the
  3. # promise).
  4. my enum PromiseStatus (:Planned(0), :Kept(1), :Broken(2));
  5. my class X::Promise::Combinator is Exception {
  6. has $.combinator;
  7. method message() { "Can only use $!combinator to combine defined Promise objects" }
  8. }
  9. my class X::Promise::CauseOnlyValidOnBroken is Exception {
  10. has $.promise;
  11. has $.status;
  12. method message() { "Can only call cause on a broken promise (status: $.status)" }
  13. }
  14. my class X::Promise::Vowed is Exception {
  15. has $.promise;
  16. method message() { "Access denied to keep/break this Promise; already vowed" }
  17. }
  18. my role X::Promise::Broken {
  19. has $.result-backtrace;
  20. multi method gist(::?CLASS:D:) {
  21. "Tried to get the result of a broken Promise\n" ~
  22. ((try $!result-backtrace ~ "\n") // '') ~
  23. "Original exception:\n" ~
  24. callsame().indent(4)
  25. }
  26. }
  27. my class Promise does Awaitable {
  28. has $.scheduler;
  29. has $.status;
  30. has $!result is default(Nil);
  31. has int $!vow_taken;
  32. has $!lock;
  33. has $!cond;
  34. has $!thens;
  35. has Mu $!dynamic_context;
  36. submethod new(:$scheduler = $*SCHEDULER) {
  37. my \p = nqp::create(self);
  38. p.BUILD(:$scheduler);
  39. p
  40. }
  41. submethod BUILD(:$scheduler = $*SCHEDULER --> Nil) {
  42. $!scheduler := $scheduler;
  43. $!lock := nqp::create(Lock);
  44. $!cond := $!lock.condition();
  45. $!status := Planned;
  46. $!thens := nqp::list();
  47. }
  48. # A Vow is used to enable the right to keep/break a promise
  49. # to be restricted to a given "owner". Taking the Vow for a Promise
  50. # prevents anybody else from getting hold of it.
  51. my class Vow { ... }
  52. trusts Vow;
  53. my class Vow {
  54. has $.promise;
  55. method keep(Mu \result) {
  56. $!promise!Promise::keep(result)
  57. }
  58. method break(\exception) {
  59. $!promise!Promise::break(exception)
  60. }
  61. }
  62. method vow() {
  63. nqp::lock($!lock);
  64. if $!vow_taken {
  65. nqp::unlock($!lock);
  66. X::Promise::Vowed.new(promise => self).throw
  67. }
  68. my $vow := nqp::create(Vow);
  69. nqp::bindattr($vow, Vow, '$!promise', self);
  70. $!vow_taken = 1;
  71. nqp::unlock($!lock);
  72. $vow
  73. }
  74. proto method kept(|) {*}
  75. multi method kept(Promise:U:) {
  76. my \rv := self.new;
  77. rv!keep(True);
  78. rv;
  79. }
  80. multi method kept(Promise:U: Mu \result) {
  81. my \rv := self.new;
  82. rv!keep(result);
  83. rv;
  84. }
  85. proto method keep(|) {*}
  86. multi method keep(Promise:D:) {
  87. self.vow.keep(True)
  88. }
  89. multi method keep(Promise:D: Mu \result) {
  90. self.vow.keep(result)
  91. }
  92. method !keep(Mu \result --> Nil) {
  93. $!lock.protect({
  94. $!result := result;
  95. $!status := Kept;
  96. self!schedule_thens();
  97. $!cond.signal_all;
  98. });
  99. }
  100. proto method broken(|) {*}
  101. multi method broken(Promise:U:) {
  102. my \rv := self.new;
  103. rv!break("Died");
  104. rv;
  105. }
  106. multi method broken(Promise:U: Mu \exception) {
  107. my \rv := self.new;
  108. rv!break(exception);
  109. rv;
  110. }
  111. proto method break(|) {*}
  112. multi method break(Promise:D:) {
  113. self.vow.break("Died")
  114. }
  115. multi method break(Promise:D: \result) {
  116. self.vow.break(result)
  117. }
  118. method !break(\result --> Nil) {
  119. $!lock.protect({
  120. $!result := nqp::istype(result, Exception)
  121. ?? result
  122. !! X::AdHoc.new(payload => result);
  123. $!status := Broken;
  124. self!schedule_thens();
  125. $!cond.signal_all;
  126. });
  127. }
  128. method !schedule_thens(--> Nil) {
  129. while nqp::elems($!thens) {
  130. $!scheduler.cue(nqp::shift($!thens), :catch(nqp::shift($!thens)))
  131. }
  132. }
  133. method result(Promise:D:) {
  134. # One important missing optimization here is that if the promise is
  135. # not yet started, then the work can be done immediately by the
  136. # thing that is blocking on it. (Note the while loop is there to cope
  137. # with spurious wake-ups).
  138. while $!status == Planned {
  139. $!lock.protect({
  140. # Re-check planned to avoid data race.
  141. $!cond.wait() if $!status == Planned;
  142. });
  143. }
  144. if $!status == Kept {
  145. $!result
  146. }
  147. elsif $!status == Broken {
  148. ($!result but X::Promise::Broken(Backtrace.new)).rethrow
  149. }
  150. }
  151. multi method Bool(Promise:D:) {
  152. so $!status == Broken || $!status == Kept
  153. }
  154. method cause(Promise:D:) {
  155. my $status := $!status;
  156. if $status == Broken {
  157. $!result
  158. } else {
  159. X::Promise::CauseOnlyValidOnBroken.new(
  160. promise => self,
  161. status => $status,
  162. ).throw
  163. }
  164. }
  165. method then(Promise:D: &code) {
  166. nqp::lock($!lock);
  167. if $!status == Broken || $!status == Kept {
  168. # Already have the result, start immediately.
  169. nqp::unlock($!lock);
  170. self.WHAT.start( { code(self) }, :$!scheduler);
  171. }
  172. else {
  173. # Create a Promise, and push 2 entries to $!thens: something that
  174. # starts the then code, and something that handles its exceptions.
  175. # They will be sent to the scheduler when this promise is kept or
  176. # broken.
  177. my $then-p := self.new(:$!scheduler);
  178. nqp::bindattr($then-p, Promise, '$!dynamic_context', nqp::ctx());
  179. my $vow := $then-p.vow;
  180. nqp::push($!thens, { my $*PROMISE := $then-p; $vow.keep(code(self)) });
  181. nqp::push($!thens, -> $ex { $vow.break($ex) });
  182. nqp::unlock($!lock);
  183. $then-p
  184. }
  185. }
  186. my class PromiseAwaitableHandle does Awaitable::Handle {
  187. has &!add-subscriber;
  188. method not-ready(&add-subscriber) {
  189. nqp::create(self)!not-ready(&add-subscriber)
  190. }
  191. method !not-ready(&add-subscriber) {
  192. $!already = False;
  193. &!add-subscriber := &add-subscriber;
  194. self
  195. }
  196. method subscribe-awaiter(&subscriber --> Nil) {
  197. &!add-subscriber(&subscriber);
  198. }
  199. }
  200. method get-await-handle(--> Awaitable::Handle:D) {
  201. if $!status == Broken {
  202. PromiseAwaitableHandle.already-failure($!result)
  203. }
  204. elsif $!status == Kept {
  205. PromiseAwaitableHandle.already-success($!result)
  206. }
  207. else {
  208. PromiseAwaitableHandle.not-ready: -> &on-ready {
  209. nqp::lock($!lock);
  210. if $!status == Broken || $!status == Kept {
  211. # Already have the result, call on-ready immediately.
  212. nqp::unlock($!lock);
  213. on-ready($!status == Kept, $!result)
  214. }
  215. else {
  216. # Push 2 entries to $!thens (only need the first one in
  217. # this case; second we push 'cus .then uses it).
  218. nqp::push($!thens, { on-ready($!status == Kept, $!result) });
  219. nqp::push($!thens, Callable);
  220. nqp::unlock($!lock);
  221. }
  222. }
  223. }
  224. }
  225. method start(Promise:U: &code, :&catch, :$scheduler = $*SCHEDULER, |c) {
  226. my $p := self.new(:$scheduler);
  227. nqp::bindattr($p, Promise, '$!dynamic_context', nqp::ctx());
  228. my $vow := $p.vow;
  229. $scheduler.cue(
  230. { my $*PROMISE := $p; $vow.keep(code(|c)) },
  231. :catch(-> $ex { catch($ex) if &catch; $vow.break($ex); }) );
  232. $p
  233. }
  234. method in(Promise:U: $seconds, :$scheduler = $*SCHEDULER) {
  235. my $p := self.new(:$scheduler);
  236. my $vow := $p.vow;
  237. $scheduler.cue({ $vow.keep(True) }, :in($seconds));
  238. $p
  239. }
  240. method at(Promise:U: $at, :$scheduler = $*SCHEDULER) {
  241. self.in( $at - now, :$scheduler )
  242. }
  243. method anyof(Promise:U: *@p) { self!until_n_kept(@p, 1, 'anyof') }
  244. method allof(Promise:U: *@p) { self!until_n_kept(@p, +@p, 'allof') }
  245. method !until_n_kept(@promises, Int:D $N, Str $combinator) {
  246. my $p := self.new;
  247. unless @promises {
  248. $p.keep;
  249. return $p
  250. }
  251. X::Promise::Combinator.new(:$combinator).throw
  252. unless Rakudo::Internals.ALL_DEFINED_TYPE(@promises, Promise);
  253. my int $n = $N;
  254. my int $c = $n;
  255. my $lock := nqp::create(Lock);
  256. my $vow := $p.vow;
  257. for @promises -> $cand {
  258. $cand.then({
  259. if $lock.protect({ $c = $c - 1 }) == 0 {
  260. $vow.keep(True)
  261. }
  262. })
  263. }
  264. $p
  265. }
  266. multi method Supply(Promise:D:) {
  267. Supply.on-demand: -> $s {
  268. self.then({
  269. if self.status == Kept {
  270. $s.emit(self.result);
  271. $s.done();
  272. }
  273. else {
  274. $s.quit(self.cause);
  275. }
  276. });
  277. }
  278. }
  279. }
  280. multi sub infix:<eqv>(Promise:D \a, Promise:D \b) {
  281. nqp::p6bool(
  282. nqp::eqaddr(a,b) || a.result eqv b.result
  283. )
  284. }