Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions packages/transformers/docs/source/_toctree.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,13 @@
title: Data Structures
title: Utilities
isExpanded: false
- sections:
- local: api/worker
title: Overview
- local: api/worker/worker_pipeline
title: Worker Pipeline
- local: api/worker/worker_pipeline_handler
title: Worker Pipeline Handler
title: Web Workers
isExpanded: false
title: API Reference
3 changes: 3 additions & 0 deletions packages/transformers/src/transformers.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ export * from './models/auto/processing_auto.js';
// Configs
export { PretrainedConfig, AutoConfig } from './configs.js';

// Worker
export * from './worker/worker.js';

// Additional exports
export * from './generation/streamers.js';
export * from './generation/stopping_criteria.js';
Expand Down
28 changes: 28 additions & 0 deletions packages/transformers/src/worker/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* @file Constants for worker pipeline communication.
* @module worker/constants
*/

/**
 * Message type of every request posted to the worker (both the initial
 * `id: 'init'` request and subsequent pipeline calls). The worker-side
 * handler ignores any message whose `type` is not this value.
 * @constant {string}
 */
export const REQUEST_MESSAGE_TYPE = 'transformersjs_worker_pipeline';

/**
 * Message type posted by the worker when a function-valued option
 * (serialized as a `{ __fn, functionId }` reference) is invoked, so the real
 * callback can be run where it was defined.
 * @constant {string}
 */
export const RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK = 'transformersjs_worker_invokeCallback';

/**
 * Message type posted by the worker once the pipeline for a request is
 * available (freshly created or taken from the cache), before the request's
 * data is processed.
 * @constant {string}
 */
export const RESPONSE_MESSAGE_TYPE_READY = 'transformersjs_worker_ready';

/**
 * Message type posted by the worker with the outcome of a request. The
 * message carries the `id` of the request it answers.
 * @constant {string}
 */
export const RESPONSE_MESSAGE_TYPE_RESULT = 'transformersjs_worker_result';
7 changes: 7 additions & 0 deletions packages/transformers/src/worker/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* @file Web Worker utilities for running pipelines in worker threads.
* @module worker
*/

// Wrapper that sends pipeline requests to a Worker and resolves with the results.
export { worker_pipeline } from './worker_pipeline.js';
// Factory for the `onmessage` handler that serves those requests.
export { worker_pipeline_handler } from './worker_pipeline_handler.js';
120 changes: 120 additions & 0 deletions packages/transformers/src/worker/worker_pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/**
* @file Web Worker pipeline wrapper for executing pipelines in a worker thread.
* @module worker/worker_pipeline
*/

import {
REQUEST_MESSAGE_TYPE,
RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK,
RESPONSE_MESSAGE_TYPE_RESULT,
} from './constants.js';

/**
* @typedef {import('../pipelines.js').PipelineType} PipelineType
*/

/**
 * Creates a pipeline that runs in a Web Worker.
 *
 * An `init` request is posted to the worker immediately; the returned promise
 * settles when the worker answers it. Function-valued entries of `options`
 * cannot be posted to a worker, so they are replaced by `{ __fn, functionId }`
 * references and invoked here whenever the worker asks for it.
 *
 * NOTE: this assigns `worker.onmessage`, replacing any handler that was
 * previously installed on the worker.
 *
 * @param {Worker} worker - The Web Worker instance to use for pipeline execution
 * @param {PipelineType} task - The pipeline task type
 * @param {string} model_id - The model identifier to load
 * @param {Record<string, any>} [options={}] - Options for pipeline initialization
 * @returns {Promise<Function>} Resolves with a function `(data, pipe_options)` that executes the
 * pipeline in the worker and returns a Promise with the result. Rejects if the worker reports an
 * error for the `init` request or if the request cannot be posted.
 */
