1. my class IO::Socket::Async {
  2. my class SocketCancellation is repr('AsyncTask') { }
  3. has $!VMIO;
  4. has int $!udp;
  5. has $.enc;
  6. has $!encoder;
  7. has $!close-promise;
  8. has $!close-vow;
  9. has Str $.peer-host;
  10. has Int $.peer-port;
  11. has Str $.socket-host;
  12. has Int $.socket-port;
  13. method new() {
  14. die "Cannot create an asynchronous socket directly; please use\n" ~
  15. "IO::Socket::Async.connect, IO::Socket::Async.listen,\n" ~
  16. "IO::Socket::Async.udp, or IO::Socket::Async.udp-bind";
  17. }
  18. method print(IO::Socket::Async:D: Str() $str, :$scheduler = $*SCHEDULER) {
  19. self.write($!encoder.encode-chars($str))
  20. }
  21. method write(IO::Socket::Async:D: Blob $b, :$scheduler = $*SCHEDULER) {
  22. my $p = Promise.new;
  23. my $v = $p.vow;
  24. nqp::asyncwritebytes(
  25. $!VMIO,
  26. $scheduler.queue,
  27. -> Mu \bytes, Mu \err {
  28. if err {
  29. $v.break(err);
  30. }
  31. else {
  32. $v.keep(bytes);
  33. }
  34. },
  35. nqp::decont($b), SocketCancellation);
  36. $p
  37. }
  38. my class Datagram {
  39. has $.data;
  40. has str $.hostname;
  41. has int $.port;
  42. method decode(|c) {
  43. die "Cannot decode a datagram with Str data" if $!data ~~ Str;
  44. return self.clone(data => $!data.decode(|c));
  45. }
  46. method encode(|c) {
  47. die "Cannot encode a datagram with Blob data" if $!data ~~ Blob;
  48. return self.clone(data => $!data.encode(|c));
  49. }
  50. }
  51. my class SocketReaderTappable does Tappable {
  52. has $!VMIO;
  53. has $!scheduler;
  54. has $!buf;
  55. has $!close-promise;
  56. has $!udp;
  57. method new(Mu :$VMIO!, :$scheduler!, :$buf!, :$close-promise!, :$udp!) {
  58. self.CREATE!SET-SELF($VMIO, $scheduler, $buf, $close-promise, $udp)
  59. }
  60. method !SET-SELF(Mu $!VMIO, $!scheduler, $!buf, $!close-promise, $!udp) { self }
  61. method tap(&emit, &done, &quit, &tap) {
  62. my $buffer := nqp::list();
  63. my int $buffer-start-seq = 0;
  64. my int $done-target = -1;
  65. my int $finished = 0;
  66. sub emit-events() {
  67. until nqp::elems($buffer) == 0 || nqp::isnull(nqp::atpos($buffer, 0)) {
  68. emit(nqp::shift($buffer));
  69. $buffer-start-seq = $buffer-start-seq + 1;
  70. }
  71. if $buffer-start-seq == $done-target {
  72. done();
  73. $finished = 1;
  74. }
  75. }
  76. my $lock = Lock::Async.new;
  77. my $tap;
  78. $lock.protect: {
  79. my $cancellation := nqp::asyncreadbytes(nqp::decont($!VMIO),
  80. $!scheduler.queue(:hint-affinity),
  81. -> Mu \seq, Mu \data, Mu \err, Mu \hostname = Str, Mu \port = Int {
  82. $lock.protect: {
  83. unless $finished {
  84. if err {
  85. quit(err);
  86. $finished = 1;
  87. }
  88. elsif nqp::isconcrete(data) {
  89. my int $insert-pos = seq - $buffer-start-seq;
  90. if $!udp && nqp::isconcrete(hostname) && nqp::isconcrete(port) {
  91. nqp::bindpos($buffer, $insert-pos, Datagram.new(
  92. data => data,
  93. hostname => hostname,
  94. port => port
  95. ));
  96. } else {
  97. nqp::bindpos($buffer, $insert-pos, data);
  98. }
  99. emit-events();
  100. }
  101. else {
  102. $done-target = seq;
  103. emit-events();
  104. }
  105. }
  106. }
  107. },
  108. nqp::decont($!buf), SocketCancellation);
  109. $tap := Tap.new({ nqp::cancel($cancellation) });
  110. tap($tap);
  111. }
  112. $!close-promise.then: {
  113. $lock.protect: {
  114. unless $finished {
  115. done();
  116. $finished = 1;
  117. }
  118. }
  119. }
  120. $tap
  121. }
  122. method live(--> False) { }
  123. method sane(--> True) { }
  124. method serial(--> True) { }
  125. }
  126. multi method Supply(IO::Socket::Async:D: :$bin, :$buf = buf8.new, :$datagram, :$enc, :$scheduler = $*SCHEDULER) {
  127. if $bin {
  128. Supply.new: SocketReaderTappable.new:
  129. :$!VMIO, :$scheduler, :$buf, :$!close-promise, udp => $!udp && $datagram
  130. }
  131. else {
  132. my $bin-supply = self.Supply(:bin, :$datagram);
  133. if $!udp {
  134. supply {
  135. whenever $bin-supply {
  136. emit .decode($enc // $!enc);
  137. }
  138. }
  139. }
  140. else {
  141. Rakudo::Internals.BYTE_SUPPLY_DECODER($bin-supply, $enc // $!enc)
  142. }
  143. }
  144. }
  145. method close(IO::Socket::Async:D: --> True) {
  146. nqp::closefh($!VMIO);
  147. $!close-vow.keep(True);
  148. }
  149. method connect(IO::Socket::Async:U: Str() $host, Int() $port,
  150. :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  151. my $p = Promise.new;
  152. my $v = $p.vow;
  153. my $encoding = Encoding::Registry.find($enc);
  154. nqp::asyncconnect(
  155. $scheduler.queue,
  156. -> Mu \socket, Mu \err, Mu \peer-host, Mu \peer-port, Mu \socket-host, Mu \socket-port {
  157. if err {
  158. $v.break(err);
  159. }
  160. else {
  161. my $client_socket := nqp::create(self);
  162. nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
  163. nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $encoding.name);
  164. nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
  165. $encoding.encoder());
  166. nqp::bindattr($client_socket, IO::Socket::Async, '$!peer-host', peer-host);
  167. nqp::bindattr($client_socket, IO::Socket::Async, '$!peer-port', peer-port);
  168. nqp::bindattr($client_socket, IO::Socket::Async, '$!socket-host', socket-host);
  169. nqp::bindattr($client_socket, IO::Socket::Async, '$!socket-port', socket-port);
  170. setup-close($client_socket);
  171. $v.keep($client_socket);
  172. }
  173. },
  174. $host, $port, SocketCancellation);
  175. $p
  176. }
  177. my class SocketListenerTappable does Tappable {
  178. has $!host;
  179. has $!port;
  180. has $!backlog;
  181. has $!encoding;
  182. has $!scheduler;
  183. method new(:$host!, :$port!, :$backlog!, :$encoding!, :$scheduler!) {
  184. self.CREATE!SET-SELF($host, $port, $backlog, $encoding, $scheduler)
  185. }
  186. method !SET-SELF($!host, $!port, $!backlog, $!encoding, $!scheduler) { self }
  187. method tap(&emit, &done, &quit, &tap) {
  188. my $lock := Lock::Async.new;
  189. my $tap;
  190. my int $finished = 0;
  191. $lock.protect: {
  192. my $cancellation := nqp::asynclisten(
  193. $!scheduler.queue(:hint-affinity),
  194. -> Mu \socket, Mu \err, Mu \peer-host, Mu \peer-port,
  195. Mu \socket-host, Mu \socket-port {
  196. $lock.protect: {
  197. if $finished {
  198. # do nothing
  199. }
  200. elsif err {
  201. quit(X::AdHoc.new(payload => err));
  202. $finished = 1;
  203. }
  204. else {
  205. my $client_socket := nqp::create(IO::Socket::Async);
  206. nqp::bindattr($client_socket, IO::Socket::Async,
  207. '$!VMIO', socket);
  208. nqp::bindattr($client_socket, IO::Socket::Async,
  209. '$!enc', $!encoding.name);
  210. nqp::bindattr($client_socket, IO::Socket::Async,
  211. '$!encoder', $!encoding.encoder());
  212. nqp::bindattr($client_socket, IO::Socket::Async,
  213. '$!peer-host', peer-host);
  214. nqp::bindattr($client_socket, IO::Socket::Async,
  215. '$!peer-port', peer-port);
  216. nqp::bindattr($client_socket, IO::Socket::Async,
  217. '$!socket-host', socket-host);
  218. nqp::bindattr($client_socket, IO::Socket::Async,
  219. '$!socket-port', socket-port);
  220. setup-close($client_socket);
  221. emit($client_socket);
  222. }
  223. }
  224. },
  225. $!host, $!port, $!backlog, SocketCancellation);
  226. $tap = Tap.new: {
  227. my $p = Promise.new;
  228. my $v = $p.vow;
  229. nqp::cancelnotify($cancellation, $!scheduler.queue, { $v.keep(True); });
  230. $p
  231. }
  232. tap($tap);
  233. CATCH {
  234. default {
  235. tap($tap = Tap.new({ Nil })) unless $tap;
  236. quit($_);
  237. }
  238. }
  239. }
  240. $tap
  241. }
  242. method live(--> False) { }
  243. method sane(--> True) { }
  244. method serial(--> True) { }
  245. }
  246. method listen(IO::Socket::Async:U: Str() $host, Int() $port, Int() $backlog = 128,
  247. :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  248. my $encoding = Encoding::Registry.find($enc);
  249. Supply.new: SocketListenerTappable.new:
  250. :$host, :$port, :$backlog, :$encoding, :$scheduler
  251. }
  252. sub setup-close(\socket --> Nil) {
  253. my $p := Promise.new;
  254. nqp::bindattr(socket, IO::Socket::Async, '$!close-promise', $p);
  255. nqp::bindattr(socket, IO::Socket::Async, '$!close-vow', $p.vow);
  256. }
  257. method udp(IO::Socket::Async:U: :$broadcast, :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  258. my $p = Promise.new;
  259. my $encoding = Encoding::Registry.find($enc);
  260. nqp::asyncudp(
  261. $scheduler.queue,
  262. -> Mu \socket, Mu \err {
  263. if err {
  264. $p.break(err);
  265. }
  266. else {
  267. my $client_socket := nqp::create(self);
  268. nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
  269. nqp::bindattr_i($client_socket, IO::Socket::Async, '$!udp', 1);
  270. nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $encoding.name);
  271. nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
  272. $encoding.encoder());
  273. setup-close($client_socket);
  274. $p.keep($client_socket);
  275. }
  276. },
  277. nqp::null_s(), 0, $broadcast ?? 1 !! 0,
  278. SocketCancellation);
  279. await $p
  280. }
  281. method bind-udp(IO::Socket::Async:U: Str() $host, Int() $port, :$broadcast,
  282. :$enc = 'utf-8', :$scheduler = $*SCHEDULER) {
  283. my $p = Promise.new;
  284. my $encoding = Encoding::Registry.find($enc);
  285. nqp::asyncudp(
  286. $scheduler.queue(:hint-affinity),
  287. -> Mu \socket, Mu \err {
  288. if err {
  289. $p.break(err);
  290. }
  291. else {
  292. my $client_socket := nqp::create(self);
  293. nqp::bindattr($client_socket, IO::Socket::Async, '$!VMIO', socket);
  294. nqp::bindattr_i($client_socket, IO::Socket::Async, '$!udp', 1);
  295. nqp::bindattr($client_socket, IO::Socket::Async, '$!enc', $encoding.name);
  296. nqp::bindattr($client_socket, IO::Socket::Async, '$!encoder',
  297. $encoding.encoder());
  298. setup-close($client_socket);
  299. $p.keep($client_socket);
  300. }
  301. },
  302. nqp::unbox_s($host), nqp::unbox_i($port), $broadcast ?? 1 !! 0,
  303. SocketCancellation);
  304. await $p
  305. }
  306. method print-to(IO::Socket::Async:D: Str() $host, Int() $port, Str() $str, :$scheduler = $*SCHEDULER) {
  307. self.write-to($host, $port, $!encoder.encode-chars($str), :$scheduler)
  308. }
  309. method write-to(IO::Socket::Async:D: Str() $host, Int() $port, Blob $b, :$scheduler = $*SCHEDULER) {
  310. my $p = Promise.new;
  311. my $v = $p.vow;
  312. nqp::asyncwritebytesto(
  313. $!VMIO,
  314. $scheduler.queue,
  315. -> Mu \bytes, Mu \err {
  316. if err {
  317. $v.break(err);
  318. }
  319. else {
  320. $v.keep(bytes);
  321. }
  322. },
  323. nqp::decont($b), SocketCancellation,
  324. nqp::unbox_s($host), nqp::unbox_i($port));
  325. $p
  326. }
  327. }