Parallel OxiPNG improvements

- Refactor to work around Chromium's issue with postMessage queuing. https://bugs.chromium.org/p/chromium/issues/detail?id=1075645
 - Convert codec code to TypeScript.
 - Make separate parallel and non-parallel builds.
 - Switch to nightly Rust for OxiPNG to allow parallel builds (but also reuse it for regular builds to avoid installing two toolchains).
This commit is contained in:
Ingvar Stepanyan
2020-04-29 12:29:20 +01:00
parent b7ef7f92be
commit 9c60d3286e
15 changed files with 129 additions and 190 deletions

View File

@@ -12,13 +12,16 @@ crate-type = ["cdylib"]
oxipng = { version = "3.0.0", default-features = false, features = ["parallel"] }
wasm-bindgen = "0.2.64"
log = { version = "0.4", features = ["release_max_level_off"] }
rayon = "1.3.0"
crossbeam-deque = "0.7.3"
once_cell = "1.3.1"
rayon = { version = "1.3.0", optional = true }
crossbeam-deque = { version = "0.7.3", optional = true }
once_cell = { version = "1.3.1", optional = true }
[profile.release]
lto = true
opt-level = "s"
[features]
parallel = ["oxipng/parallel", "rayon", "crossbeam-deque", "once_cell"]
[package.metadata.wasm-pack.profile.release]
wasm-opt = ["-O", "--no-validation"]

View File

@@ -1,4 +1,4 @@
FROM rust
FROM rustlang/rust:nightly
RUN rustup target add wasm32-unknown-unknown
RUN curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh

View File

@@ -6,8 +6,10 @@ echo "============================================="
echo "Compiling wasm"
echo "============================================="
(
rm -rf pkg
CC=/opt/wasi-sdk/bin/clang RUSTFLAGS='-C target-feature=+atomics,+bulk-memory' rustup run nightly wasm-pack build -t web -- -Z build-std=panic_abort,std
export CC=/opt/wasi-sdk/bin/clang
rm -rf pkg,{-parallel}
wasm-pack build -t web
RUSTFLAGS='-C target-feature=+atomics,+bulk-memory' wasm-pack build -t web -d pkg-parallel -- -Z build-std=panic_abort,std --features=parallel
rm pkg/.gitignore
)
echo "============================================="

Binary file not shown.

View File

@@ -1,36 +1,21 @@
/* tslint:disable */
/* eslint-disable */
/**
* @returns {any}
*/
export function worker_initializer(): any;
/**
* @param {number} num
*/
export function start_main_thread(num: number): void;
/**
*/
export function start_worker_thread(): void;
/**
* @param {Uint8Array} data
* @param {number} level
* @returns {Uint8Array}
* @param {Uint8Array} data
* @param {number} level
* @returns {Uint8Array}
*/
export function optimise(data: Uint8Array, level: number): Uint8Array;
export type InitInput = RequestInfo | URL | Response | BufferSource | WebAssembly.Module;
export interface InitOutput {
readonly worker_initializer: () => number;
readonly start_main_thread: (a: number) => void;
readonly start_worker_thread: () => void;
readonly memory: WebAssembly.Memory;
readonly optimise: (a: number, b: number, c: number, d: number) => void;
readonly malloc: (a: number) => number;
readonly free: (a: number) => void;
readonly __wbindgen_export_0: WebAssembly.Memory;
readonly __wbindgen_malloc: (a: number) => number;
readonly __wbindgen_free: (a: number, b: number) => void;
readonly __wbindgen_start: () => void;
}
/**
@@ -38,9 +23,8 @@ export interface InitOutput {
* for everything else, calls `WebAssembly.instantiate` directly.
*
* @param {InitInput | Promise<InitInput>} module_or_path
* @param {WebAssembly.Memory} maybe_memory
*
* @returns {Promise<InitOutput>}
*/
export default function init (module_or_path?: InitInput | Promise<InitInput>, maybe_memory: WebAssembly.Memory): Promise<InitOutput>;
export default function init (module_or_path?: InitInput | Promise<InitInput>): Promise<InitOutput>;

