From 01c04d4a72c34c1f986978a01bbbf064d1cc28d7 Mon Sep 17 00:00:00 2001
From: Surma
Date: Mon, 14 Sep 2020 17:05:02 +0100
Subject: [PATCH] Add worker pool implementation

Replace the ad-hoc per-file worker round-robin in cli/src/index.js with
a stream-based WorkerPool (cli/src/worker_pool.js) that queues jobs and
idle workers through TransformStreams, resolves each dispatched job with
the worker's result, and terminates all workers on join().

---
 cli/package-lock.json  |   7 ++-
 cli/package.json       |   4 +-
 cli/src/index.js       |  54 ++++++++---------------
 cli/src/worker_pool.js | 103 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 125 insertions(+), 43 deletions(-)
 create mode 100644 cli/src/worker_pool.js

diff --git a/cli/package-lock.json b/cli/package-lock.json
index 774ae372..5e0e7afe 100644
--- a/cli/package-lock.json
+++ b/cli/package-lock.json
@@ -1,5 +1,5 @@
 {
-  "name": "cli",
+  "name": "@squoosh/cli",
   "version": "0.1.3",
   "lockfileVersion": 1,
   "requires": true,
@@ -466,6 +466,11 @@
         }
       }
     },
+    "web-streams-polyfill": {
+      "version": "3.0.0",
+      "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.0.0.tgz",
+      "integrity": "sha512-tcZlIJ+VBxuDXdRFF3PCZTJ3yUISGklG4hkl3CDGOlZ8XwpN90L5YsJNoSPH72wZ4nbsatE/OfIaxfM3p+6W7w=="
+    },
     "wrappy": {
       "version": "1.0.2",
       "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",
diff --git a/cli/package.json b/cli/package.json
index 188bac0c..d8e7ec39 100644
--- a/cli/package.json
+++ b/cli/package.json
@@ -13,7 +13,9 @@
   "keywords": [],
   "author": "Surma ",
   "license": "Apache-2.0",
-  "dependencies": {},
+  "dependencies": {
+    "web-streams-polyfill": "^3.0.0"
+  },
   "devDependencies": {
     "@rollup/plugin-commonjs": "^15.0.0",
     "@rollup/plugin-node-resolve": "^9.0.0",
diff --git a/cli/src/index.js b/cli/src/index.js
index 5a11156d..6d1303ef 100644
--- a/cli/src/index.js
+++ b/cli/src/index.js
@@ -1,12 +1,13 @@
 import { program } from "commander";
 import JSON5 from "json5";
-import { Worker, isMainThread, parentPort } from "worker_threads";
+import { isMainThread } from "worker_threads";
 import { cpus } from "os";
 import { extname, join, basename } from "path";
 import { promises as fsp } from "fs";
 import { version } from "json:../package.json";
 
 import supportedFormats from "./codecs.js";
+import WorkerPool from "./worker_pool.js";
 
 // Our decoders currently rely on a `ImageData` global.
 import ImageData from "./image_data.js";
@@ -30,29 +31,6 @@ async function decodeFile(file) {
   return rgba;
 }
 
-function uuid() {
-  return Array.from({ length: 16 }, () =>
-    Math.floor(Math.random() * 256).toString(16)
-  ).join("");
-}
-
-// Adds a unique ID to the message payload and waits
-// for the worker to respond with that id as a signal
-// that the job is done.
-function jobPromise(worker, msg) {
-  return new Promise(resolve => {
-    const id = uuid();
-    worker.postMessage(Object.assign(msg, { id }));
-    worker.on("message", function f(msg) {
-      if (msg.id !== id) {
-        return;
-      }
-      worker.off("message", f);
-      resolve(msg);
-    });
-  });
-}
-
 /*
 const butteraugliGoal = 1.4;
 const maxRounds = 8;
@@ -107,12 +85,8 @@ const visdifModule = require("../codecs/visdif/visdif.js");
 async function processFiles(files) {
   // Create output directory
   await fsp.mkdir(program.outputDir, { recursive: true });
-  const pool = Array.from(
-    { length: cpus().length },
-    () => new Worker(__filename)
-  );
-  let i = 0;
-  const jobs = [];
+  const workerPool = new WorkerPool(cpus().length, __filename);
+
   for (const file of files) {
     const ext = extname(file);
     const base = basename(file, ext);
@@ -128,20 +102,20 @@ async function processFiles(files) {
       JSON5.parse(program[encName])
     );
     const outputFile = join(program.outputDir, `${base}.${value.extension}`);
-    jobs.push(
-      jobPromise(pool[i], {
+    workerPool
+      .dispatchJob({
         bitmap,
         outputFile,
         encName,
         encConfig
       })
-    );
-    i = (i + 1) % pool.length;
+      .then(({ outputSize }) => {
+        console.log(`Written ${file}. Size: ${outputSize}`);
+      });
   }
   }
 
   // Wait for all jobs to finish
-  await Promise.allSettled(jobs);
-  pool.forEach(worker => worker.terminate());
+  await workerPool.join();
 }
 
@@ -162,9 +136,8 @@ if (isMainThread) {
   program.parse(process.argv);
 } else {
-  parentPort.on(
-    "message",
-    async ({ id, bitmap, outputFile, encName, encConfig, done }) => {
+  WorkerPool.useThisThreadAsWorker(
+    async ({ bitmap, outputFile, encName, encConfig }) => {
       const encoder = await supportedFormats[encName].enc();
       const out = encoder.encode(
         bitmap.data.buffer,
         bitmap.width,
@@ -173,8 +146,7 @@ if (isMainThread) {
         encConfig
       );
       await fsp.writeFile(outputFile, out);
-      // Signal we are done with this job
-      parentPort.postMessage({ id });
+      return { outputSize: out.length };
     }
   );
 }
diff --git a/cli/src/worker_pool.js b/cli/src/worker_pool.js
new file mode 100644
index 00000000..e14374f6
--- /dev/null
+++ b/cli/src/worker_pool.js
@@ -0,0 +1,103 @@
+import { Worker, parentPort } from "worker_threads";
+import { TransformStream } from "web-streams-polyfill";
+
+function uuid() {
+  return Array.from({ length: 16 }, () =>
+    Math.floor(Math.random() * 256).toString(16)
+  ).join("");
+}
+
+function jobPromise(worker, msg) {
+  return new Promise(resolve => {
+    const id = uuid();
+    worker.postMessage({ msg, id });
+    worker.on("message", function f({ result, id: rid }) {
+      if (rid !== id) {
+        return;
+      }
+      worker.off("message", f);
+      resolve(result);
+    });
+  });
+}
+
+export default class WorkerPool {
+  constructor(numWorkers, workerFile) {
+    this.closing = false;
+    this.jobQueue = new TransformStream();
+    this.workerQueue = new TransformStream();
+
+    const writer = this.workerQueue.writable.getWriter();
+    for (let i = 0; i < numWorkers; i++) {
+      writer.write(new Worker(workerFile));
+    }
+    writer.releaseLock();
+
+    this.done = this._readLoop();
+  }
+
+  async _readLoop() {
+    const reader = this.jobQueue.readable.getReader();
+    while (true) {
+      const { value, done } = await reader.read();
+      if (done) {
+        this.workerQueue.writable.close();
+        await this._terminateAll();
+        return;
+      }
+      const { msg, resolve } = value;
+      const worker = await this._nextWorker();
+      jobPromise(worker, msg).then(result => {
+        resolve(result);
+        // If we are in the process of closing, `workerQueue` is
+        // already closed and we can’t requeue the worker.
+        if (this.closing) {
+          worker.terminate();
+          return;
+        }
+        const writer = this.workerQueue.writable.getWriter();
+        writer.write(worker);
+        writer.releaseLock();
+      });
+    }
+  }
+
+  async _nextWorker() {
+    const reader = this.workerQueue.readable.getReader();
+    const { value, done } = await reader.read();
+    reader.releaseLock();
+    return value;
+  }
+
+  async _terminateAll() {
+    while (true) {
+      const worker = await this._nextWorker();
+      if (!worker) {
+        return;
+      }
+      worker.terminate();
+    }
+  }
+
+  async join() {
+    this.closing = true;
+    this.jobQueue.writable.close();
+    await this.done;
+  }
+
+  dispatchJob(msg) {
+    return new Promise(resolve => {
+      const writer = this.jobQueue.writable.getWriter();
+      writer.write({ msg, resolve });
+      writer.releaseLock();
+    });
+  }
+
+  static useThisThreadAsWorker(cb) {
+    parentPort.addEventListener("message", async ev => {
+      const { msg, id } = ev.data;
+      const result = await cb(msg);
+      parentPort.postMessage({ result, id });
+    });
+  }
+}