Skip to content

Commit

Permalink
Merge pull request #4312 from coralproject/wyattjoh/promise-worker-th…
Browse files Browse the repository at this point in the history
…reads

Update workers to use promise based callbacks instead of timeouts
  • Loading branch information
tessalt authored Aug 21, 2023
2 parents b177389 + f77828a commit 9e31852
Showing 1 changed file with 39 additions and 63 deletions.
102 changes: 39 additions & 63 deletions src/core/server/services/comments/pipeline/phases/wordList/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,25 @@ import {
const WORKER_SCRIPT =
"./dist/core/server/services/comments/pipeline/phases/wordList/worker.js";

interface PromiseCallbacks<R> {
resolve: (result: R) => void;
reject: (err: Error) => void;
}

export class WordListService {
private worker: Worker;

private onMessageDelegate: (event: MessageEvent) => void;
private results: Map<string, WordListWorkerResult>;
private readonly callbacks: Map<
string,
PromiseCallbacks<WordListWorkerResult>
> = new Map();
private logger: Logger;
private sanitizer: Sanitize;

constructor(logger: Logger, numWorkers = 3) {
constructor(logger: Logger) {
this.logger = logger;

this.results = new Map<string, WordListWorkerResult>();
this.onMessageDelegate = this.onMessage.bind(this);

this.worker = new Worker(WORKER_SCRIPT);
Expand All @@ -52,20 +59,37 @@ export class WordListService {
});
}

private sleep(ms: number) {
return new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, ms);
});
}

private onMessage(result: WordListWorkerResult) {
if (!result) {
return;
}

this.results.set(result.id, result);
// Get the callbacks for this result.
const callbacks = this.callbacks.get(result.id);
if (!callbacks) {
throw new Error(`Invalid result id: ${result.id}`);
}

// Delete the callbacks for this result.
this.callbacks.delete(result.id);

// Resolve the promise.
if (result.ok) {
callbacks.resolve(result);
} else {
callbacks.reject(result.err!);
}
}

private send(message: WordListWorkerMessage) {
// Create a new promise to wait for the worker to finish.
const promise = new Promise<WordListWorkerResult>((resolve, reject) => {
this.callbacks.set(message.id, { resolve, reject });
});

this.worker.postMessage(message);

return promise;
}

public async initialize(
Expand All @@ -87,31 +111,7 @@ export class WordListService {
data,
};

const builder = async () => {
let hasResult = this.results.has(message.id);
while (!hasResult) {
await this.sleep(1);
hasResult = this.results.has(message.id);
}

const result = this.results.get(message.id);
if (!result) {
this.results.delete(message.id);
return {
id: message.id,
tenantID,
ok: false,
err: new Error("result was undefined"),
};
}

this.results.delete(message.id);
return result;
};

this.worker.postMessage(message);
const result = await builder();

const result = await this.send(message);
if (!result.ok || result.err) {
this.logger.error(
{ tenantID: result.tenantID, id: result.id },
Expand Down Expand Up @@ -143,32 +143,8 @@ export class WordListService {
data,
};

this.worker.postMessage(message);

const builder = async () => {
let hasResult = this.results.has(message.id);
while (!hasResult) {
await this.sleep(1);
hasResult = this.results.has(message.id);
}

const result = this.results.get(message.id);
if (!result) {
this.results.delete(message.id);
return {
id: message.id,
tenantID,
ok: false,
err: new Error("result was undefined"),
};
}

this.results.delete(message.id);
return result;
};

const result = await builder();

// Send the message to the worker.
const result = await this.send(message);
if (!result.ok || result.err) {
return {
isMatched: false,
Expand Down

0 comments on commit 9e31852

Please sign in to comment.