1. # A channel provides a thread-safe way to send a series of values from some
  2. # producer(s) to some consumer(s).
  # Exception thrown when a value is sent on a channel that has already been
  # closed to senders.
  my class X::Channel::SendOnClosed is Exception {
      # The channel the failed send was attempted on.
      has $.channel;

      # Human-readable description of the error.
      method message() {
          'Cannot send a message on a closed channel'
      }
  }
  # Exception thrown when a receive hits the close marker of a channel, i.e.
  # the channel was closed and holds no further values.
  my class X::Channel::ReceiveOnClosed is Exception {
      # The channel the failed receive was attempted on.
      has $.channel;

      # Human-readable description of the error.
      method message() {
          'Cannot receive a message on a closed channel'
      }
  }
  11. my class Channel does Awaitable {
  # The queue of events moving through the channel. It carries both the
  # values that were sent and the CHANNEL_CLOSE / CHANNEL_FAIL markers. The
  # class is only a handle on the VM's 'ConcBlockingQueue' representation;
  # all access goes through nqp ops (push, shift, queuepoll, atpos).
  my class Queue is repr('ConcBlockingQueue') { }
  14. has $!queue;
  15. # Promise that is triggered when all values are received, or an error is
  16. # received and the channel is thus closed.
  17. has $!closed_promise;
  18. # Closed promise's vow.
  19. has $!closed_promise_vow;
  20. # Flag for if the channel is closed to senders.
  21. has int $!closed;
  22. # We use a Supplier to send async notifications that there may be a new
  23. # message to read from the channel (there may be many things competing
  24. # over them).
  25. has $!async-notify;
  # Magical objects for various ways a channel can end. They travel through
  # the queue like ordinary values and are recognised by readers by type.
  # CHANNEL_CLOSE is pushed (as a type object) by close() and marks an
  # orderly end of the channel.
  my class CHANNEL_CLOSE { }
  # CHANNEL_FAIL is pushed (as an instance) by fail() and carries the
  # exception that ended the channel in $.error.
  my class CHANNEL_FAIL { has $.error }
  # Initialises a new channel: the promise that tracks closing (plus its
  # vow), the Supplier used for "something may be readable" notifications,
  # and the empty concurrent queue.
  submethod BUILD(--> Nil) {
      my $promise = Promise.new;
      $!closed_promise     = $promise;
      $!closed_promise_vow = $promise.vow;
      $!async-notify       = Supplier.new;
      $!queue             := nqp::create(Queue);
  }
  # Enqueues a value for the consumers of this channel.
  #
  # Throws X::Channel::SendOnClosed if the channel has been closed or failed.
  # Otherwise the decontainerized item is appended to the queue and a
  # notification is emitted for asynchronous readers.
  method send(Channel:D: \item --> Nil) {
      if $!closed {
          X::Channel::SendOnClosed.new(channel => self).throw;
      }
      else {
          nqp::push($!queue, nqp::decont(item));
          $!async-notify.emit(True);
      }
  }
  # Takes the next entry off the queue with nqp::shift and returns it.
  #
  # On the close marker: keeps the closed promise and throws
  # X::Channel::ReceiveOnClosed. On the fail marker: breaks the closed
  # promise with the carried error and rethrows that error. In both cases
  # the marker is put back first so that other readers get to see it too.
  method receive(Channel:D:) {
      my \entry := nqp::shift($!queue);
      if nqp::istype(entry, CHANNEL_CLOSE) {
          nqp::push($!queue, entry);   # leave the marker for other readers
          $!closed_promise_vow.keep(Nil);
          X::Channel::ReceiveOnClosed.new(channel => self).throw
      }
      elsif nqp::istype(entry, CHANNEL_FAIL) {
          nqp::push($!queue, entry);   # leave the marker for other readers
          my $failure := entry.error;
          $!closed_promise_vow.break($failure);
          $failure.rethrow
      }
      else {
          entry
      }
  }
  # Same as receive, except that reaching the close marker yields Nil
  # instead of throwing X::Channel::ReceiveOnClosed. A fail marker still
  # breaks the closed promise and rethrows the carried error. The marker is
  # put back in either case so that other readers get to see it too.
  method receive-nil-on-close(Channel:D:) {
      my \entry := nqp::shift($!queue);
      if nqp::istype(entry, CHANNEL_CLOSE) {
          nqp::push($!queue, entry);   # leave the marker for other readers
          $!closed_promise_vow.keep(Nil);
          Nil
      }
      elsif nqp::istype(entry, CHANNEL_FAIL) {
          nqp::push($!queue, entry);   # leave the marker for other readers
          my $failure := entry.error;
          $!closed_promise_vow.break($failure);
          $failure.rethrow
      }
      else {
          entry
      }
  }
  # Non-blocking receive. Returns the next value if one is available, and
  # Nil otherwise.
  #
  # Nil is also returned when the head of the queue is the close or fail
  # marker; in that case the closed promise is kept (close) or broken with
  # the carried error (fail), so callers can tell "empty" from "finished" by
  # inspecting the closed promise.
  #
  # The marker is pushed back onto the queue after being seen, exactly as
  # receive and receive-nil-on-close do. Without that, a poll would swallow
  # the marker and any other reader (a blocked receive, another Supply or
  # iterator on the same channel) would never learn that the channel ended.
  method poll(Channel:D:) {
      nqp::if(
        nqp::isnull(my \msg := nqp::queuepoll($!queue)),
        Nil,
        nqp::if(
          nqp::istype(msg, CHANNEL_CLOSE),
          nqp::stmts(
            nqp::push($!queue, msg), # make sure other readers see it
            $!closed_promise_vow.keep(Nil),
            Nil
          ),
          nqp::if(
            nqp::istype(msg, CHANNEL_FAIL),
            nqp::stmts(
              nqp::push($!queue, msg), # make sure other readers see it
              $!closed_promise_vow.break(msg.error),
              Nil
            ),
            msg
          )
        )
      )
  }
  # Looks at the head of the queue without removing it.
  #
  # Returns Nil when the queue is empty. When the head is the close or fail
  # marker, the closed promise is kept or broken (with the carried error)
  # respectively and Nil is returned. Any other head value is returned as is.
  method !peek(Channel:D:) {
      my \head := nqp::atpos($!queue, 0);
      if nqp::isnull(head) {
          Nil
      }
      elsif nqp::istype(head, CHANNEL_CLOSE) {
          $!closed_promise_vow.keep(Nil);
          Nil
      }
      elsif nqp::istype(head, CHANNEL_FAIL) {
          $!closed_promise_vow.break(head.error);
          Nil
      }
      else {
          head
      }
  }
  # Coerces the channel to a Capture by way of its List coercion.
  method Capture(Channel:D:) {
      my \as-list := self.List;
      as-list.Capture
  }
  # Exposes the channel as a Supply. Values are taken off the channel with
  # the non-blocking poll, so this Supply competes with any other reader of
  # the channel for each value. The Supply is done once the closed promise
  # has been kept, and dies with the promise's cause once it has been broken.
  multi method Supply(Channel:D:) {
      supply {
          # Tap the async notification for new values supply.
          whenever $!async-notify.unsanitized-supply.schedule-on($*SCHEDULER) {
              my Mu \got = self.poll;
              if nqp::eqaddr(got, Nil) {
                  # Nil from poll means either "nothing to read right now"
                  # or "a close/fail marker was reached". Only the latter
                  # settles the closed promise, so only then do we finish.
                  if $!closed_promise {
                      $!closed_promise.status == Kept
                          ?? done()
                          !! die $!closed_promise.cause
                  }
              }
              else {
                  emit got;
              }
          }
          # Grab anything that's in the channel and emit it. Note that
          # it's important to do this after tapping the supply, or a
          # value sent between us draining it and doing the tap would
          # not result in a notification, and so we'd not emit it on
          # the supply. This lost event can then cause a deadlock.
          loop {
              my Mu \got = self.poll;
              last if nqp::eqaddr(got, Nil);
              emit got;
          }
          # If a close/fail marker is now at the head of the queue, peek
          # settles the closed promise so the check below can act on it
          # without waiting for another notification.
          self!peek();
          if $!closed_promise {
              $!closed_promise.status == Kept
                  ?? done()
                  !! die $!closed_promise.cause
          }
      }
  }
  # Returns an Iterator that pulls values from this channel with
  # receive-nil-on-close and signals IterationEnd as soon as that call
  # returns Nil.
  method iterator(Channel:D:) {
      class :: does Iterator {
          # The channel being iterated.
          has $!channel;

          method !SET-SELF($!channel) { self }

          method new(\source) {
              nqp::create(self)!SET-SELF(source)
          }

          method pull-one() {
              my Mu \next-value = $!channel.receive-nil-on-close;
              if nqp::eqaddr(next-value, Nil) {
                  IterationEnd
              }
              else {
                  next-value
              }
          }
      }.new(self)
  }
  # Coerces the channel to a list by way of its Seq coercion.
  method list(Channel:D:) {
      my \as-seq := self.Seq;
      as-seq.list
  }
  # Await handle for a channel. Besides the already-success / already-failure
  # constructors used by get-await-handle, it supports a "not ready" state in
  # which an awaiter is called back once a value, a close or a failure
  # becomes observable on the channel.
  my class ChannelAwaitableHandle does Awaitable::Handle {
      # The channel being awaited, its closed promise, and the Supplier
      # that signals possible new messages on it.
      has $!channel;
      has $!closed_promise;
      has $!async-notify;

      # Creates a handle for a channel that had nothing to deliver yet.
      method not-ready(Channel:D $channel, Promise:D $closed_promise, Supplier:D $async-notify) {
          nqp::create(self)!not-ready($channel, $closed_promise, $async-notify)
      }

      # Private initialiser behind not-ready; returns the handle itself.
      method !not-ready($channel, $closed_promise, $async-notify) {
          # $!already is not declared in this class; presumably it comes
          # from the Awaitable::Handle role -- confirm against the role.
          $!already = False;
          $!channel := $channel;
          $!closed_promise := $closed_promise;
          $!async-notify := $async-notify;
          self
      }

      # Registers &subscriber to be called exactly once: with (True, value)
      # when a value could be polled, with (False, exception) when the
      # channel turns out to be closed (X::Channel::ReceiveOnClosed) or
      # failed (the closed promise's cause).
      method subscribe-awaiter(&subscriber --> Nil) {
          # Need some care here to avoid a race. We must tap the notification
          # supply first, and then do an immediate poll after it, just to be
          # sure we won't miss notifications between the two. Also, we need
          # to take some care that we never call subscriber twice.
          my $notified := False;
          my $l := Lock.new;
          my $t;
          $l.protect: {
              # Lock ensures $t will be assigned before we run the logic
              # inside of poll-now, which relies on being able to do
              # $t.close.
              $t := $!async-notify.unsanitized-supply.tap: &poll-now;
          }
          poll-now();

          # Runs once directly (above) and once per notification. The
          # argument is the notification payload and is ignored.
          sub poll-now($discard?) {
              $l.protect: {
                  unless $notified {
                      my \maybe = $!channel.poll;
                      if maybe === Nil {
                          # Nothing was polled: report only if the channel
                          # has ended; otherwise wait for the next
                          # notification.
                          if $!closed_promise.status == Kept {
                              $notified := True;
                              subscriber(False, X::Channel::ReceiveOnClosed.new(:$!channel))
                          }
                          elsif $!closed_promise.status == Broken {
                              $notified := True;
                              subscriber(False, $!closed_promise.cause)
                          }
                      }
                      else {
                          $notified := True;
                          subscriber(True, maybe);
                      }
                      # Once the subscriber has been called, stop listening
                      # for further notifications.
                      $t.close if $notified;
                  }
              }
          }
      }
  }
  # Produces the handle used to await this channel. One poll is attempted:
  #   * a polled value gives an already-successful handle;
  #   * no value and a settled closed promise gives an already-failed handle
  #     (X::Channel::ReceiveOnClosed if the promise was kept, otherwise the
  #     promise's cause);
  #   * no value and a still-pending closed promise gives a not-ready handle.
  method get-await-handle(--> Awaitable::Handle:D) {
      my \polled = self.poll;

      return ChannelAwaitableHandle.already-success(polled)
          unless polled === Nil;

      return ChannelAwaitableHandle.not-ready(self, $!closed_promise, $!async-notify)
          unless $!closed_promise;

      ChannelAwaitableHandle.already-failure(
          $!closed_promise.status == Kept
              ?? X::Channel::ReceiveOnClosed.new(channel => self)
              !! $!closed_promise.cause
      )
  }
  # Closes the channel: further sends are rejected, and readers see the end
  # of the channel once the values already queued have been consumed.
  #
  # The invocant is constrained to a defined Channel, like the other methods
  # that touch the instance's attributes, so that a call on the type object
  # is rejected at dispatch rather than failing on attribute access inside
  # the body.
  method close(Channel:D: --> Nil) {
      # Reject further sends before the marker becomes visible to readers.
      $!closed = 1;
      nqp::push($!queue, CHANNEL_CLOSE);
      # if $!queue is otherwise empty, make sure that $!closed_promise
      # learns about the new value
      self!peek();
      # Wake up asynchronous readers so they notice the close.
      $!async-notify.emit(True);
  }
  # A channel has no knowable number of elements; asking for it yields a
  # Failure naming the invocant's type.
  method elems() {
      Failure.new:
          "Cannot determine number of elements on a {self.^name}"
  }
  # Ends the channel with an error: further sends are rejected, and readers
  # that reach the fail marker have the error rethrown (receive) or see the
  # closed promise broken with it (poll / peek).
  #
  # $error may be any value; anything that is not already an Exception is
  # wrapped as the payload of an X::AdHoc. Always returns Nil.
  #
  # The invocant is constrained to a defined Channel, like the other methods
  # that touch the instance's attributes, so that a call on the type object
  # is rejected at dispatch rather than failing on attribute access inside
  # the body.
  method fail(Channel:D: $error is copy) {
      # Reject further sends before the marker becomes visible to readers.
      $!closed = 1;
      $error = X::AdHoc.new(payload => $error) unless nqp::istype($error, Exception);
      nqp::push($!queue, CHANNEL_FAIL.new(:$error));
      # Wake up asynchronous readers so they notice the failure.
      $!async-notify.emit(True);
      Nil
  }
  # Returns the promise that is kept when the channel has been closed and
  # drained, or broken when it has failed.
  #
  # The head of the queue is peeked first so that a close/fail marker that
  # is already waiting there settles the promise before it is handed out.
  #
  # The invocant is constrained to a defined Channel, like the other methods
  # that touch the instance's attributes, so that a call on the type object
  # is rejected at dispatch rather than failing on attribute access inside
  # the body.
  method closed(Channel:D:) {
      self!peek();
      $!closed_promise
  }
  262. }