# An asynchronous lock provides a non-blocking, non-reentrant mechanism for
# mutual exclusion. The lock method returns a Promise, which is already Kept
# if nothing was holding the lock, so execution can proceed immediately. For
# performance reasons, in this case it returns a singleton Promise instance.
# Otherwise, a Promise in Planned state is returned, and it is Kept once the
# lock has been unlocked by its current holder. The lock and unlock do not
# need to take place on the same thread; that's why the lock is not
# reentrant.
  # Exception raised by Lock::Async.unlock when the lock is not held by anyone.
  my class X::Lock::Async::NotLocked is Exception {
      # Human-readable description of the failure.
      method message() {
          'Cannot unlock a Lock::Async that is not currently locked'
      }
  }
  # A lock whose acquisition is signalled through a Promise. The whole lock
  # state lives in a single attribute that is only ever read atomically and
  # replaced with compare-and-swap, so no update is made in place.
  my class Lock::Async {
      # Immutable description of the lock's state:
      #   * the Holder type object      - nobody holds the lock;
      #   * an instance without a queue - held, nobody is waiting;
      #   * an instance with a queue    - held, and the queue contains the vows
      #                                   of the waiters, oldest first.
      # Every "modifying" method returns a fresh Holder built from a copy of
      # the queue; the receiver is left untouched.
      my class Holder {
          has $!queue;

          # Returns a new Holder whose queue is this one's queue (or an empty
          # one if there was none) with the vow `v` appended.
          method queue-vow(\v) {
              my $extended := $!queue.DEFINITE
                  ?? nqp::clone($!queue)
                  !! nqp::list();
              nqp::push($extended, v);
              nqp::p6bindattrinvres(
                  nqp::create(Holder), Holder, '$!queue', $extended)
          }

          # Number of vows currently waiting in the queue.
          method waiter-queue-length() {
              nqp::elems($!queue)
          }

          # The vow at the front of the queue. Must not be called on a Holder
          # that has no queue (unlock() handles SINGLE_HOLDER separately).
          method head-vow() {
              nqp::atpos($!queue, 0)
          }

          # Returns a new Holder with the front vow removed. Must not be
          # called when the queue has exactly one item: that transition goes
          # to SINGLE_HOLDER instead, to maintain the SINGLE_HOLDER fast path.
          method without-head-vow() {
              my $remaining := nqp::clone($!queue);
              nqp::shift($remaining);
              nqp::p6bindattrinvres(
                  nqp::create(Holder), Holder, '$!queue', $remaining)
          }
      }

      # The two queue-less states, shared by every lock instance.
      my constant NO_HOLDER     = Holder;
      my constant SINGLE_HOLDER = nqp::create(Holder);

      # Current state of this lock (holder record plus waiter queue).
      has Holder $!holder = NO_HOLDER;

      # Already-kept Promise handed out whenever lock() succeeds immediately,
      # so the uncontended path allocates nothing.
      my \KEPT-PROMISE := do {
          my $ready := Promise.new;
          $ready.keep(True);
          $ready
      }

      # Requests the lock. Returns KEPT-PROMISE if the lock was free and is
      # now ours; otherwise returns a new Promise whose vow has been queued
      # and which unlock() keeps when our turn comes. Loops until one of the
      # compare-and-swap attempts succeeds.
      method lock(Lock::Async:D: --> Promise) {
          loop {
              my $seen := ⚛$!holder;
              if $seen.DEFINITE {
                  # Held by someone: try to install a state with our vow
                  # appended to the waiter queue.
                  my $pending := Promise.new;
                  my $queued  := $seen.queue-vow($pending.vow);
                  return $pending if cas($!holder, $seen, $queued) =:= $seen;
              }
              elsif cas($!holder, NO_HOLDER, SINGLE_HOLDER) =:= NO_HOLDER {
                  # Successfully acquired and we're the only holder.
                  return KEPT-PROMISE;
              }
          }
      }

      # Releases the lock. If waiters are queued, ownership passes to the
      # oldest one by keeping its vow. Dies with X::Lock::Async::NotLocked if
      # the lock is not held. Loops until a compare-and-swap succeeds.
      method unlock(Lock::Async:D: --> Nil) {
          loop {
              my $seen := ⚛$!holder;
              die X::Lock::Async::NotLocked.new unless $seen.DEFINITE;

              if $seen =:= SINGLE_HOLDER {
                  # Held with no wait queue: release to the NO_HOLDER state.
                  return if cas($!holder, SINGLE_HOLDER, NO_HOLDER) =:= SINGLE_HOLDER;
              }
              else {
                  # There are waiters. The state after release is either
                  # SINGLE_HOLDER (the last waiter takes over) or the same
                  # queue minus its head.
                  my $next-vow  := $seen.head-vow;
                  my $successor := $seen.waiter-queue-length() == 1
                      ?? SINGLE_HOLDER
                      !! $seen.without-head-vow();
                  if cas($!holder, $seen, $successor) =:= $seen {
                      # Only keep the vow once the new state is installed,
                      # thus giving the lock to the next waiter.
                      $next-vow.keep(True);
                      return;
                  }
              }
          }
      }

      # Runs `code` while holding the lock and returns its result. The lock
      # Promise is awaited through the current $*AWAITER.
      method protect(Lock::Async:D: &code) {
          my int $holding = 0;
          # Release on the way out, but only if acquisition completed; if the
          # await throws there is nothing of ours to unlock.
          LEAVE self.unlock() if $holding;
          $*AWAITER.await(self.lock());
          $holding = 1;
          code()
      }

      # This either runs the code now if we can obtain the lock, releasing the
      # lock afterwards, or queues the code to run if a recursive use of the
      # lock is observed. It relies on all users of the lock to use it through
      # this method only. This is useful for providing back-pressure while
      # also avoiding code deadlocking on itself by providing a way for it to
      # get run later on. Returns Nil if the code was run now (maybe after
      # waiting), or a Promise if it was queued for running later.
      method protect-or-queue-on-recursion(Lock::Async:D: &code) {
          my $ticket := self.lock();
          unless $ticket {
              # Lock is already held on the stack, so we're recursing. Queue
              # the code to run once the lock is handed to us.
              return $ticket.then({
                  self!run-with-updated-recursion-list(&code);
              }) if self!on-recursion-list();

              # Lock is held but by something else. Await its availability.
              $*AWAITER.await($ticket);
          }

          # We hold the lock at this point. Run the code right now.
          self!run-with-updated-recursion-list(&code);
          Nil
      }

      # True if this lock appears in the dynamically visible
      # $*LOCK-ASYNC-RECURSION-LIST, False if it does not or if no such list
      # is in dynamic scope.
      method !on-recursion-list() {
          my $held := nqp::getlexdyn('$*LOCK-ASYNC-RECURSION-LIST');
          return False if nqp::isnull($held);
          self!search-recursion-list($held)
      }

      # Linear identity search for this lock in the given recursion list.
      method !search-recursion-list(IterationBuffer \rec-list) {
          my int $count = nqp::elems(rec-list);
          my int $idx   = 0;
          while $idx < $count {
              return True if nqp::eqaddr(nqp::atpos(rec-list, $idx), self);
              ++$idx;
          }
          False
      }

      # Runs `code` with this lock added to a copy of the current recursion
      # list (or to a fresh list if there is none), and unlocks on exit. The
      # caller must already hold the lock.
      method !run-with-updated-recursion-list(&code) {
          LEAVE self.unlock();
          my $outer := nqp::getlexdyn('$*LOCK-ASYNC-RECURSION-LIST');
          my $held  := nqp::isnull($outer)
              ?? nqp::create(IterationBuffer)
              !! nqp::clone($outer);
          nqp::push($held, self);
          self!run-under-recursion-list($held, &code);
      }

      # Runs `code` with this lock removed from the recursion list seen by
      # the code. If no list is in dynamic scope, the code is simply run.
      method with-lock-hidden-from-recursion-check(&code) {
          my $outer := nqp::getlexdyn('$*LOCK-ASYNC-RECURSION-LIST');
          return code() if nqp::isnull($outer);
          self!hidden-in-recursion-list($outer, &code)
      }

      # Builds a copy of `current` that omits every entry identical to this
      # lock, then runs `code` under that filtered list.
      method !hidden-in-recursion-list(IterationBuffer \current, &code) {
          my $filtered := nqp::create(IterationBuffer);
          my int $count = nqp::elems(current);
          my int $idx   = 0;
          while $idx < $count {
              my $entry := nqp::atpos(current, $idx);
              nqp::push($filtered, $entry) unless nqp::eqaddr($entry, self);
              ++$idx;
          }
          self!run-under-recursion-list($filtered, &code);
      }

      # Binding the list to a dynamic-variable parameter is what makes it
      # visible, as $*LOCK-ASYNC-RECURSION-LIST, for the duration of code().
      method !run-under-recursion-list(IterationBuffer $*LOCK-ASYNC-RECURSION-LIST, &code) {
          code()
      }
  }