Switch to crossbeam-channel

Still not perfect due to usage of a static global, but this is much cleaner and more efficient thanks to proper blocking of Workers that wait for new messages instead of a manual spin-loop.
This commit is contained in:
Ingvar Stepanyan
2020-04-29 15:29:57 +01:00
committed by Ingvar Stepanyan
parent 9420dba3bc
commit 47f9d22dd8
18 changed files with 331 additions and 85 deletions

View File

@@ -464,7 +464,7 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
name = "squoosh-oxipng"
version = "0.1.0"
dependencies = [
"crossbeam-deque",
"crossbeam-channel",
"log",
"once_cell",
"oxipng",

View File

@@ -13,15 +13,15 @@ oxipng = { version = "3.0.0", default-features = false, features = ["parallel"]
wasm-bindgen = "0.2.64"
log = { version = "0.4", features = ["release_max_level_off"] }
rayon = { version = "1.3.0", optional = true }
crossbeam-deque = { version = "0.7.3", optional = true }
once_cell = { version = "1.3.1", optional = true }
crossbeam-channel = { version = "0.4.2", optional = true }
[profile.release]
lto = true
opt-level = "s"
[features]
parallel = ["oxipng/parallel", "rayon", "crossbeam-deque", "once_cell"]
parallel = ["oxipng/parallel", "rayon", "crossbeam-channel", "once_cell"]
[package.metadata.wasm-pack.profile.release]
wasm-opt = ["-O", "--no-validation"]

View File

@@ -0,0 +1,5 @@
# OxiPNG
- Source: <https://github.com/shssoichiro/oxipng>
- Version: v3.0.0
- License: MIT

View File

@@ -1,32 +1,32 @@
/* tslint:disable */
/* eslint-disable */
/**
* @returns {any}
*/
export function worker_initializer(): any;
/**
* @param {number} num
*/
export function start_main_thread(num: number): void;
/**
*/
export function start_worker_thread(): void;
/**
* @param {Uint8Array} data
* @param {number} level
* @returns {Uint8Array}
*/
export function optimise(data: Uint8Array, level: number): Uint8Array;
/**
* @param {number} num
* @returns {any}
*/
export function worker_initializer(num: number): any;
/**
*/
export function start_main_thread(): void;
/**
*/
export function start_worker_thread(): void;
export type InitInput = RequestInfo | URL | Response | BufferSource | WebAssembly.Module;
export interface InitOutput {
readonly worker_initializer: () => number;
readonly start_main_thread: (a: number) => void;
readonly start_worker_thread: () => void;
readonly optimise: (a: number, b: number, c: number, d: number) => void;
readonly malloc: (a: number) => number;
readonly free: (a: number) => void;
readonly worker_initializer: (a: number) => number;
readonly start_main_thread: () => void;
readonly start_worker_thread: () => void;
readonly __wbindgen_export_0: WebAssembly.Memory;
readonly __wbindgen_malloc: (a: number) => number;
readonly __wbindgen_free: (a: number, b: number) => void;

View File

@@ -33,40 +33,6 @@ function getStringFromWasm0(ptr, len) {
return cachedTextDecoder.decode(getUint8Memory0().slice(ptr, ptr + len));
}
function getObject(idx) { return heap[idx]; }
function dropObject(idx) {
if (idx < 36) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
/**
* @returns {any}
*/
export function worker_initializer() {
var ret = wasm.worker_initializer();
return takeObject(ret);
}
/**
* @param {number} num
*/
export function start_main_thread(num) {
wasm.start_main_thread(num);
}
/**
*/
export function start_worker_thread() {
wasm.start_worker_thread();
}
let WASM_VECTOR_LEN = 0;
function passArray8ToWasm0(arg, malloc) {
@@ -103,6 +69,40 @@ export function optimise(data, level) {
return v1;
}
function getObject(idx) { return heap[idx]; }
function dropObject(idx) {
if (idx < 36) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
/**
* @param {number} num
* @returns {any}
*/
export function worker_initializer(num) {
var ret = wasm.worker_initializer(num);
return takeObject(ret);
}
/**
*/
export function start_main_thread() {
wasm.start_main_thread();
}
/**
*/
export function start_worker_thread() {
wasm.start_worker_thread();
}
async function load(module, imports, maybe_memory) {
if (typeof Response === 'function' && module instanceof Response) {
memory = imports.wbg.memory = new WebAssembly.Memory({initial:17,maximum:16384,shared:true});

View File

@@ -1,11 +1,11 @@
/* tslint:disable */
/* eslint-disable */
export function worker_initializer(): number;
export function start_main_thread(a: number): void;
export function start_worker_thread(): void;
export function optimise(a: number, b: number, c: number, d: number): void;
export function malloc(a: number): number;
export function free(a: number): void;
export function worker_initializer(a: number): number;
export function start_main_thread(): void;
export function start_worker_thread(): void;
export const __wbindgen_export_0: WebAssembly.Memory;
export function __wbindgen_malloc(a: number): number;
export function __wbindgen_free(a: number, b: number): void;

View File

@@ -1,15 +1,15 @@
{
"name": "oxipng",
"name": "squoosh-oxipng",
"collaborators": [
"Ingvar Stepanyan <me@rreverser.com>"
],
"version": "0.1.0",
"files": [
"oxipng_bg.wasm",
"oxipng.js",
"oxipng.d.ts"
"squoosh_oxipng_bg.wasm",
"squoosh_oxipng.js",
"squoosh_oxipng.d.ts"
],
"module": "oxipng.js",
"types": "oxipng.d.ts",
"module": "squoosh_oxipng.js",
"types": "squoosh_oxipng.d.ts",
"sideEffects": false
}

View File

@@ -0,0 +1,46 @@
/* tslint:disable */
/* eslint-disable */
/**
* @param {Uint8Array} data
* @param {number} level
* @returns {Uint8Array}
*/
export function optimise(data: Uint8Array, level: number): Uint8Array;
/**
* @param {number} num
* @returns {any}
*/
export function worker_initializer(num: number): any;
/**
*/
export function start_main_thread(): void;
/**
*/
export function start_worker_thread(): void;
export type InitInput = RequestInfo | URL | Response | BufferSource | WebAssembly.Module;
export interface InitOutput {
readonly malloc: (a: number) => number;
readonly free: (a: number) => void;
readonly optimise: (a: number, b: number, c: number, d: number) => void;
readonly worker_initializer: (a: number) => number;
readonly start_main_thread: () => void;
readonly start_worker_thread: () => void;
readonly __wbindgen_export_0: WebAssembly.Memory;
readonly __wbindgen_malloc: (a: number) => number;
readonly __wbindgen_free: (a: number, b: number) => void;
readonly __wbindgen_start: () => void;
}
/**
* If `module_or_path` is {RequestInfo} or {URL}, makes a request and
* for everything else, calls `WebAssembly.instantiate` directly.
*
* @param {InitInput | Promise<InitInput>} module_or_path
* @param {WebAssembly.Memory} maybe_memory
*
* @returns {Promise<InitOutput>}
*/
export default function init (module_or_path?: InitInput | Promise<InitInput>, maybe_memory: WebAssembly.Memory): Promise<InitOutput>;

View File

@@ -0,0 +1,180 @@
let wasm;
let memory;
const heap = new Array(32).fill(undefined);
heap.push(undefined, null, true, false);
let heap_next = heap.length;
function addHeapObject(obj) {
if (heap_next === heap.length) heap.push(heap.length + 1);
const idx = heap_next;
heap_next = heap[idx];
heap[idx] = obj;
return idx;
}
let cachedTextDecoder = new TextDecoder('utf-8', { ignoreBOM: true, fatal: true });
cachedTextDecoder.decode();
let cachegetUint8Memory0 = null;
function getUint8Memory0() {
if (cachegetUint8Memory0 === null || cachegetUint8Memory0.buffer !== wasm.__wbindgen_export_0.buffer) {
cachegetUint8Memory0 = new Uint8Array(wasm.__wbindgen_export_0.buffer);
}
return cachegetUint8Memory0;
}
function getStringFromWasm0(ptr, len) {
return cachedTextDecoder.decode(getUint8Memory0().slice(ptr, ptr + len));
}
let WASM_VECTOR_LEN = 0;
function passArray8ToWasm0(arg, malloc) {
const ptr = malloc(arg.length * 1);
getUint8Memory0().set(arg, ptr / 1);
WASM_VECTOR_LEN = arg.length;
return ptr;
}
let cachegetInt32Memory0 = null;
function getInt32Memory0() {
if (cachegetInt32Memory0 === null || cachegetInt32Memory0.buffer !== wasm.__wbindgen_export_0.buffer) {
cachegetInt32Memory0 = new Int32Array(wasm.__wbindgen_export_0.buffer);
}
return cachegetInt32Memory0;
}
function getArrayU8FromWasm0(ptr, len) {
return getUint8Memory0().subarray(ptr / 1, ptr / 1 + len);
}
/**
* @param {Uint8Array} data
* @param {number} level
* @returns {Uint8Array}
*/
export function optimise(data, level) {
try {
const retptr = wasm.__wbindgen_export_1.value - 16;
wasm.__wbindgen_export_1.value = retptr;
var ptr0 = passArray8ToWasm0(data, wasm.__wbindgen_malloc);
var len0 = WASM_VECTOR_LEN;
wasm.optimise(retptr, ptr0, len0, level);
var r0 = getInt32Memory0()[retptr / 4 + 0];
var r1 = getInt32Memory0()[retptr / 4 + 1];
var v1 = getArrayU8FromWasm0(r0, r1).slice();
wasm.__wbindgen_free(r0, r1 * 1);
return v1;
} finally {
wasm.__wbindgen_export_1.value += 16;
}
}
function getObject(idx) { return heap[idx]; }
function dropObject(idx) {
if (idx < 36) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
/**
* @param {number} num
* @returns {any}
*/
export function worker_initializer(num) {
var ret = wasm.worker_initializer(num);
return takeObject(ret);
}
/**
*/
export function start_main_thread() {
wasm.start_main_thread();
}
/**
*/
export function start_worker_thread() {
wasm.start_worker_thread();
}
async function load(module, imports, maybe_memory) {
if (typeof Response === 'function' && module instanceof Response) {
memory = imports.wbg.memory = new WebAssembly.Memory({initial:17,maximum:16384,shared:true});
if (typeof WebAssembly.instantiateStreaming === 'function') {
try {
return await WebAssembly.instantiateStreaming(module, imports);
} catch (e) {
if (module.headers.get('Content-Type') != 'application/wasm') {
console.warn("`WebAssembly.instantiateStreaming` failed because your server does not serve wasm with `application/wasm` MIME type. Falling back to `WebAssembly.instantiate` which is slower. Original error:\n", e);
} else {
throw e;
}
}
}
const bytes = await module.arrayBuffer();
return await WebAssembly.instantiate(bytes, imports);
} else {
memory = imports.wbg.memory = maybe_memory;
const instance = await WebAssembly.instantiate(module, imports);
if (instance instanceof WebAssembly.Instance) {
return { instance, module };
} else {
return instance;
}
}
}
async function init(input, maybe_memory) {
if (typeof input === 'undefined') {
input = import.meta.url.replace(/\.js$/, '_bg.wasm');
}
const imports = {};
imports.wbg = {};
imports.wbg.__wbindgen_module = function() {
var ret = init.__wbindgen_wasm_module;
return addHeapObject(ret);
};
imports.wbg.__wbindgen_memory = function() {
var ret = wasm.__wbindgen_export_0;
return addHeapObject(ret);
};
imports.wbg.__wbg_of_6510501edc06d65e = function(arg0, arg1) {
var ret = Array.of(takeObject(arg0), takeObject(arg1));
return addHeapObject(ret);
};
imports.wbg.__wbindgen_throw = function(arg0, arg1) {
throw new Error(getStringFromWasm0(arg0, arg1));
};
if (typeof input === 'string' || (typeof Request === 'function' && input instanceof Request) || (typeof URL === 'function' && input instanceof URL)) {
input = fetch(input);
}
const { instance, module } = await load(await input, imports, maybe_memory);
wasm = instance.exports;
init.__wbindgen_wasm_module = module;
wasm.__wbindgen_start();
return wasm;
}
export default init;

Binary file not shown.

View File

@@ -0,0 +1,12 @@
/* tslint:disable */
/* eslint-disable */
export function malloc(a: number): number;
export function free(a: number): void;
export function optimise(a: number, b: number, c: number, d: number): void;
export function worker_initializer(a: number): number;
export function start_main_thread(): void;
export function start_worker_thread(): void;
export const __wbindgen_export_0: WebAssembly.Memory;
export function __wbindgen_malloc(a: number): number;
export function __wbindgen_free(a: number, b: number): void;
export function __wbindgen_start(): void;

Binary file not shown.

View File

@@ -43,14 +43,20 @@ function getArrayU8FromWasm0(ptr, len) {
* @returns {Uint8Array}
*/
export function optimise(data, level) {
try {
const retptr = wasm.__wbindgen_export_0.value - 16;
wasm.__wbindgen_export_0.value = retptr;
var ptr0 = passArray8ToWasm0(data, wasm.__wbindgen_malloc);
var len0 = WASM_VECTOR_LEN;
wasm.optimise(8, ptr0, len0, level);
var r0 = getInt32Memory0()[8 / 4 + 0];
var r1 = getInt32Memory0()[8 / 4 + 1];
wasm.optimise(retptr, ptr0, len0, level);
var r0 = getInt32Memory0()[retptr / 4 + 0];
var r1 = getInt32Memory0()[retptr / 4 + 1];
var v1 = getArrayU8FromWasm0(r0, r1).slice();
wasm.__wbindgen_free(r0, r1 * 1);
return v1;
} finally {
wasm.__wbindgen_export_0.value += 16;
}
}
async function load(module, imports) {

View File

@@ -1,4 +1,5 @@
use wasm_bindgen::prelude::*;
use oxipng::AlphaOptim;
mod malloc_shim;

View File

@@ -1,4 +1,4 @@
use crossbeam_deque::Injector;
use crossbeam_channel::{Sender, Receiver, bounded};
use once_cell::sync::OnceCell;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsValue;
@@ -9,31 +9,27 @@ extern "C" {
fn array_of_2(a: JsValue, b: JsValue) -> JsValue;
}
static TASKS: OnceCell<Injector<rayon::ThreadBuilder>> = OnceCell::new();
static CHANNEL: OnceCell<(Sender<rayon::ThreadBuilder>, Receiver<rayon::ThreadBuilder>)> = OnceCell::new();
#[wasm_bindgen]
pub fn worker_initializer() -> JsValue {
TASKS.get_or_init(Injector::new);
pub fn worker_initializer(num: usize) -> JsValue {
CHANNEL.get_or_init(|| bounded(num));
array_of_2(wasm_bindgen::module(), wasm_bindgen::memory())
}
#[wasm_bindgen]
pub fn start_main_thread(num: usize) {
let tasks = TASKS.get().unwrap();
pub fn start_main_thread() {
let (sender, _) = CHANNEL.get().unwrap();
rayon::ThreadPoolBuilder::new()
.num_threads(num)
.spawn_handler(|thread| Ok(tasks.push(thread)))
.num_threads(sender.capacity().unwrap())
.spawn_handler(|thread| Ok(sender.send(thread).unwrap_throw()))
.build_global()
.unwrap_throw()
}
#[wasm_bindgen]
pub fn start_worker_thread() {
let tasks = TASKS.get().unwrap();
loop {
if let crossbeam_deque::Steal::Success(task) = tasks.steal() {
return task.run();
}
}
let (_, receiver) = CHANNEL.get().unwrap();
receiver.recv().unwrap_throw().run()
}