diff --git a/cli/src/index.js b/cli/src/index.js index d7c97eda..390dc43d 100644 --- a/cli/src/index.js +++ b/cli/src/index.js @@ -183,31 +183,36 @@ function progressTracker(results) { return tracker; } -async function checkInputFilesValid(files) { +async function getInputFiles(paths) { const validFiles = []; - for (const file of files) { - try { - await fsp.stat(file); - } catch (err) { - if (err.code === 'ENOENT') { - console.warn( - `Warning: Input file does not exist: ${resolvePath(file)}`, - ); - continue; - } else { - throw err; + for (const path of paths) { + const files = (await fsp.lstat(path)).isDirectory() + ? (await fsp.readdir(path)).map(file => join(path, file)) + : [path]; + for (const file of files) { + try { + await fsp.stat(file); + } catch (err) { + if (err.code === 'ENOENT') { + console.warn( + `Warning: Input file does not exist: ${resolvePath(file)}`, + ); + continue; + } else { + throw err; + } } - } - validFiles.push(file); + validFiles.push(file); + } } return validFiles; } async function processFiles(files) { - files = await checkInputFilesValid(files); + files = await getInputFiles(files); const parallelism = cpus().length; diff --git a/cli/src/worker_pool.js b/cli/src/worker_pool.js index 90830f5d..57ebbfa7 100644 --- a/cli/src/worker_pool.js +++ b/cli/src/worker_pool.js @@ -24,7 +24,7 @@ function jobPromise(worker, msg) { export default class WorkerPool { constructor(numWorkers, workerFile) { - this.closing = false; + this.numWorkers = numWorkers; this.jobQueue = new TransformStream(); this.workerQueue = new TransformStream(); @@ -42,7 +42,6 @@ export default class WorkerPool { while (true) { const { value, done } = await reader.read(); if (done) { - this.workerQueue.writable.close(); await this._terminateAll(); return; } @@ -50,12 +49,6 @@ export default class WorkerPool { const worker = await this._nextWorker(); jobPromise(worker, msg).then((result) => { resolve(result); - // If we are in the process of closing, `workerQueue` is - // already closed and we can’t requeue the worker. - if (this.closing) { - worker.terminate(); - return; - } const writer = this.workerQueue.writable.getWriter(); writer.write(worker); writer.releaseLock(); @@ -71,18 +64,15 @@ export default class WorkerPool { } async _terminateAll() { - while (true) { + for (let n = 0; n < this.numWorkers; n++) { const worker = await this._nextWorker(); - if (!worker) { - return; - } worker.terminate(); } + this.workerQueue.writable.close(); } async join() { - this.closing = true; - this.jobQueue.writable.close(); + this.jobQueue.writable.getWriter().close(); await this.done; }