Update code for the review comments

* Make decode module return value `ImageData`
* Fix global definition of ImageData
* Use concrete Encoder types for encode functions
* Use ArrayBufferView in FileLike instead of using a similar type
* Throw error when the `encode` functions
return null
* Use generic types for WorkerPool
* Fix `encode` function typing
in `index.ts`
* Remove ts-ignore for web-streams-polyfill
and handle nulls for TransformStream
* Fix rollup entry point (now we need to have
`index.ts` instead of `index.js`)
This commit is contained in:
ergunsh
2021-08-06 16:05:14 +03:00
parent de4eb9c8f7
commit fafcf97f0c
4 changed files with 82 additions and 56 deletions

View File

@@ -1,7 +1,5 @@
import { Worker, parentPort } from 'worker_threads';
// @ts-ignore
import { TransformStream } from 'web-streams-polyfill';
import type { JobMessage } from './index';
function uuid() {
return Array.from({ length: 16 }, () =>
@@ -9,28 +7,16 @@ function uuid() {
).join('');
}
function jobPromise(worker: Worker, msg: JobMessage) {
return new Promise((resolve, reject) => {
const id = uuid();
worker.postMessage({ msg, id });
worker.on('message', function f({ error, result, id: rid }) {
if (rid !== id) {
return;
}
if (error) {
reject(error);
return;
}
worker.off('message', f);
resolve(result);
});
});
interface Job<I> {
msg: I;
resolve: Function;
reject: Function;
}
export default class WorkerPool {
export default class WorkerPool<I, O> {
public numWorkers: number;
public jobQueue: TransformStream;
public workerQueue: TransformStream;
public jobQueue: TransformStream<Job<I>, Job<I>>;
public workerQueue: TransformStream<Worker, Worker>;
public done: Promise<void>;
constructor(numWorkers: number, workerFile: string) {
@@ -55,9 +41,14 @@ export default class WorkerPool {
await this._terminateAll();
return;
}
if (!value) {
throw new Error('Reader did not return any value');
}
const { msg, resolve, reject } = value;
const worker = await this._nextWorker();
jobPromise(worker, msg)
this.jobPromise(worker, msg)
.then((result) => resolve(result))
.catch((reason) => reject(reason))
.finally(() => {
@@ -73,6 +64,10 @@ export default class WorkerPool {
const reader = this.workerQueue.readable.getReader();
const { value } = await reader.read();
reader.releaseLock();
if (!value) {
throw new Error('No worker left');
}
return value;
}
@@ -89,7 +84,7 @@ export default class WorkerPool {
await this.done;
}
dispatchJob(msg: JobMessage): Promise<any> {
dispatchJob(msg: I): Promise<O> {
return new Promise((resolve, reject) => {
const writer = this.jobQueue.writable.getWriter();
writer.write({ msg, resolve, reject });
@@ -97,7 +92,25 @@ export default class WorkerPool {
});
}
static useThisThreadAsWorker(cb: (msg: JobMessage) => any) {
private jobPromise(worker: Worker, msg: I) {
return new Promise((resolve, reject) => {
const id = uuid();
worker.postMessage({ msg, id });
worker.on('message', function f({ error, result, id: rid }) {
if (rid !== id) {
return;
}
if (error) {
reject(error);
return;
}
worker.off('message', f);
resolve(result);
});
});
}
static useThisThreadAsWorker<I, O>(cb: (msg: I) => O) {
parentPort!.on('message', async (data) => {
const { msg, id } = data;
try {