From e7e205c326dfcc8c14760f42179013f91aa3ccb9 Mon Sep 17 00:00:00 2001 From: Surma Date: Tue, 5 Jan 2021 14:26:26 +0000 Subject: [PATCH] Simplify WorkerPool joining (closes #925) --- cli/src/worker_pool.js | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/cli/src/worker_pool.js b/cli/src/worker_pool.js index 90830f5d..57ebbfa7 100644 --- a/cli/src/worker_pool.js +++ b/cli/src/worker_pool.js @@ -24,7 +24,7 @@ function jobPromise(worker, msg) { export default class WorkerPool { constructor(numWorkers, workerFile) { - this.closing = false; + this.numWorkers = numWorkers; this.jobQueue = new TransformStream(); this.workerQueue = new TransformStream(); @@ -42,7 +42,6 @@ export default class WorkerPool { while (true) { const { value, done } = await reader.read(); if (done) { - this.workerQueue.writable.close(); await this._terminateAll(); return; } @@ -50,12 +49,6 @@ export default class WorkerPool { const worker = await this._nextWorker(); jobPromise(worker, msg).then((result) => { resolve(result); - // If we are in the process of closing, `workerQueue` is - // already closed and we can’t requeue the worker. - if (this.closing) { - worker.terminate(); - return; - } const writer = this.workerQueue.writable.getWriter(); writer.write(worker); writer.releaseLock(); @@ -71,18 +64,15 @@ export default class WorkerPool { } async _terminateAll() { - while (true) { + for (let n = 0; n < this.numWorkers; n++) { const worker = await this._nextWorker(); - if (!worker) { - return; - } worker.terminate(); } + this.workerQueue.writable.close(); } async join() { - this.closing = true; - this.jobQueue.writable.close(); + this.jobQueue.writable.getWriter().close(); await this.done; }