Skip to content

Commit 3703dde

Browse files
committed
Allow chaining on addJob
1 parent 8c250bf commit 3703dde

File tree

4 files changed

+44
-20
lines changed

4 files changed

+44
-20
lines changed

examples/fast.js

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
const fetch = require('node-fetch'),
22
{ worker, createTask } = require('..')(),
33
manobi = worker('manobi-fast');
4-
4+
55
manobi.addJob('compress', (ctx, done) => {
66
done(null, {sucesso: 'muleque'});
77
});
88

99
(async () => {
10-
return createTask({
11-
date: new Date().toISOString(),
12-
job: 'compress',
13-
worker: 'manobi-fast'
10+
await manobi.start().then(() => {
11+
return manobi.stop();
1412
});
13+
// return createTask({
14+
// date: new Date().toISOString(),
15+
// job: 'compress',
16+
// worker: 'manobi-fast'
17+
// }).then(() => {
18+
// manobi.tasks.stop();
19+
// return manobi.storage.unsubscribe();
20+
// });
1521
})();

lib/couchdb-adaptor.js

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ module.exports = class Adaptor {
1717
constructor(){
1818
this.name = undefined;
1919
this.onChange = undefined;
20+
this.feed = {};
2021
}
2122

2223
/**
@@ -69,10 +70,10 @@ module.exports = class Adaptor {
6970
*/
7071
async subscribe(){
7172
// for cold start
72-
const docs = await this.looseTasks();
73-
docs.forEach((doc) => {
74-
this.onChange(this.formatDoc(doc));
75-
});
73+
const docs = await this.looseTasks();
74+
docs.forEach((doc) => {
75+
this.onChange(this.formatDoc(doc));
76+
});
7677

7778
const name = this.name;
7879
this.feed = this.db.follow({
@@ -92,8 +93,17 @@ module.exports = class Adaptor {
9293
const task = this.formatDoc(doc, deleted);
9394
this.onChange(task);
9495
});
96+
9597
// Start the stream
96-
this.feed.follow();
98+
return this.feed.follow();
99+
}
100+
101+
/**
102+
* @method unsubscribe
103+
*/
104+
unsubscribe(){
105+
this.feed.off('change', this.onChange);
106+
return this.feed.stop();
97107
}
98108

99109
/**

lib/worker.js

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,21 @@ module.exports = class Worker extends Scheduler {
2323
this.storage.name = this.name;
2424
this.storage.onChange = (change) => this.changeHandler.call(this, change);
2525
this.storage.connect(options);
26-
this.storage.subscribe();
26+
}
27+
28+
/**
29+
* @method start
30+
*/
31+
start(){
32+
return this.storage.subscribe();
33+
}
34+
35+
/**
36+
* @method stop
37+
*/
38+
stop(){
39+
this.tasks.stop();
40+
return this.storage.unsubscribe();
2741
}
2842

2943
enqueue(id, fn){
@@ -56,18 +70,11 @@ module.exports = class Worker extends Scheduler {
5670
addJob(name, job){
5771
if(name && job && name.length > 0 && (typeof job === 'function')){
5872
this.jobs[name] = job;
59-
return this.jobs;
73+
return this;
6074
}
6175
throw new Error('"name {String}" and job {Function} are both required');
6276
}
6377

64-
/**
65-
* @method exit
66-
*/
67-
exit(){
68-
this.storage.unsubscribe()
69-
}
70-
7178
/**
7279
* @method createContext
7380
* @param {Object} task

spec/worker.spec.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ describe('Worker', () => {
3535
done(null, true);
3636
});
3737
});
38+
3839
afterEach(() => {
3940
worker.tasks.stop();
4041
})
@@ -60,7 +61,7 @@ describe('Worker', () => {
6061
describe('#exit', () => {
6162
it('calls adaptor unsubscribe method', () => {
6263
const spy = sinon.spy(worker.storage, 'unsubscribe');
63-
worker.exit()
64+
worker.stop();
6465
expect(spy).to.have.been.called;
6566
})
6667
});

0 commit comments

Comments
 (0)