Skip to content

Commit

Permalink
fix(sandbox): use updateProgress method name
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 15, 2021
1 parent 7299935 commit 27d62c3
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 33 deletions.
51 changes: 29 additions & 22 deletions src/classes/master.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ process.on('message', msg => {
processor = promisify(processor);
} else {
const origProcessor = processor;
processor = function() {
processor = function(...args: any[]) {
try {
return Promise.resolve(origProcessor.apply(null, arguments));
return Promise.resolve(origProcessor(...args));
} catch (err) {
return Promise.reject(err);
}
Expand Down Expand Up @@ -92,12 +92,9 @@ process.on('message', msg => {
value: result,
});
} catch (err) {
if (!err.message) {
err = new Error(err);
}
await processSendAsync({
cmd: 'failed',
value: err,
value: !err.message ? new Error(err) : err,
});
} finally {
status = 'IDLE';
Expand Down Expand Up @@ -133,6 +130,27 @@ process.on('uncaughtException', err => {
function wrapJob(job: JobJson): SandboxedJob {
let progressValue = job.progress;

const updateProgress = (progress: number | object) => {
// Locally store reference to new progress value
// so that we can return it from this process synchronously.
progressValue = progress;
// Send message to update job progress.
process.send({
cmd: 'progress',
value: progress,
});
return Promise.resolve();
};

const progress = (progress?: number | object) => {
if (progress) {
return updateProgress(progress);
} else {
// Return the last known progress value.
return progressValue;
}
};

return {
...job,
data: JSON.parse(job.data || '{}'),
Expand All @@ -143,22 +161,11 @@ function wrapJob(job: JobJson): SandboxedJob {
* If no argument is given, it behaves as a sync getter.
* If an argument is given, it behaves as an async setter.
*/
progress: (progress?: any) => {
if (progress) {
// Locally store reference to new progress value
// so that we can return it from this process synchronously.
progressValue = progress;
// Send message to update job progress.
process.send({
cmd: 'progress',
value: progress,
});
return Promise.resolve();
} else {
// Return the last known progress value.
return progressValue;
}
},
progress,
/*
* Emulate the real job `updateProgress` function, should works as `progress` function.
*/
updateProgress,
/*
* Emulate the real job `log` function.
*/
Expand Down
3 changes: 2 additions & 1 deletion src/interfaces/sandboxed-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import { JobsOptions } from './jobs-options';
* @see {@link https://docs.bullmq.io/guide/workers/sandboxed-processors}
*/
export interface SandboxedJob<T = any, R = any>
extends Omit<JobJson, 'data' | 'opts' | 'progress' | 'log' | 'returnValue'> {
extends Omit<JobJson, 'data' | 'opts' | 'progress' | 'returnValue'> {
data: T;
opts: JobsOptions;
progress:
| (() => object | number)
| ((value: object | number) => Promise<void>);
updateProgress: (value: object | number) => Promise<void>;
log: (row: any) => void;
returnValue: R;
}
4 changes: 2 additions & 2 deletions src/test/fixtures/fixture_processor_progress.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const delay = require('./delay');
module.exports = function(job) {
return delay(50)
.then(() => {
job.progress(10);
job.updateProgress(10);
job.log(job.progress());
return delay(100);
})
Expand All @@ -19,7 +19,7 @@ module.exports = function(job) {
return delay(150);
})
.then(() => {
job.progress(78);
job.updateProgress(78);
job.log(job.progress());
return delay(100);
})
Expand Down
16 changes: 8 additions & 8 deletions src/test/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe('sandboxed process', () => {
drainDelay: 1,
});

const completting = new Promise((resolve, reject) => {
const completing = new Promise((resolve, reject) => {
worker.on('completed', async (job, value) => {
try {
expect(job.data).to.be.eql({ foo: 'bar' });
Expand All @@ -53,7 +53,7 @@ describe('sandboxed process', () => {

await queue.add('test', { foo: 'bar' });

await completting;
await completing;

await worker.close();
});
Expand All @@ -64,7 +64,7 @@ describe('sandboxed process', () => {
drainDelay: 1,
});

const completting = new Promise((resolve, reject) => {
const completing = new Promise((resolve, reject) => {
worker.on('completed', async (job, value) => {
try {
expect(job.data).to.be.eql({ foo: 'bar' });
Expand All @@ -82,7 +82,7 @@ describe('sandboxed process', () => {

await queue.add('foobar', { foo: 'bar' });

await completting;
await completing;
});

it('should process with concurrent processors', async function() {
Expand Down Expand Up @@ -142,7 +142,7 @@ describe('sandboxed process', () => {
queue.add('4', { foo: 'bar4' }),
]);

const completting = new Promise((resolve, reject) => {
const completing = new Promise((resolve, reject) => {
const after4 = after(4, async () => {
expect(worker['childPool'].getAllFree().length).to.eql(1);
await worker.close();
Expand All @@ -164,7 +164,7 @@ describe('sandboxed process', () => {
});
});

await completting;
await completing;
});

it('should process and update progress', async () => {
Expand Down Expand Up @@ -319,7 +319,7 @@ describe('sandboxed process', () => {
drainDelay: 1,
});

const completting = new Promise((resolve, reject) => {
const completing = new Promise((resolve, reject) => {
worker.on('completed', async () => {
try {
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0);
Expand All @@ -336,7 +336,7 @@ describe('sandboxed process', () => {

await queue.add('test', { foo: 'bar' });

await completting;
await completing;

await worker.close();
});
Expand Down

0 comments on commit 27d62c3

Please sign in to comment.