export const worker_pipeline = (worker, task, model_id, options = {}) =>
    new Promise((resolve, reject) => {
        /**
         * Map storing callback functions by their ID.
         * @type {Map<string, Function>}
         */
        const callback_map = new Map();

        /**
         * Map storing promise resolvers/rejecters for each in-flight request.
         * @type {Map<number | "init", { resolve: Function; reject: Function }>}
         */
        const messages_resolvers_map = new Map();

        /**
         * Counter for generating unique message IDs.
         * @type {number}
         */
        let message_id_counter = 0;

        /**
         * Serializes options, converting functions to references that can be invoked via postMessage.
         * @param {Record<string, any>} options - The options object to serialize
         * @returns {Record<string, any>} The serialized options object
         */
        const serialize_options = (options) => {
            const out = {};
            for (const [key, value] of Object.entries(options ?? {})) {
                if (typeof value === 'function') {
                    const function_id = `cb_${key}`;
                    callback_map.set(function_id, value);
                    out[key] = { __fn: true, functionId: function_id };
                } else {
                    out[key] = value;
                }
            }
            return out;
        };

        /**
         * Builds a request message. The init request and every run request share the exact same
         * `task` / `model_id` / `options` values: the worker derives its pipeline cache key from
         * them, so any difference would make it load the model a second time.
         * @param {number | "init"} id - The request ID
         * @param {any} data - The pipeline input (`null` for the init request)
         * @returns {Record<string, any>} The request message
         */
        const build_request = (id, data) => ({
            id,
            type: REQUEST_MESSAGE_TYPE,
            data,
            task: task ?? '',
            model_id: model_id ?? '',
            options: serialize_options(options),
        });

        /**
         * Registers the resolver for a request and posts it. If posting fails (e.g. the payload
         * cannot be cloned), the resolver is removed again and the request is rejected.
         * @param {Record<string, any>} message - The request message
         * @param {{ resolve: Function; reject: Function }} resolver - Settles the request's promise
         */
        const send_request = (message, resolver) => {
            messages_resolvers_map.set(message.id, resolver);
            try {
                worker.postMessage(message);
            } catch (error) {
                messages_resolvers_map.delete(message.id);
                resolver.reject(error);
            }
        };

        /**
         * The function handed to the caller once initialization succeeded.
         * @param {any} data - The pipeline input
         * @param {Record<string, any>} [pipe_options] - Per-call pipeline options
         * @returns {Promise<any>} The pipeline result
         */
        const run_pipeline = (data, pipe_options) =>
            new Promise((resolve_run, reject_run) => {
                const id = message_id_counter++;
                send_request(
                    { ...build_request(id, data), pipe_options },
                    { resolve: resolve_run, reject: reject_run },
                );
            });

        /**
         * Message event handler for processing worker responses.
         * @param {MessageEvent} e - The message event from the worker
         */
        worker.onmessage = (e) => {
            const msg = e.data;

            // Handle callback invocations from the worker
            if (msg?.type === RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK) {
                // Accept both spellings of the ID field: references are created with
                // `functionId`, but a worker may report the ID back as `function_id`.
                const fn = callback_map.get(msg.functionId ?? msg.function_id);
                if (fn) {
                    fn(...(Array.isArray(msg.args) ? msg.args : []));
                }
                return;
            }

            // Everything else that is not a result (e.g. "ready" notifications) is ignored
            if (msg?.type !== RESPONSE_MESSAGE_TYPE_RESULT) return;

            const resolver = messages_resolvers_map.get(msg.id);
            if (!resolver) return;
            messages_resolvers_map.delete(msg.id);

            if (msg.error) {
                // Applies to the init request too: a failed initialization must reject
                resolver.reject(msg.error);
            } else if (msg.id === 'init') {
                // Initial setup complete - resolve with the pipeline function
                resolver.resolve(run_pipeline);
            } else {
                // Regular pipeline execution result
                resolver.resolve(msg.result);
            }
        };

        // Initialize the pipeline in the worker
        send_request(build_request('init', null), { resolve, reject });
    });
74 changes: 74 additions & 0 deletions packages/transformers/src/worker/worker_pipeline_handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* @file Web Worker pipeline handler for managing transformer pipelines in a worker context.
* @module worker/worker_pipeline_handler
*/

