1. my class ThreadPoolScheduler does Scheduler {
  2. # A concurrent, blocking-on-receive queue.
  3. my class Queue is repr('ConcBlockingQueue') {
  4. method elems() is raw { nqp::elems(self) }
  5. }
  6. # Initialize $*PID here, as we need it for the debug message
  7. # anyway *and* it appears to have a positive effect on stability
  8. # specifically wrt GH #1202.
  9. PROCESS::<$PID> := nqp::p6box_i(my $pid := nqp::getpid);
  10. # Scheduler debug, controlled by an environment variable.
  11. my int $scheduler-debug = so %*ENV<RAKUDO_SCHEDULER_DEBUG>;
  12. my int $scheduler-debug-status = so %*ENV<RAKUDO_SCHEDULER_DEBUG_STATUS>;
  13. sub scheduler-debug($message --> Nil) {
  14. if $scheduler-debug {
  15. note "[SCHEDULER $pid] $message";
  16. }
  17. }
  18. sub scheduler-debug-status($message --> Nil) {
  19. if $scheduler-debug-status {
  20. note "[SCHEDULER $pid] $message";
  21. }
  22. }
  23. # Infrastructure for non-blocking `await` for code running on the
  24. # scheduler.
  25. my constant THREAD_POOL_PROMPT = Mu.new;
  26. class ThreadPoolAwaiter does Awaiter {
  27. has $!queue;
  28. submethod BUILD(:$queue!) {
  29. $!queue := nqp::decont($queue);
  30. }
  31. sub holding-locks() {
  32. nqp::p6bool(nqp::threadlockcount(nqp::currentthread()))
  33. }
  34. method await(Awaitable:D $a) {
  35. holding-locks() || !nqp::isnull(nqp::getlexdyn('$*RAKUDO-AWAIT-BLOCKING'))
  36. ?? Awaiter::Blocking.await($a)
  37. !! self!do-await($a)
  38. }
  39. method !do-await(Awaitable:D $a) {
  40. my $handle := $a.get-await-handle;
  41. if $handle.already {
  42. $handle.success
  43. ?? $handle.result
  44. !! $handle.cause.rethrow
  45. }
  46. else {
  47. my $success;
  48. my $result;
  49. nqp::continuationcontrol(1, THREAD_POOL_PROMPT, -> Mu \c {
  50. $handle.subscribe-awaiter(-> \success, \result {
  51. $success := success;
  52. $result := result;
  53. nqp::push($!queue, { nqp::continuationinvoke(c, nqp::null()) });
  54. Nil
  55. });
  56. });
  57. $success
  58. ?? $result
  59. !! $result.rethrow
  60. }
  61. }
  62. method await-all(Iterable:D \i) {
  63. holding-locks() || !nqp::isnull(nqp::getlexdyn('$*RAKUDO-AWAIT-BLOCKING'))
  64. ?? Awaiter::Blocking.await-all(i)
  65. !! self!do-await-all(i)
  66. }
  67. method !do-await-all(Iterable:D \i) {
  68. # Collect results that are already available, and handles where the
  69. # results are not yet available together with the matching insertion
  70. # indices.
  71. my \results = nqp::list();
  72. my \handles = nqp::list();
  73. my \indices = nqp::list_i();
  74. my int $insert = 0;
  75. my $saw-slip = False;
  76. for i -> $awaitable {
  77. unless nqp::istype($awaitable, Awaitable) {
  78. die "Can only specify Awaitable objects to await (got a $awaitable.^name())";
  79. }
  80. unless nqp::isconcrete($awaitable) {
  81. die "Must specify a defined Awaitable to await (got an undefined $awaitable.^name())";
  82. }
  83. my $handle := $awaitable.get-await-handle;
  84. if $handle.already {
  85. if $handle.success {
  86. my \result = $handle.result;
  87. nqp::bindpos(results, $insert, result);
  88. $saw-slip = True if nqp::istype(result, Slip);
  89. }
  90. else {
  91. $handle.cause.rethrow
  92. }
  93. }
  94. else {
  95. nqp::push(handles, $handle);
  96. nqp::push_i(indices, $insert);
  97. }
  98. ++$insert;
  99. }
  100. # See if we have anything that we really need to suspend for. If
  101. # so, we need to take great care that the continuation taking is
  102. # complete before we try to resume it (completions can happen on
  103. # different threads, and so concurrent with us subscribing, not
  104. # to mention concurrent with each other wanting to resume). We
  105. # use a lock to take care of this, holding the lock until the
  106. # continuation has been taken.
  107. my int $num-handles = nqp::elems(handles);
  108. if $num-handles {
  109. my $continuation;
  110. my $exception;
  111. my $l = Lock.new;
  112. $l.lock;
  113. {
  114. my int $remaining = $num-handles;
  115. loop (my int $i = 0; $i < $num-handles; ++$i) {
  116. my $handle := nqp::atpos(handles, $i);
  117. my int $insert = nqp::atpos_i(indices, $i);
  118. $handle.subscribe-awaiter(-> \success, \result {
  119. my int $resume;
  120. $l.protect: {
  121. if success && $remaining {
  122. nqp::bindpos(results, $insert, result);
  123. $saw-slip = True if nqp::istype(result, Slip);
  124. --$remaining;
  125. $resume = 1 unless $remaining;
  126. }
  127. elsif !nqp::isconcrete($exception) {
  128. $exception := result;
  129. $remaining = 0;
  130. $resume = 1;
  131. }
  132. }
  133. if $resume {
  134. nqp::push($!queue, {
  135. nqp::continuationinvoke($continuation, nqp::null())
  136. });
  137. }
  138. });
  139. }
  140. CATCH {
  141. # Unlock if we fail here, and let the exception
  142. # propagate outwards.
  143. $l.unlock();
  144. }
  145. }
  146. nqp::continuationcontrol(1, THREAD_POOL_PROMPT, -> Mu \c {
  147. $continuation := c;
  148. $l.unlock;
  149. });
  150. # If we got an exception, throw it.
  151. $exception.rethrow if nqp::isconcrete($exception);
  152. }
  153. my \result-list = nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', results);
  154. $saw-slip ?? result-list.map(-> \val { val }).List !! result-list
  155. }
  156. }
  157. # There are three kinds of worker:
  158. # * General worker threads all pull from the main queue. If they have no
  159. # work, they may steal from timer threads.
  160. # * Timer worker threads are intended to handle time-based events. They
  161. # pull events from the time-sensitive queue, and they will not do any
  162. # work stealing so as to be ready and available for timer events. The
  163. # time-sensitive queue will only be returned when a queue is requested
  164. # with the :hint-time-sensitive named argument. Only one timer worker
  165. # will be created on the first request for such a queue; the supervisor
  166. # will then monitor the time-sensitive queue length and add more if
  167. # needed.
  168. # * Affinity worker threads each have their own queue. They are used when
  169. # a queue is requested and :hint-affinity is passed. These are useful
  170. # for things like Proc::Async and IO::Socket::Async, where events will
  171. # be processed using a Supply, which is serial, and so there's no point
  172. # at all in contending over the data. Work will not be stolen from an
  173. # affinity worker thread.
  174. my role Worker {
  175. has $.thread;
  176. has $!scheduler;
  177. # Completed is the number of tasks completed since the last time the
  178. # supervisor checked in.
  179. has atomicint $.completed;
  180. # Total number of tasks completed since creation.
  181. has int $.total;
  182. # Working is 1 if the worker is currently busy, 0 if not.
  183. has int $.working;
  184. # Number of times take-completed has returned zero in a row.
  185. has int $.times-nothing-completed;
  186. # Resets the completed to zero and updates the total.
  187. method take-completed() {
  188. my atomicint $taken;
  189. cas $!completed, -> atomicint $current { $taken = $current; 0 }
  190. if $taken == 0 {
  191. ++$!times-nothing-completed;
  192. }
  193. else {
  194. $!times-nothing-completed = 0;
  195. }
  196. $taken
  197. }
  198. method !run-one(\task --> Nil) {
  199. $!working = 1;
  200. nqp::continuationreset(THREAD_POOL_PROMPT, {
  201. if nqp::istype(task, List) {
  202. my Mu $code := nqp::shift(nqp::getattr(task, List, '$!reified'));
  203. $code(|task);
  204. }
  205. else {
  206. task.();
  207. }
  208. CONTROL {
  209. default {
  210. my Mu $vm-ex := nqp::getattr(nqp::decont($_), Exception, '$!ex');
  211. nqp::getcomp('perl6').handle-control($vm-ex);
  212. }
  213. }
  214. CATCH {
  215. default {
  216. $!scheduler.handle_uncaught($_)
  217. }
  218. }
  219. });
  220. $!working = 0;
  221. ++⚛$!completed;
  222. ++$!total;
  223. }
  224. }
  225. my class GeneralWorker does Worker {
  226. has Queue $!queue;
  227. submethod BUILD(Queue :$queue!, :$!scheduler!) {
  228. $!queue := $queue;
  229. $!thread = Thread.start(:app_lifetime, :name<GeneralWorker>, {
  230. my $*AWAITER := ThreadPoolAwaiter.new(:$!queue);
  231. loop {
  232. self!run-one(nqp::shift($queue));
  233. }
  234. });
  235. }
  236. }
  237. my class TimerWorker does Worker {
  238. has Queue $!queue;
  239. submethod BUILD(Queue :$queue!, :$!scheduler!) {
  240. $!queue := $queue;
  241. $!thread = Thread.start(:app_lifetime, :name<TimerWorker>, {
  242. my $*AWAITER := ThreadPoolAwaiter.new(:$!queue);
  243. loop {
  244. self!run-one(nqp::shift($queue));
  245. }
  246. });
  247. }
  248. }
  249. my class AffinityWorker does Worker {
  250. has Queue $.queue;
  251. submethod BUILD(:$!scheduler!) {
  252. my $queue := $!queue := Queue.CREATE;
  253. $!thread = Thread.start(:app_lifetime, :name<AffinityWorker>, {
  254. my $*AWAITER := ThreadPoolAwaiter.new(:$!queue);
  255. loop {
  256. self!run-one(nqp::shift($queue));
  257. }
  258. });
  259. }
  260. }
  261. # Initial and maximum threads allowed.
  262. has Int $.initial_threads;
  263. has Int $.max_threads;
  264. # All of the worker and queue state below is guarded by this lock.
  265. has Lock $!state-lock = Lock.new;
  266. # The general queue and timer queue, if created.
  267. has Queue $!general-queue;
  268. has Queue $!timer-queue;
  269. # The current lists of workers. Immutable lists; new ones are produced
  270. # upon changes.
  271. has $!general-workers;
  272. has $!timer-workers;
  273. has $!affinity-workers;
  274. # The supervisor thread, if started.
  275. has Thread $!supervisor;
  276. method !general-queue() {
  277. unless $!general-queue.DEFINITE {
  278. $!state-lock.protect: {
  279. unless $!general-queue.DEFINITE {
  280. # We don't have any workers yet, so start one.
  281. $!general-queue := nqp::create(Queue);
  282. $!general-workers := first-worker(
  283. GeneralWorker.new(
  284. queue => $!general-queue,
  285. scheduler => self
  286. )
  287. );
  288. scheduler-debug "Created initial general worker thread";
  289. self!maybe-start-supervisor();
  290. }
  291. }
  292. }
  293. $!general-queue
  294. }
  295. method !timer-queue() {
  296. unless $!timer-queue.DEFINITE {
  297. $!state-lock.protect: {
  298. unless $!timer-queue.DEFINITE {
  299. # We don't have any workers yet, so start one.
  300. $!timer-queue := nqp::create(Queue);
  301. $!timer-workers := first-worker(
  302. TimerWorker.new(
  303. queue => $!timer-queue,
  304. scheduler => self
  305. )
  306. );
  307. scheduler-debug "Created initial timer worker thread";
  308. self!maybe-start-supervisor();
  309. }
  310. }
  311. }
  312. $!timer-queue
  313. }
  314. constant @affinity-add-thresholds = 1, 5, 10, 20, 50, 100;
  315. method !affinity-queue() {
  316. # If there's no affinity workers, start one.
  317. my $cur-affinity-workers := $!affinity-workers;
  318. if $cur-affinity-workers.elems == 0 {
  319. $!state-lock.protect: {
  320. if $!affinity-workers.elems == 0 {
  321. # We don't have any affinity workers yet, so start one
  322. # and return its queue.
  323. $!affinity-workers := first-worker(
  324. AffinityWorker.new(
  325. scheduler => self
  326. )
  327. );
  328. scheduler-debug "Created initial affinity worker thread";
  329. self!maybe-start-supervisor();
  330. return $!affinity-workers[0].queue;
  331. }
  332. }
  333. $cur-affinity-workers := $!affinity-workers; # lost race for first
  334. }
  335. # Otherwise, see which has the least load (this is inherently racey
  336. # and approximate, but enough to help us avoid a busy worker). If we
  337. # find an empty queue, return it immediately.
  338. my $most-free-worker;
  339. my int $i = -1;
  340. nqp::while(
  341. ++$i < nqp::elems($cur-affinity-workers),
  342. nqp::if(
  343. $most-free-worker.DEFINITE,
  344. nqp::stmts(
  345. (my $cand := nqp::atpos($cur-affinity-workers,$i)),
  346. nqp::unless(
  347. (my $queue := $cand.queue).elems,
  348. (return $queue)
  349. ),
  350. nqp::if(
  351. nqp::islt_i($queue.elems,$most-free-worker.queue.elems),
  352. $most-free-worker := $cand
  353. )
  354. ),
  355. ($most-free-worker := nqp::atpos($cur-affinity-workers,$i))
  356. )
  357. );
  358. # Otherwise, check if the queue beats the threshold to add another
  359. # worker thread.
  360. my $chosen-queue := $most-free-worker.queue;
  361. my $threshold = @affinity-add-thresholds[
  362. ($cur-affinity-workers.elems min @affinity-add-thresholds) - 1
  363. ];
  364. if $chosen-queue.elems > $threshold {
  365. # Add another one, unless another thread did too.
  366. $!state-lock.protect: {
  367. if self!total-workers() >= $!max_threads {
  368. scheduler-debug "Will not add extra affinity worker; hit $!max_threads thread limit";
  369. return $chosen-queue;
  370. }
  371. if $cur-affinity-workers.elems != $!affinity-workers.elems {
  372. return $chosen-queue;
  373. }
  374. my $new-worker := AffinityWorker.new(scheduler => self);
  375. $!affinity-workers := push-worker($!affinity-workers,$new-worker);
  376. scheduler-debug "Added an affinity worker thread";
  377. $new-worker.queue
  378. }
  379. }
  380. else {
  381. $chosen-queue
  382. }
  383. }
  384. # Initializing a worker list with a worker, is straightforward and devoid
  385. # of concurrency issues, as we're already in protected code when we do this.
  386. sub first-worker(\first) is raw {
  387. my $workers := nqp::create(IterationBuffer);
  388. nqp::push($workers,first);
  389. $workers
  390. }
  391. # Since the worker lists can be changed during copying, we need to
  392. # just take whatever we can get and assume that it may be gone by
  393. # the time we get to it.
  394. sub push-worker(\workers, \to-push) is raw {
  395. my $new-workers := nqp::clone(workers);
  396. nqp::push($new-workers,to-push);
  397. $new-workers
  398. }
  399. # The supervisor sits in a loop, mostly sleeping. Each time it wakes up,
  400. # it takes stock of the current situation and decides whether or not to
  401. # add threads.
  402. my constant SUPERVISION_INTERVAL = 1e-2;
  403. my constant NUM_SAMPLES = 5;
  404. my constant EXHAUSTED_RETRY_AFTER = 100;
  405. method !maybe-start-supervisor(--> Nil) {
  406. unless $!supervisor.DEFINITE {
  407. $!supervisor = Thread.start(:app_lifetime, :name<Supervisor>, {
  408. sub add-general-worker(--> Nil) {
  409. $!state-lock.protect: {
  410. $!general-workers := push-worker(
  411. $!general-workers,
  412. GeneralWorker.new(
  413. queue => $!general-queue,
  414. scheduler => self
  415. )
  416. );
  417. }
  418. scheduler-debug "Added a general worker thread";
  419. }
  420. sub add-timer-worker(--> Nil) {
  421. $!state-lock.protect: {
  422. $!timer-workers := push-worker(
  423. $!timer-workers,
  424. TimerWorker.new(
  425. queue => $!timer-queue,
  426. scheduler => self
  427. )
  428. );
  429. }
  430. scheduler-debug "Added a timer worker thread";
  431. }
  432. sub getrusage-total() is raw {
  433. my \rusage = nqp::getrusage();
  434. nqp::atpos_i(rusage, nqp::const::RUSAGE_UTIME_SEC) * 1000000
  435. + nqp::atpos_i(rusage, nqp::const::RUSAGE_UTIME_MSEC)
  436. + nqp::atpos_i(rusage, nqp::const::RUSAGE_STIME_SEC) * 1000000
  437. + nqp::atpos_i(rusage, nqp::const::RUSAGE_STIME_MSEC)
  438. }
  439. scheduler-debug "Supervisor started";
  440. my num $last-rusage-time = nqp::time_n;
  441. my int $last-usage = getrusage-total;
  442. my num @last-utils = 0e0 xx NUM_SAMPLES;
  443. my int $cpu-cores = nqp::cpucores();
  444. # These definitions used to live inside the supervisor loop.
  445. # Moving them out of the loop does not improve CPU usage
  446. # noticably, but does seem to save about 3M of memory for
  447. # every 10 seconds of runtime. Whether this is an actual
  448. # leak, or just less churn on garbage collection, remains
  449. # unclear until we have profiling options that also work
  450. # when multiple threads are running.
  451. my int $exhausted;
  452. my num $now;
  453. my num $rusage-period;
  454. my int $current-usage;
  455. my int $usage-delta;
  456. my num $normalized-delta;
  457. my num $per-core;
  458. my num $per-core-util;
  459. my $smooth-per-core-util;
  460. scheduler-debug "Supervisor thinks there are $cpu-cores CPU cores";
  461. loop {
  462. # Wait until the next time we should check how things
  463. # are.
  464. nqp::sleep(SUPERVISION_INTERVAL);
  465. # Work out the delta of CPU usage since last supervision
  466. # and the time period that measurement spans.
  467. $now = nqp::time_n;
  468. $rusage-period = $now - $last-rusage-time;
  469. $last-rusage-time = $now;
  470. $current-usage = getrusage-total();
  471. $usage-delta = $current-usage - $last-usage;
  472. $last-usage = $current-usage;
  473. # Scale this by the time between rusage calls and turn it
  474. # into a per-core utilization percentage.
  475. $normalized-delta = $usage-delta / $rusage-period;
  476. $per-core = $normalized-delta / $cpu-cores;
  477. $per-core-util = 100 * ($per-core / (1000000 * NUM_SAMPLES));
  478. # Since those values are noisy, average the last
  479. # NUM_SAMPLES values to get a smoothed value.
  480. nqp::shift_n(@last-utils);
  481. nqp::push_n(@last-utils,$per-core-util);
  482. $smooth-per-core-util = @last-utils.sum;
  483. scheduler-debug-status "Per-core utilization (approx): $smooth-per-core-util%"
  484. if $scheduler-debug-status;
  485. # exhausted the system allotment of low level threads
  486. if $exhausted {
  487. $exhausted = 0 # for next run of supervisor
  488. if ++$exhausted > EXHAUSTED_RETRY_AFTER;
  489. }
  490. # we can still add threads if necessary
  491. else {
  492. self!tweak-workers($!general-queue, $!general-workers,
  493. &add-general-worker, $cpu-cores, $smooth-per-core-util)
  494. if $!general-queue.DEFINITE && $!general-queue.elems;
  495. self!tweak-workers($!timer-queue, $!timer-workers,
  496. &add-timer-worker, $cpu-cores, $smooth-per-core-util)
  497. if $!timer-queue.DEFINITE && $!timer-queue.elems;
  498. }
  499. # always need to prod affinity workers
  500. self!prod-affinity-workers: $!affinity-workers
  501. if $!affinity-workers.DEFINITE;
  502. CATCH {
  503. when X::Exhausted {
  504. $exhausted = 1;
  505. scheduler-debug .message;
  506. scheduler-debug "Refraining from trying to start new threads";
  507. }
  508. default {
  509. scheduler-debug .gist;
  510. }
  511. }
  512. }
  513. });
  514. }
  515. }
  516. method !prod-affinity-workers (\worker-list --> Nil) {
  517. for ^worker-list.elems {
  518. my $worker := worker-list[$_];
  519. if $worker.working {
  520. $worker.take-completed;
  521. # If an affinity worker completed nothing for some time,
  522. # steal an item from its queue, moving it to general queue.
  523. # This resolves deadlocks in certain cases.
  524. if $worker.times-nothing-completed > 10 {
  525. scheduler-debug "Stealing queue from affinity worker";
  526. my $item := nqp::queuepoll($worker.queue);
  527. nqp::push(self!general-queue, $item)
  528. unless nqp::isnull($item);
  529. }
  530. }
  531. }
  532. }
  533. # Tweak workers for non-empty queues
  534. method !tweak-workers(\queue, \worker-list, &add-worker, $cores, $per-core-util) {
  535. # Go through the worker list. If something is not working, then there
  536. # is at least one worker free to process things in the queue, so we
  537. # don't need to add one.
  538. my int $total-completed;
  539. my int $total-times-nothing-completed;
  540. my int $i = -1;
  541. nqp::while(
  542. ++$i < nqp::elems(worker-list),
  543. nqp::if(
  544. (my $worker := nqp::atpos(worker-list,$i)).working,
  545. nqp::stmts(
  546. ($total-completed += $worker.take-completed),
  547. ($total-times-nothing-completed += $worker.times-nothing-completed)
  548. ),
  549. return
  550. )
  551. );
  552. sub heuristic-check-for-deadlock(--> Nil) {
  553. my int $average-times-nothing-completed
  554. = $total-times-nothing-completed div (worker-list.elems || 1);
  555. if $average-times-nothing-completed > 20 {
  556. scheduler-debug "Heuristic queue progress deadlock situation detected";
  557. add-worker();
  558. }
  559. }
  560. # If we didn't complete anything, then consider adding more threads.
  561. my int $total-workers = self!total-workers();
  562. if $total-completed == 0 {
  563. if $total-workers < $!max_threads {
  564. # There's something in the queue and we haven't completed it.
  565. # If we are still below the CPU core count, just add a worker.
  566. if $total-workers < $cores {
  567. add-worker();
  568. }
  569. # Otherwise, consider utilization. If it's very little then a
  570. # further thread may be needed for deadlock breaking.
  571. elsif $per-core-util < 2 {
  572. scheduler-debug "Heuristic low utilization deadlock situation detected";
  573. add-worker();
  574. }
  575. # Another form of deadlock can happen when one kind of queue
  576. # is being processed but another is not. In that case, the
  577. # number of iterations since nothing was completed by any
  578. # worker will grow.
  579. else {
  580. heuristic-check-for-deadlock
  581. }
  582. }
  583. else {
  584. scheduler-debug "Will not add extra worker; hit $!max_threads thread limit [branch with 0 total completed]";
  585. }
  586. }
  587. elsif $total-times-nothing-completed > 20*$cores {
  588. if $total-workers < $!max_threads {
  589. heuristic-check-for-deadlock
  590. }
  591. else {
  592. scheduler-debug "Will not add extra worker; hit $!max_threads thread limit [branch with some total completed]";
  593. }
  594. }
  595. }
  596. method !total-workers() is raw {
  597. nqp::elems($!general-workers)
  598. + nqp::elems($!timer-workers)
  599. + nqp::elems($!affinity-workers)
  600. }
  601. submethod BUILD(
  602. Int :$!initial_threads = 0,
  603. Int :$!max_threads = (%*ENV<RAKUDO_MAX_THREADS> // 64).Int
  604. --> Nil
  605. ) {
  606. die "Initial thread pool threads ($!initial_threads) must be less than or equal to maximum threads ($!max_threads)"
  607. if $!initial_threads > $!max_threads;
  608. $!general-workers := nqp::create(IterationBuffer);
  609. $!timer-workers := nqp::create(IterationBuffer);
  610. $!affinity-workers := nqp::create(IterationBuffer);
  611. if $!initial_threads > 0 {
  612. # We've been asked to make some initial threads; we interpret this
  613. # as general workers.
  614. $!general-queue := nqp::create(Queue);
  615. nqp::push(
  616. $!general-workers,
  617. GeneralWorker.new(
  618. queue => $!general-queue,
  619. scheduler => self
  620. )
  621. ) for ^$!initial_threads;
  622. scheduler-debug "Created scheduler with $!initial_threads initial general workers";
  623. self!maybe-start-supervisor();
  624. }
  625. else {
  626. scheduler-debug "Created scheduler without initial general workers";
  627. }
  628. }
  629. method queue(Bool :$hint-time-sensitive, :$hint-affinity) {
  630. if $hint-affinity {
  631. self!affinity-queue()
  632. }
  633. elsif $hint-time-sensitive {
  634. self!timer-queue()
  635. }
  636. else {
  637. self!general-queue()
  638. }
  639. }
  640. my class TimerCancellation is repr('AsyncTask') { }
  641. method cue(&code, :$at, :$in, :$every, :$times = 1, :&stop is copy, :&catch ) {
  642. die "Cannot specify :at and :in at the same time"
  643. if $at.defined and $in.defined;
  644. die "Cannot specify :every, :times and :stop at the same time"
  645. if $every.defined and $times > 1 and &stop;
  646. # For $in/$at times, if the resultant delay is less than 0.001 (including
  647. # negatives) equate those to zero. For $every intervals, we convert
  648. # such values to minimum resolution of 0.001 and warn about that
  649. sub to-millis(Numeric() $value, $allow-zero = False) {
  650. my $proposed := (1000 * $value).Int;
  651. $proposed > 0 ?? $proposed
  652. !! $allow-zero ?? 0
  653. !! do {warn "Minimum timer resolution is 1ms; using that "
  654. ~ "instead of {1000 * $value}ms";
  655. 1}
  656. }
  657. my $delay = to-millis ($at ?? $at - now !! $in // 0), True;
  658. # Wrap any catch handler around the code to run.
  659. my &run := &catch ?? wrap-catch(&code, &catch) !! &code;
  660. # need repeating
  661. if $every {
  662. # generate a stopper if needed
  663. if $times > 1 {
  664. my $todo = $times;
  665. &stop = sub { $todo ?? !$todo-- !! True }
  666. }
  667. # we have a stopper
  668. if &stop {
  669. my $handle;
  670. my $cancellation;
  671. sub cancellation() {
  672. $cancellation //=
  673. Cancellation.new(async_handles => [$handle]);
  674. }
  675. $handle := nqp::timer(self!timer-queue(),
  676. { stop() ?? cancellation().cancel !! run() },
  677. $delay, to-millis($every),
  678. TimerCancellation);
  679. cancellation()
  680. }
  681. # no stopper
  682. else {
  683. my $handle := nqp::timer(self!timer-queue(), &run,
  684. $delay, to-millis($every),
  685. TimerCancellation);
  686. Cancellation.new(async_handles => [$handle])
  687. }
  688. }
  689. # only after waiting a bit or more than once
  690. elsif $delay or $times > 1 {
  691. my @async_handles;
  692. @async_handles.push(
  693. nqp::timer(self!timer-queue(), &run, $delay, 0, TimerCancellation)
  694. ) for 1 .. $times;
  695. Cancellation.new(:@async_handles)
  696. }
  697. # just cue the code
  698. else {
  699. nqp::push(self!general-queue(), &run);
  700. Nil
  701. }
  702. }
  703. sub wrap-catch(&code, &catch) {
  704. -> { code(); CATCH { default { catch($_) } } }
  705. }
  706. method loads() is raw {
  707. my int $loads = 0;
  708. $loads = $loads + $!general-queue.elems if $!general-queue;
  709. $loads = $loads + $!timer-queue.elems if $!timer-queue;
  710. my int $i = -1;
  711. nqp::while(
  712. ++$i < nqp::elems($!affinity-workers),
  713. $loads = $loads + nqp::atpos($!affinity-workers,$i).queue.elems
  714. );
  715. $loads
  716. }
  717. # Constants indexing into the data array
  718. my constant SUPERVISOR = 0;
  719. my constant GW = 1;
  720. my constant GTQ = 2;
  721. my constant GTC = 3;
  722. my constant TW = 4;
  723. my constant TTQ = 5;
  724. my constant TTC = 6;
  725. my constant AW = 7;
  726. my constant ATQ = 8;
  727. my constant ATC = 9;
  728. my constant COLUMNS = 10;
  729. # calculate number of tasks completed for a worker list
  730. sub completed(\workers) is raw {
  731. my int $elems = nqp::elems(workers);
  732. my int $completed;
  733. my int $i = -1;
  734. nqp::while(
  735. nqp::islt_i(($i = nqp::add_i($i,1)),$elems),
  736. nqp::stmts(
  737. (my $w := nqp::atpos(workers,$i)),
  738. ($completed = nqp::add_i(
  739. $completed,
  740. nqp::getattr_i($w,$w.WHAT,'$!total')
  741. ))
  742. )
  743. );
  744. $completed
  745. }
  746. proto method usage(|) {*}
  747. multi method usage(ThreadPoolScheduler:U:) is raw {
  748. nqp::setelems(nqp::list_i,COLUMNS)
  749. }
  750. multi method usage(ThreadPoolScheduler:D:) is raw {
  751. my $data := nqp::setelems(nqp::list_i,COLUMNS);
  752. nqp::bindpos_i($data,SUPERVISOR,1) if $!supervisor;
  753. if $!general-workers -> \workers {
  754. nqp::bindpos_i($data,GW,nqp::elems(workers));
  755. nqp::bindpos_i($data,GTQ,nqp::elems($!general-queue))
  756. if $!general-queue;
  757. nqp::bindpos_i($data,GTC,completed(workers));
  758. }
  759. if $!timer-workers -> \workers {
  760. nqp::bindpos_i($data,TW,nqp::elems(workers));
  761. nqp::bindpos_i($data,TTQ,nqp::elems($!timer-queue))
  762. if $!timer-queue;
  763. nqp::bindpos_i($data,TTC,completed(workers));
  764. }
  765. if $!affinity-workers -> \workers {
  766. my int $elems =
  767. nqp::bindpos_i($data,AW,nqp::elems(workers));
  768. my int $completed;
  769. my int $queued;
  770. my int $i = -1;
  771. nqp::while(
  772. nqp::islt_i(($i = nqp::add_i($i,1)),$elems),
  773. nqp::stmts(
  774. (my $w := nqp::atpos(workers,$i)),
  775. ($completed = nqp::add_i(
  776. $completed,
  777. nqp::getattr_i($w,$w.WHAT,'$!total')
  778. )),
  779. ($queued = nqp::add_i(
  780. $queued,
  781. nqp::elems(nqp::getattr($w,$w.WHAT,'$!queue'))
  782. ))
  783. )
  784. );
  785. nqp::bindpos_i($data,ATQ,$queued);
  786. nqp::bindpos_i($data,ATC,$completed);
  787. }
  788. # the final thing
  789. $data
  790. }
  791. }