Skip to content

Commit e28e6a9

Browse files
committed
FIX scheduling mismatch with mileseconds
1 parent eea3dcc commit e28e6a9

File tree

7 files changed

+49
-27
lines changed

7 files changed

+49
-27
lines changed

examples/failed.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ manobi.addJob('compress', async (ctx, done) => {
88

99

1010
(async () => {
11+
await manobi.start();
1112
return createTask({
1213
date: new Date().toISOString(),
1314
job: 'compress',

examples/fast.js

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,30 @@
1-
const fetch = require('node-fetch'),
2-
{ worker, createTask } = require('..')(),
1+
const { worker, createTask } = require('..')(),
32
manobi = worker('manobi-fast');
43

54
manobi.addJob('compress', (ctx, done) => {
6-
done(null, {sucesso: 'muleque'});
5+
done(null, {noerrorbaby: true});
76
});
87

98
(async () => {
109
await manobi.start()
11-
await createTask({
10+
/*await createTask({
1211
date: new Date().toISOString(),
1312
job: 'compress',
13+
status: 'waiting',
1414
worker: 'manobi-fast',
1515
repeat: {
16-
interval: {seconds: 1}
16+
limit: 10,
17+
interval: {
18+
seconds: 6
19+
}
20+
},
21+
repeat: {
22+
limit: 10,
23+
interval: {
24+
seconds: 1
25+
}
1726
}
1827
});
28+
*/
1929
//await manobi.stop()
2030
})();

lib/context.js

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ module.exports = class Context {
3636
}
3737

3838
startTimeout(){
39-
const ms = this.task.timeout || 5000;
4039
this.timeout = setTimeout(() => {
4140
this.cancel();
4241
this.done({error: 'Job timed out'});
@@ -47,6 +46,7 @@ module.exports = class Context {
4746
* @param {Function} job - The job function to be executed
4847
*/
4948
start(job){
49+
console.log(new Date().toISOString(), 'Started Running', this.task.id, 'date', this.task.date);
5050
performance.mark('job_start');
5151
// Defines a timeout for the job
5252
this.startTimeout();
@@ -89,6 +89,7 @@ module.exports = class Context {
8989
* @returns {Number} duration - Time elapsed from start to stop in ms
9090
*/
9191
stop(){
92+
console.log(new Date().toISOString(), 'Stoping');
9293
this.running = undefined;
9394
clearTimeout(this.timeout);
9495
performance.mark('job_stop');
@@ -102,13 +103,12 @@ module.exports = class Context {
102103
* @param {Number} current - Current state of property like repeat.limit=100
103104
* @returns {Boolean} - rerunable or not
104105
*/
105-
rerunable(attr, current){
106+
rerunable(attr, current = 0){
106107
const {task} = this,
107108
node = task[attr] || {},
108109
interval = node.interval,
109110
limit = node.limit || Infinity,
110111
canRerun = current < limit;
111-
112112
if(node && interval && canRerun){
113113
return delay(task.date, interval);
114114
}
@@ -135,12 +135,16 @@ module.exports = class Context {
135135
* @private
136136
* @param {Object | null} err
137137
* @param {Object | null} res
138-
* @returns {String} - done, complete, retry, fail
138+
* @returns {String} - done, complete, retry, failed
139139
*/
140140
nextStatus(err, res){
141-
const success = this.repeatable() ? 'done' : 'complete';
142-
const fail = this.retryable() ? 'retry' : 'failed';
143-
return err ? fail : success;
141+
const success = () => {
142+
return this.repeatable() ? 'done' : 'complete';
143+
}
144+
const fail = () => {
145+
return this.retryable() ? 'retry' : 'failed';
146+
}
147+
return err ? fail() : success();
144148
}
145149

146150
/**

lib/couchdb-adaptor.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ module.exports = class Adaptor {
7979
this.feed = this.db.follow({
8080
include_docs: true,
8181
since: 'now',
82-
heartbeat: 1000,
8382
/*
8483
* TODO: use mango query
8584
*/
@@ -108,7 +107,6 @@ module.exports = class Adaptor {
108107
* @method unsubscribe
109108
*/
110109
unsubscribe(){
111-
this.feed.off('change', this._change);
112110
return this.feed.stop();
113111
}
114112

@@ -147,6 +145,8 @@ module.exports = class Adaptor {
147145
_id,
148146
};
149147
delete document.id;
148+
console.log(new Date().toISOString(), 'sending update')
149+
console.log('----------------------------------');
150150
return this.db.insert(document);
151151
}
152152

lib/scheduler.js

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,22 @@ module.exports = class Scheduler {
1313
}
1414

1515
schedule(task, callback){
16-
const date = Date.parse(task.date);
16+
const date = new Date(Date.parse(task.date)).setMilliseconds(0)
17+
// Remove previous scheduled task
18+
this.unschedule(task.id);
1719

1820
if(Scheduler.isOutdated(date)){
21+
console.log(new Date().toISOString(), 'executing outdated', task.id, 'date', task.date);
1922
return callback();
2023
}
2124

22-
// Remove previous scheduled task
23-
this.unschedule(task.id);
24-
2525
// Assigns a function that calls the job with data as param
2626
this.scheduled[task.id] = {
2727
run: callback,
2828
date: task.date // For logging reasons schedule the original form
2929
}
30-
console.log('Scheduling', task.id, 'date', task.date);
30+
31+
console.log(new Date().toISOString(), 'Scheduling', task.id, 'date', task.date);
3132
return this.tasks.once(date, this.scheduled[task.id]['run']);
3233
}
3334
/*

lib/worker.js

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ module.exports = class Worker extends Scheduler {
2121
// Setup storage connection
2222
this.storage = new Adaptor();
2323
this.storage.name = this.name;
24-
this.storage.onChange = (change) => this.changeHandler.call(this, change);
24+
this.storage.onChange = (change) => {
25+
this.changeHandler.call(this, change);
26+
}
2527
this.storage.connect(options);
2628
}
2729

@@ -91,16 +93,20 @@ module.exports = class Worker extends Scheduler {
9193
* @memberof Context
9294
*/
9395
ctx.done = (err, res) => {
94-
if(this.finished){
96+
console.log(new Date().toISOString(), 'Done called with', err, res);
97+
98+
if(ctx.finished){
9599
throw new Error('Done called twice');
96100
}
97101

98-
this.finished = true;
102+
ctx.finished = true;
99103
ctx.stop();
100104

101105
// Enqueue the task update
102106
this.enqueue(task.id, _ => {
103-
return this.storage.update(ctx.next(err, res));
107+
const next = ctx.next(err, res);
108+
console.log(new Date().toISOString(), 'Queued updated', task.id, 'date', task.date);
109+
return this.storage.update(next);
104110
});
105111
}
106112
return ctx;
@@ -117,7 +123,6 @@ module.exports = class Worker extends Scheduler {
117123

118124
return () => {
119125
context.start(job);
120-
console.log('Running', task.id, 'date', task.date);
121126
}
122127
}
123128

@@ -129,16 +134,17 @@ module.exports = class Worker extends Scheduler {
129134
if(!change){
130135
throw new Error(`Undefined task, it expects an object as argument`);
131136
}
137+
138+
console.log(new Date().toISOString(), 'update received', change.id, 'date', change.date);
132139

133140
['id'].forEach(attr => {
134141
if(!change[attr]){
135142
throw new Error(`Invalid task, "${attr}" is missing`);
136143
}
137144
});
138-
139-
console.log('Change', change.id, 'date', change.date);
140145

141146
if(change.status === 'cancel'){
147+
console.log(new Date().toISOString(), 'task have being deleted', change.id, 'date', change.date);
142148
return this.unschedule(change.id);
143149
};
144150

spec/scheduler.spec.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ describe('Scheduler', () => {
2020
it('Append the task to memory book', () => {
2121
const now = DateTime.local(),
2222
date = now.plus({'seconds': 20}).toISO(),
23-
timestamp = DateTime.fromISO(date).toMillis(),
23+
timestamp = DateTime.fromISO(date).startOf('second').toMillis(),
2424
task = {
2525
date,
2626
id: '857957d1f1631ac8714b5d1cfd000d39'

0 commit comments

Comments
 (0)