Skip to content

Add support for async thread functions #58

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

## Next release

- Job class now emits 'progress' events
[#56](https://github.com/andywer/threads.js/pull/56)
- Credits to https://github.com/mmcardle
- Support for async thread functions
- Job class now emits 'progress' events [#56](https://github.com/andywer/threads.js/pull/56), credits to https://github.com/mmcardle

## 0.7.3

Expand Down
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,29 @@ module.exports = function(input, done) {
};
```

### Async functions

You can also pass async functions, a.k.a. functions returning a Promise, to spawn threads.

```javascript
const spawn = require('threads').spawn;

const thread = spawn(function ([a, b]) {
// Remember that this function will be run in another execution context.
return new Promise(resolve => {
setTimeout(() => resolve(a + b), 1000)
})
});

thread
.send([ 9, 12 ])
// The handlers come here: (none of them is mandatory)
.on('message', function(response) {
console.log('9 + 12 = ', response);
thread.kill();
});
```


### Thread Pool

Expand Down
21 changes: 20 additions & 1 deletion src/worker.browser/slave.js.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ function handlerProgress(progress) {
this.postMessage({ progress : progress });
}

function handlerError(error) {
// Need to clone error manually to avoid DataCloneError, since errors cannot be send
var cloned = {
message: error.message,
name: error.name,
stack: error.stack
};
this.postMessage({ error : cloned });
}

function handlerDoneTransfer() {
var args = Array.prototype.slice.call(arguments);
var lastArg = args.pop();
Expand All @@ -27,6 +37,10 @@ function handlerDoneTransfer() {
this.postMessage({ response : args }, lastArg);
}

function isPromise (thing) {
return thing && typeof thing.then === 'function';
}

self.onmessage = function (event) {
var scripts = event.data.scripts;
if (scripts && scripts.length > 0 && typeof importScripts !== 'function') {
Expand All @@ -48,13 +62,18 @@ self.onmessage = function (event) {

if (event.data.doRun) {
var handler = this.module.exports;

if (typeof handler !== 'function') {
throw new Error('Cannot run thread logic. No handler has been exported.');
}

var preparedHandlerDone = handlerDone.bind(this);
preparedHandlerDone.transfer = handlerDoneTransfer.bind(this);

handler.call(this, event.data.param, preparedHandlerDone, handlerProgress.bind(this));
var returned = handler.call(this, event.data.param, preparedHandlerDone, handlerProgress.bind(this));

if (isPromise(returned)) {
returned.then(preparedHandlerDone, handlerError.bind(this));
}
}
}.bind(self);
13 changes: 12 additions & 1 deletion src/worker.node/slave.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ function messageHandlerError(error) {
});
}

function isPromise (thing) {
return thing && typeof thing.then === 'function';
}

process.on('message', function(data) {
if (data.initByScript) {
messageHandler = require(data.script);
Expand All @@ -67,6 +71,13 @@ process.on('message', function(data) {
// so initialization errors will be printed to console
setupErrorCatcher();

messageHandler(data.param, messageHandlerDone, messageHandlerProgress);
const returned = messageHandler(data.param, messageHandlerDone, messageHandlerProgress);

if (isPromise(returned)) {
returned.then(
(result) => messageHandlerDone(result),
(error) => messageHandlerError(error)
);
}
}
});
45 changes: 30 additions & 15 deletions test/spec/worker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import sinon from 'sinon';
import Worker from '../../lib/worker';
import { config, spawn } from '../../';


const env = typeof window === 'object' ? 'browser' : 'node';

function echoThread(param, done) {
Expand Down Expand Up @@ -39,7 +38,6 @@ function expectEqualBuffers(buffer1, buffer2) {
}
}


describe('Worker', function () {

this.timeout(4000);
Expand All @@ -55,7 +53,6 @@ describe('Worker', function () {
});
});


it('can be spawned', () => {
const worker = spawn();

Expand Down Expand Up @@ -173,6 +170,8 @@ describe('Worker', function () {
it('can update progress', done => {
const progressUpdates = [];
const worker = spawn(progressThread);
let messageHandlerInvoked = false;
let doneHandlerInvoked = false;

worker.on('progress', progress => {
progressUpdates.push(progress);
Expand All @@ -181,23 +180,21 @@ describe('Worker', function () {

worker.on('message', () => {
expect(progressUpdates).to.eql([ 0.3, 0.6 ]);
done();
});
});

it('does also emit "done" event', done => {
const progressUpdates = [];
const worker = spawn(progressThread);

worker.on('progress', progress => {
progressUpdates.push(progress);
messageHandlerInvoked = true;
maybeDone();
});
worker.send();

worker.on('done', () => {
expect(progressUpdates).to.eql([ 0.3, 0.6 ]);
done();
doneHandlerInvoked = true;
maybeDone();
});

function maybeDone () {
if (messageHandlerInvoked && doneHandlerInvoked) {
done();
}
}
});


Expand Down Expand Up @@ -279,4 +276,22 @@ describe('Worker', function () {

}

// For unknown reasons Firefox will choke on the last test cases
// if the following test cases are not at the end:
// (Only in Firefox, not in Chrome, not in node)

it('can run async method (returning a Promise)', done => {
const worker = spawn((param) => Promise.resolve(param));
canSendAndReceiveEcho(worker, done);
});

it('can handle errors in an async method', done => {
const worker = spawn(() => Promise.reject(new Error('Some error')));
worker.on('error', error => {
expect(error.message).to.match(/^Some error$/);
done();
});
worker.send();
});

});