View File

@@ -1,21 +1,5 @@
let wasm;
let memory;
const heap = new Array(32).fill(undefined);
heap.push(undefined, null, true, false);
let heap_next = heap.length;
function addHeapObject(obj) {
if (heap_next === heap.length) heap.push(heap.length + 1);
const idx = heap_next;
heap_next = heap[idx];
heap[idx] = obj;
return idx;
}
let cachedTextDecoder = new TextDecoder('utf-8', { ignoreBOM: true, fatal: true });
@@ -23,48 +7,14 @@ cachedTextDecoder.decode();
let cachegetUint8Memory0 = null;
function getUint8Memory0() {
if (cachegetUint8Memory0 === null || cachegetUint8Memory0.buffer !== wasm.__wbindgen_export_0.buffer) {
cachegetUint8Memory0 = new Uint8Array(wasm.__wbindgen_export_0.buffer);
if (cachegetUint8Memory0 === null || cachegetUint8Memory0.buffer !== wasm.memory.buffer) {
cachegetUint8Memory0 = new Uint8Array(wasm.memory.buffer);
}
return cachegetUint8Memory0;
}
function getStringFromWasm0(ptr, len) {
return cachedTextDecoder.decode(getUint8Memory0().slice(ptr, ptr + len));
}
function getObject(idx) { return heap[idx]; }
function dropObject(idx) {
if (idx < 36) return;
heap[idx] = heap_next;
heap_next = idx;
}
function takeObject(idx) {
const ret = getObject(idx);
dropObject(idx);
return ret;
}
/**
* @returns {any}
*/
export function worker_initializer() {
var ret = wasm.worker_initializer();
return takeObject(ret);
}
/**
* @param {number} num
*/
export function start_main_thread(num) {
wasm.start_main_thread(num);
}
/**
*/
export function start_worker_thread() {
wasm.start_worker_thread();
return cachedTextDecoder.decode(getUint8Memory0().subarray(ptr, ptr + len));
}
let WASM_VECTOR_LEN = 0;
@@ -78,8 +28,8 @@ function passArray8ToWasm0(arg, malloc) {
let cachegetInt32Memory0 = null;
function getInt32Memory0() {
if (cachegetInt32Memory0 === null || cachegetInt32Memory0.buffer !== wasm.__wbindgen_export_0.buffer) {
cachegetInt32Memory0 = new Int32Array(wasm.__wbindgen_export_0.buffer);
if (cachegetInt32Memory0 === null || cachegetInt32Memory0.buffer !== wasm.memory.buffer) {
cachegetInt32Memory0 = new Int32Array(wasm.memory.buffer);
}
return cachegetInt32Memory0;
}
@@ -93,25 +43,19 @@ function getArrayU8FromWasm0(ptr, len) {
* @returns {Uint8Array}
*/
export function optimise(data, level) {
try {
const retptr = wasm.__wbindgen_export_1.value - 16;
wasm.__wbindgen_export_1.value = retptr;
var ptr0 = passArray8ToWasm0(data, wasm.__wbindgen_malloc);
var len0 = WASM_VECTOR_LEN;
wasm.optimise(retptr, ptr0, len0, level);
var r0 = getInt32Memory0()[retptr / 4 + 0];
var r1 = getInt32Memory0()[retptr / 4 + 1];
var v1 = getArrayU8FromWasm0(r0, r1).slice();
wasm.__wbindgen_free(r0, r1 * 1);
return v1;
} finally {
wasm.__wbindgen_export_1.value += 16;
}
var ptr0 = passArray8ToWasm0(data, wasm.__wbindgen_malloc);
var len0 = WASM_VECTOR_LEN;
wasm.optimise(8, ptr0, len0, level);
var r0 = getInt32Memory0()[8 / 4 + 0];
var r1 = getInt32Memory0()[8 / 4 + 1];
var v1 = getArrayU8FromWasm0(r0, r1).slice();
wasm.__wbindgen_free(r0, r1 * 1);
return v1;
}
async function load(module, imports, maybe_memory) {
async function load(module, imports) {
if (typeof Response === 'function' && module instanceof Response) {
memory = imports.wbg.memory = new WebAssembly.Memory({initial:17,maximum:16384,shared:true});
if (typeof WebAssembly.instantiateStreaming === 'function') {
try {
return await WebAssembly.instantiateStreaming(module, imports);
@@ -130,7 +74,7 @@ async function load(module, imports, maybe_memory) {
return await WebAssembly.instantiate(bytes, imports);
} else {
memory = imports.wbg.memory = maybe_memory;
const instance = await WebAssembly.instantiate(module, imports);
if (instance instanceof WebAssembly.Instance) {
@@ -142,24 +86,12 @@ async function load(module, imports, maybe_memory) {
}
}
async function init(input, maybe_memory) {
async function init(input) {
if (typeof input === 'undefined') {
input = import.meta.url.replace(/\.js$/, '_bg.wasm');
}
const imports = {};
imports.wbg = {};
imports.wbg.__wbindgen_module = function() {
var ret = init.__wbindgen_wasm_module;
return addHeapObject(ret);
};
imports.wbg.__wbindgen_memory = function() {
var ret = wasm.__wbindgen_export_0;
return addHeapObject(ret);
};
imports.wbg.__wbg_of_6510501edc06d65e = function(arg0, arg1) {
var ret = Array.of(takeObject(arg0), takeObject(arg1));
return addHeapObject(ret);
};
imports.wbg.__wbindgen_throw = function(arg0, arg1) {
throw new Error(getStringFromWasm0(arg0, arg1));
};
@@ -168,11 +100,11 @@ async function init(input, maybe_memory) {
input = fetch(input);
}
const { instance, module } = await load(await input, imports, maybe_memory);
const { instance, module } = await load(await input, imports);
wasm = instance.exports;
init.__wbindgen_wasm_module = module;
wasm.__wbindgen_start();
return wasm;
}

View File

@@ -1,12 +1,8 @@
/* tslint:disable */
/* eslint-disable */
export function worker_initializer(): number;
export function start_main_thread(a: number): void;
export function start_worker_thread(): void;
export const memory: WebAssembly.Memory;
export function optimise(a: number, b: number, c: number, d: number): void;
export function malloc(a: number): number;
export function free(a: number): void;
export const __wbindgen_export_0: WebAssembly.Memory;
export function __wbindgen_malloc(a: number): number;
export function __wbindgen_free(a: number, b: number): void;
export function __wbindgen_start(): void;

View File

@@ -0,0 +1 @@
nightly

View File

@@ -1,23 +0,0 @@
import initOxiPNG, { worker_initializer, start_main_thread, optimise } from './pkg/oxipng.js';
import wasmUrl from "./pkg/oxipng_bg.wasm";
async function startMainThread() {
let num = navigator.hardwareConcurrency;
await initOxiPNG(fetch(wasmUrl));
let workerInit = worker_initializer();
let workers = [];
for (let i = 0; i < num; i++) {
workers.push(new Promise(resolve => {
let worker = new Worker("./worker.js", { type: "module" });
worker.postMessage(workerInit);
worker.addEventListener('message', resolve, { once: true });
}));
}
await Promise.all(workers);
start_main_thread(num);
return {
optimise
};
}
export default startMainThread();

28
codecs/oxipng/spawn.ts Normal file
View File

@@ -0,0 +1,28 @@
import initOxiPNG, {
worker_initializer,
start_main_thread,
optimise,
} from './pkg-parallel';
import wasmUrl from './pkg-parallel/oxipng_bg.wasm';
import type { WorkerInit } from './worker';
function initWorker(worker: Worker, workerInit: WorkerInit) {
return new Promise((resolve) => {
worker.postMessage(workerInit);
worker.addEventListener('message', () => resolve(), { once: true });
});
}
async function startMainThread() {
const num = navigator.hardwareConcurrency;
const workers = Array.from({ length: num }, () => new Worker('./worker', { type: 'module' }));
await initOxiPNG(fetch(wasmUrl), undefined as any);
const workerInit: WorkerInit = worker_initializer();
await Promise.all(workers.map(worker => initWorker(worker, workerInit)));
start_main_thread(num);
return {
optimise,
};
}
export default startMainThread();

View File

@@ -1,47 +1,11 @@
use wasm_bindgen::prelude::*;
mod malloc_shim;
use crossbeam_deque::Injector;
use once_cell::sync::OnceCell;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsValue;
use oxipng::AlphaOptim;
#[cfg(feature = "parallel")]
pub mod parallel;
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = Array, js_name = of)]
fn array_of_2(a: JsValue, b: JsValue) -> JsValue;
}
static TASKS: OnceCell<Injector<rayon::ThreadBuilder>> = OnceCell::new();
#[wasm_bindgen]
pub fn worker_initializer() -> JsValue {
TASKS.get_or_init(Injector::new);
array_of_2(wasm_bindgen::module(), wasm_bindgen::memory())
}
#[wasm_bindgen]
pub fn start_main_thread(num: usize) {
let tasks = TASKS.get().unwrap();
rayon::ThreadPoolBuilder::new()
.num_threads(num)
.spawn_handler(|thread| Ok(tasks.push(thread)))
.build_global()
.unwrap_throw()
}
#[wasm_bindgen]
pub fn start_worker_thread() {
let tasks = TASKS.get().unwrap();
loop {
if let crossbeam_deque::Steal::Success(task) = tasks.steal() {
return task.run();
}
}
}
#[wasm_bindgen(catch)]
pub fn optimise(data: &[u8], level: u8) -> Vec<u8> {
let mut options = oxipng::Options::from_preset(level);
options.alphas.insert(AlphaOptim::Black);

View File

@@ -0,0 +1,39 @@
use crossbeam_deque::Injector;
use once_cell::sync::OnceCell;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsValue;
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(js_namespace = Array, js_name = of)]
fn array_of_2(a: JsValue, b: JsValue) -> JsValue;
}
static TASKS: OnceCell<Injector<rayon::ThreadBuilder>> = OnceCell::new();
#[wasm_bindgen]
pub fn worker_initializer() -> JsValue {
TASKS.get_or_init(Injector::new);
array_of_2(wasm_bindgen::module(), wasm_bindgen::memory())
}
#[wasm_bindgen]
pub fn start_main_thread(num: usize) {
let tasks = TASKS.get().unwrap();
rayon::ThreadPoolBuilder::new()
.num_threads(num)
.spawn_handler(|thread| Ok(tasks.push(thread)))
.build_global()
.unwrap_throw()
}
#[wasm_bindgen]
pub fn start_worker_thread() {
let tasks = TASKS.get().unwrap();
loop {
if let crossbeam_deque::Steal::Success(task) = tasks.steal() {
return task.run();
}
}
}

View File

@@ -1,7 +0,0 @@
import initOxiPNG, { start_worker_thread } from './pkg/squoosh_oxipng.js';
addEventListener('message', async ({ data: [module, memory] }) => {
postMessage(null);
await initOxiPNG(module, memory);
start_worker_thread();
}, { once: true });

20
codecs/oxipng/worker.ts Normal file
View File

@@ -0,0 +1,20 @@
/// <reference lib="webworker" />
import initOxiPNG, { start_worker_thread } from './pkg-parallel';
export type WorkerInit = [WebAssembly.Module, WebAssembly.Memory];
addEventListener(
'message',
async (event) => {
// Tell the "main" thread that we've received the message.
//
// At this point, the "main" thread can run Wasm that
// will synchronously block waiting on other atomics.
postMessage(null);
await initOxiPNG(...(event.data as WorkerInit));
start_worker_thread();
},
{ once: true },
);

View File

@@ -1,5 +1,5 @@
// @ts-ignore
import optimiser from '../../../codecs/oxipng/spawn.js';
import optimiser from '../../../codecs/oxipng/spawn';
import { EncodeOptions } from './encoder-meta';
export async function compress(data: ArrayBuffer, options: EncodeOptions): Promise<ArrayBuffer> {