import { pipeline } from '../pipelines.js';
import {
REQUEST_MESSAGE_TYPE,
RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK,
RESPONSE_MESSAGE_TYPE_READY,
RESPONSE_MESSAGE_TYPE_RESULT,
} from './constants.js';

/**
 * Cache of pipelines by their configuration (JSON of task, model ID and serialized options).
 * The values are promises of the pipelines, so that requests arriving while a pipeline is still
 * loading share that load instead of starting another one.
 * @type {Map<string, any>}
 */
const pipelines = new Map();

/**
 * Creates a web worker pipeline handler that manages pipeline creation and execution.
 * @returns {{onmessage: (event: MessageEvent) => Promise<void>}} Handler object with onmessage method
 */
export const worker_pipeline_handler = () => {
    /**
     * Converts serialized options back to their original form.
     * Entries carrying the `__fn` marker become functions that forward their arguments to the
     * main thread, where the real callback is looked up by its ID.
     * @param {Record<string, any>} options - The options object to unserialize
     * @returns {Record<string, any>} The unserialized options object
     */
    const unserialize_options = (options) => {
        const out = {};
        for (const [key, value] of Object.entries(options ?? {})) {
            if (typeof value === 'object' && value !== null && value.__fn) {
                const function_id = value.functionId ?? null;
                out[key] = (...args) =>
                    self.postMessage({
                        type: RESPONSE_MESSAGE_TYPE_INVOKE_CALLBACK,
                        // `functionId` is the field the main thread reads; `function_id` is kept
                        // for consumers of the previous message shape.
                        functionId: function_id,
                        function_id,
                        args,
                    });
            } else {
                out[key] = value;
            }
        }
        return out;
    };

    /**
     * Reports a failed request to the main thread so that its promise rejects instead of
     * staying pending forever.
     * @param {number | "init"} id - The ID of the failed request
     * @param {unknown} error - The value that was thrown
     */
    const post_error = (id, error) => {
        const reported = error ?? new Error('Unknown worker pipeline error');
        try {
            self.postMessage({ id, type: RESPONSE_MESSAGE_TYPE_RESULT, error: reported });
        } catch {
            // The thrown value itself could not be posted; fall back to its message only
            self.postMessage({
                id,
                type: RESPONSE_MESSAGE_TYPE_RESULT,
                error: new Error(String(reported?.message ?? reported)),
            });
        }
    };

    return {
        /**
         * Message event handler for processing pipeline requests.
         * Messages that are not pipeline requests are ignored. Every handled request is answered
         * with exactly one result message, carrying either `result` or `error`.
         * @param {MessageEvent} event - The message event from the main thread
         * @returns {Promise<void>}
         */
        onmessage: async (event) => {
            if (!event?.data || event.data?.type !== REQUEST_MESSAGE_TYPE) return;

            const { id, data, task, model_id, options, pipe_options = {} } = event.data;

            try {
                const key = JSON.stringify({ task, model_id, options });
                let pending = pipelines.get(key);

                if (!pending) {
                    // Cache the promise before awaiting it, so concurrent requests reuse it
                    pending = Promise.resolve(pipeline(task, model_id, unserialize_options(options)));
                    pipelines.set(key, pending);
                }

                let pipe;
                try {
                    pipe = await pending;
                } catch (error) {
                    // Do not keep a failed load around: a later request may retry it
                    if (pipelines.get(key) === pending) pipelines.delete(key);
                    throw error;
                }

                self.postMessage({ id, type: RESPONSE_MESSAGE_TYPE_READY });

                // Only a missing input (as sent by the init request) skips execution;
                // falsy inputs such as an empty string are still run.
                const has_data = data !== null && data !== undefined;
                const result = has_data ? await pipe(data, pipe_options) : null;

                self.postMessage({ id, type: RESPONSE_MESSAGE_TYPE_RESULT, result });
            } catch (error) {
                post_error(id, error);
            }
        },
    };
};
Loading