Add worker pool implementation

Author: Surma
Date: 2020-09-14 17:05:02 +01:00
parent df45b996d1
commit 01c04d4a72
5 changed files with 130 additions and 46 deletions

cli/package-lock.json (generated): 7 changed lines

@@ -1,5 +1,5 @@
 {
-  "name": "cli",
+  "name": "@squoosh/cli",
   "version": "0.1.3",
   "lockfileVersion": 1,
   "requires": true,
@@ -466,6 +466,11 @@
         }
       }
     },
+    "web-streams-polyfill": {
+      "version": "3.0.0",
+      "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.0.0.tgz",
+      "integrity": "sha512-tcZlIJ+VBxuDXdRFF3PCZTJ3yUISGklG4hkl3CDGOlZ8XwpN90L5YsJNoSPH72wZ4nbsatE/OfIaxfM3p+6W7w=="
+    },
     "wrappy": {
       "version": "1.0.2",
       "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz",

cli/package.json

@@ -13,7 +13,9 @@
   "keywords": [],
   "author": "Surma <surma@surma.dev>",
   "license": "Apache-2.0",
-  "dependencies": {},
+  "dependencies": {
+    "web-streams-polyfill": "^3.0.0"
+  },
   "devDependencies": {
     "@rollup/plugin-commonjs": "^15.0.0",
     "@rollup/plugin-node-resolve": "^9.0.0",

cli/rollup.config.js

@@ -19,9 +19,10 @@ export default {
     cjs(),
     asset(),
     json(),
-    terser({
-      mangle: true
-    })
+    null &&
+      terser({
+        mangle: true
+      })
   ],
   external: [
     "os",

cli/src/index.js

@@ -1,12 +1,13 @@
 import { program } from "commander";
 import JSON5 from "json5";
-import { Worker, isMainThread, parentPort } from "worker_threads";
+import { isMainThread } from "worker_threads";
 import { cpus } from "os";
 import { extname, join, basename } from "path";
 import { promises as fsp } from "fs";
 import { version } from "json:../package.json";
 import supportedFormats from "./codecs.js";
+import WorkerPool from "./worker_pool.js";
 
 // Our decoders currently rely on a `ImageData` global.
 import ImageData from "./image_data.js";
@@ -30,29 +31,6 @@ async function decodeFile(file) {
   return rgba;
 }
 
-function uuid() {
-  return Array.from({ length: 16 }, () =>
-    Math.floor(Math.random() * 256).toString(16)
-  ).join("");
-}
-
-// Adds a unique ID to the message payload and waits
-// for the worker to respond with that id as a signal
-// that the job is done.
-function jobPromise(worker, msg) {
-  return new Promise(resolve => {
-    const id = uuid();
-    worker.postMessage(Object.assign(msg, { id }));
-    worker.on("message", function f(msg) {
-      if (msg.id !== id) {
-        return;
-      }
-      worker.off("message", f);
-      resolve(msg);
-    });
-  });
-}
-
 /*
 const butteraugliGoal = 1.4;
 const maxRounds = 8;
@@ -107,12 +85,8 @@ const visdifModule = require("../codecs/visdif/visdif.js");
 async function processFiles(files) {
   // Create output directory
   await fsp.mkdir(program.outputDir, { recursive: true });
-  const pool = Array.from(
-    { length: cpus().length },
-    () => new Worker(__filename)
-  );
-  let i = 0;
-  const jobs = [];
+  const workerPool = new WorkerPool(cpus().length, __filename);
   for (const file of files) {
     const ext = extname(file);
     const base = basename(file, ext);
@@ -128,20 +102,20 @@ async function processFiles(files) {
       JSON5.parse(program[encName])
     );
     const outputFile = join(program.outputDir, `${base}.${value.extension}`);
-    jobs.push(
-      jobPromise(pool[i], {
-        bitmap,
-        outputFile,
-        encName,
-        encConfig
-      })
-    );
-    i = (i + 1) % pool.length;
+    workerPool
+      .dispatchJob({
+        bitmap,
+        outputFile,
+        encName,
+        encConfig
+      })
+      .then(({ outputSize }) => {
+        console.log(`Written ${file}. Size: ${outputSize}`);
+      });
     }
   }
   // Wait for all jobs to finish
-  await Promise.allSettled(jobs);
-  pool.forEach(worker => worker.terminate());
+  await workerPool.join();
 }
 
 if (isMainThread) {
@@ -162,9 +136,9 @@ if (isMainThread) {
   program.parse(process.argv);
 } else {
-  parentPort.on(
-    "message",
-    async ({ id, bitmap, outputFile, encName, encConfig, done }) => {
+  WorkerPool.useThisThreadAsWorker(
+    async ({ id, bitmap, outputFile, encName, encConfig }) => {
+      console.log("received", { outputFile, encName });
       const encoder = await supportedFormats[encName].enc();
       const out = encoder.encode(
         bitmap.data.buffer,
@@ -173,8 +147,7 @@ if (isMainThread) {
         encConfig
       );
       await fsp.writeFile(outputFile, out);
-      // Signal we are done with this job
-      parentPort.postMessage({ id });
+      return { outputSize: out.length };
     }
   );
 }
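
With this change, each dispatched job resolves with the worker
callback's return value ({ outputSize }), and workerPool.join()
replaces the manual Promise.allSettled bookkeeping and per-worker
terminate() calls.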

cli/src/worker_pool.js (new file): 103 lines

@@ -0,0 +1,103 @@
import { Worker, parentPort } from "worker_threads";
import { TransformStream } from "web-streams-polyfill";

function uuid() {
  return Array.from({ length: 16 }, () =>
    Math.floor(Math.random() * 256).toString(16)
  ).join("");
}
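
// Adds a unique id to the message payload and waits for the worker to
// respond with that id as a signal that the job is done, resolving
// with the worker's result.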
function jobPromise(worker, msg) {
  return new Promise(resolve => {
    const id = uuid();
    worker.postMessage({ msg, id });
    worker.on("message", function f({ result, id: rid }) {
      if (rid !== id) {
        return;
      }
      worker.off("message", f);
      resolve(result);
    });
  });
}
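
// A pool of `numWorkers` workers fed from a job queue. Two identity
// TransformStreams double as unbounded async FIFO queues: one holds
// pending jobs, the other holds idle workers.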
export default class WorkerPool {
  constructor(numWorkers, workerFile) {
    this.closing = false;
    this.jobQueue = new TransformStream();
    this.workerQueue = new TransformStream();

    const writer = this.workerQueue.writable.getWriter();
    for (let i = 0; i < numWorkers; i++) {
      writer.write(new Worker(workerFile));
    }
    writer.releaseLock();

    this.done = this._readLoop();
  }
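
  // Pairs each queued job with the next idle worker. Once the job
  // queue closes, the worker queue is closed as well and the remaining
  // idle workers are terminated.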
  async _readLoop() {
    const reader = this.jobQueue.readable.getReader();
    while (true) {
      const { value, done } = await reader.read();
      if (done) {
        this.workerQueue.writable.close();
        await this._terminateAll();
        return;
      }
      const { msg, resolve } = value;
      const worker = await this._nextWorker();
      jobPromise(worker, msg).then(result => {
        resolve(result);
        // If we are in the process of closing, `workerQueue` is
        // already closed and we can't requeue the worker.
        if (this.closing) {
          worker.terminate();
          return;
        }
        const writer = this.workerQueue.writable.getWriter();
        writer.write(worker);
        writer.releaseLock();
      });
    }
  }
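
  // Resolves with the next idle worker, or with `undefined` once the
  // worker queue has been closed and drained.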
  async _nextWorker() {
    const reader = this.workerQueue.readable.getReader();
    const { value, done } = await reader.read();
    reader.releaseLock();
    return value;
  }

  async _terminateAll() {
    while (true) {
      const worker = await this._nextWorker();
      if (!worker) {
        return;
      }
      worker.terminate();
    }
  }
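
  // Signals that no further jobs will be dispatched and waits for the
  // read loop to drain the job queue and shut the workers down.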
  async join() {
    this.closing = true;
    this.jobQueue.writable.close();
    await this.done;
  }
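
  // Queues a job and returns a promise that resolves with the worker's
  // result for that job.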
  dispatchJob(msg) {
    return new Promise(resolve => {
      const writer = this.jobQueue.writable.getWriter();
      writer.write({ msg, resolve });
      writer.releaseLock();
    });
  }
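
  // Worker-thread side: runs `cb` for every incoming job and posts the
  // (awaited) return value back, tagged with the job's id.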
  static useThisThreadAsWorker(cb) {
    parentPort.addEventListener("message", async ev => {
      const { msg, id } = ev.data;
      const result = await cb(msg);
      parentPort.postMessage({ result, id });
    });
  }
}
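
For reference, a minimal sketch of how the pool is meant to be driven.
This is a hypothetical self-contained example, not part of the commit;
it mirrors index.js in using one file as both main script and worker,
and (like index.js) assumes a build where __filename is available
alongside ESM imports:

    import { cpus } from "os";
    import { isMainThread } from "worker_threads";
    import WorkerPool from "./worker_pool.js";

    if (isMainThread) {
      (async () => {
        const pool = new WorkerPool(cpus().length, __filename);
        for (const n of [1, 2, 3]) {
          // dispatchJob resolves with whatever the worker callback returns.
          pool.dispatchJob({ n }).then(({ square }) => {
            console.log(n, "->", square);
          });
        }
        // Close the queue and wait for the pool to drain.
        await pool.join();
      })();
    } else {
      WorkerPool.useThisThreadAsWorker(({ n }) => ({ square: n * n }));
    }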