Skip to content

Commit

Permalink
simplified logic
Browse files Browse the repository at this point in the history
  • Loading branch information
adrienjoly committed Oct 10, 2017
1 parent 49dd33b commit 6815e43
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 66 deletions.
33 changes: 1 addition & 32 deletions RateLimiter.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,42 +104,11 @@ class RateLimiter extends EventEmitter {
jobEnded() {
--this.running;
this._appendEndedJob(); // mutates this.recentJobs
const nextExpiredJobDate = this.recentJobs[0]; // must be done early, to prevent race condition
const canRunMore = this.canRunMore();
this.log.trace('RateLimiter:jobEnded => running: ', this.running || '0', ', canRunMore: ', canRunMore || 'false');
this.log.trace('RateLimiter:jobEnded => running: ', this.running || '0');
if (this.running === 0) {
this.log.trace('RateLimiter ⚡️ drain');
process.nextTick(() => this.emit('drain'));
}
if (canRunMore) {
// rate limit is not exceeded => ask Qyu for another job
this.log.trace('RateLimiter ⚡️ avail');
process.nextTick(() => this.emit('avail'));
} else {
// rate limit is temporally exceeded => wait a bit before asking Qyu for another job
this._emitOnNextAvail(nextExpiredJobDate);
}
}

/**
* Will emit `avail` asap while respecting rate limit.
* @private
* @param {Date} nextExpiredJobDate - end date of the first recent job to expire.
*/
_emitOnNextAvail(nextExpiredJobDate) {
this.log.trace('RateLimiter:_emitOnNextAvail, nextExpiredJobDate: ', nextExpiredJobDate || '(none)');
if (!nextExpiredJobDate) return;
const remainingMsUntilAvail = ONE_SECOND - (new Date() - nextExpiredJobDate);
this.log.debug('RateLimiter:_emitOnNextAvail will emit in ', remainingMsUntilAvail, ' ms ...');
setTimeout(() => {
const canRunMore = this.canRunMore();
this.log.trace('RateLimiter:_emitOnNextAvail [timeout] canRunMore: ', canRunMore || 'false');
//if (canRunMore) {
this.log.trace('RateLimiter ⚡️ avail');
this.emit('avail');
//}
// TODO: if still not ready, try again later?
}, remainingMsUntilAvail);
}

/**
Expand Down
43 changes: 16 additions & 27 deletions qyu.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const LOWEST_PRIO = 10;
const DEFAULT_QUEUE_OPTIONS = {
log: { trace: () => {}, debug: () => {} }, // can be replaced by instance of simple-node-logger
rateLimit: null, // falsy => process in series. otherwise: max number of jobs to run within 1 second
statsInterval: 1000, // emit `stats` every second
statsInterval: 500, // emit `stats` every second
};

const DEFAULT_JOB_OPTIONS = {
Expand Down Expand Up @@ -49,8 +49,8 @@ class Qyu extends EventEmitter {
* @property {number} nbJobsPerSecond - number of jobs that are processed per second
*/
this.emit('stats', stats);
this._processJobs();
});
this.rateLimiter.on('avail', this._processJobs.bind(this));
}

/**
Expand Down Expand Up @@ -104,23 +104,18 @@ class Qyu extends EventEmitter {
_jobEnded(job, withError, jobResultOrError) {
this.log.trace('Qyu:_jobEnded() ', Array.prototype.slice.call(arguments));
this.rateLimiter.jobEnded();
const jobObj = { jobId: job.id };
if (withError) {
const failObj = {
jobId: job.id,
error: jobResultOrError,
};
const failObj = Object.assign(jobObj, { error: jobResultOrError });
this._error(failObj);
//job.pushPromise.reject(failObj); // TODO: uncomment if push() should resolve in case of error
} else {
const doneObj = {
jobId: job.id,
jobResult: jobResultOrError,
res: null, // TODO
};
const doneObj = Object.assign(jobObj, { jobResult: jobResultOrError });
this._done(doneObj);
job.pushPromise.resolve(doneObj);
}
this._drainIfNoMore();
this._processJobs();
}

/**
Expand All @@ -143,8 +138,8 @@ class Qyu extends EventEmitter {
remaining: this.jobs.map(j => j.id),
readyToRunJobs
});
if (this._readyToRunJobs()) {
this.rateLimiter.toggle(true); // necessary for jobs pushed after drain
if (readyToRunJobs) {
//this.rateLimiter.toggle(true); // necessary for jobs pushed after drain
const priority = Math.min.apply(Math, this.jobs.map(job => job.opts.priority));
const job = this.jobs.find(job => job.opts.priority === priority);
this.jobs = this.jobs.filter(j => j.id !== job.id); // remove job from queue
Expand All @@ -161,18 +156,9 @@ class Qyu extends EventEmitter {
* @private
*/
_processJobs() {
const readyToRunJobs = this._readyToRunJobs();
this.log.trace('Qyu:_processJobs() ', {
started: this.started,
running: this.rateLimiter.running,
remaining: this.jobs.map(j => j.id),
readyToRunJobs
});
if (readyToRunJobs) {
do {
this._processJob();
} while (this._readyToRunJobs());
}
do {
this._processJob();
} while (this._readyToRunJobs());
}

_drainIfNoMore() {
Expand Down Expand Up @@ -209,7 +195,10 @@ class Qyu extends EventEmitter {
opts: Object.assign({}, DEFAULT_JOB_OPTIONS, opts),
pushPromise: { resolve, reject }
});
this._processJob(); // useful for when jobs were pushed after Qyu was started
if (this.started) {
this.rateLimiter.toggle(true); // necessary for jobs pushed after drain
}
this._processJobs(); // useful for when jobs were pushed after Qyu was started
});
}

Expand All @@ -235,8 +224,8 @@ class Qyu extends EventEmitter {
this.started = true;
// throw 'dumm2'; // for testing
this.rateLimiter.toggle(true); // makes sure that the interval is started asap
this._processJobs();
this._drainIfNoMore();
this._processJobs();
resolve();
});
}
Expand Down
11 changes: 4 additions & 7 deletions qyu.log
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,18 @@
[QYU-LOG] TRACE Qyu:pause()
[QYU-LOG] TRACE Qyu:start()
[QYU-LOG] TRACE RateLimiter:toggle true
[QYU-LOG] TRACE Qyu:_processJobs() {"started":true,"running":0,"remaining":[0],"readyToRunJobs":true}
[QYU-LOG] TRACE Qyu:_drainIfNoMore() {"started":true,"running":0,"remaining":[0]}
[QYU-LOG] TRACE Qyu:_processJob() {"started":true,"running":0,"remaining":[0],"readyToRunJobs":true}
[QYU-LOG] TRACE RateLimiter:toggle true
[QYU-LOG] DEBUG Qyu starting job {"id":0,"opts":{"priority":1},"pushPromise":{}}
[QYU-LOG] TRACE RateLimiter:jobStarted => running: 1
[QYU-LOG] TRACE Qyu:_drainIfNoMore() {"started":true,"running":1,"remaining":[]}
[QYU-LOG] TRACE RateLimiter:toggle false
Job done 0
No more jobs to do
[QYU-LOG] TRACE Qyu:_jobEnded() [{"id":0,"opts":{"priority":1},"pushPromise":{}},false,{"Hello":"world!"}]
[QYU-LOG] TRACE RateLimiter:jobEnded => running: 0, canRunMore: true
[QYU-LOG] TRACE RateLimiter:jobEnded => running: 0
[QYU-LOG] TRACE RateLimiter ⚡️ drain
[QYU-LOG] TRACE RateLimiter ⚡️ avail
[QYU-LOG] TRACE Qyu ⚡️ done {"jobId":0,"jobResult":{"Hello":"world!"},"res":null}
[QYU-LOG] TRACE Qyu ⚡️ done {"jobId":0,"jobResult":{"Hello":"world!"}}
[QYU-LOG] TRACE Qyu:_drainIfNoMore() {"started":true,"running":0,"remaining":[]}
[QYU-LOG] TRACE Qyu ⚡️ drain
[QYU-LOG] TRACE RateLimiter:toggle false
[QYU-LOG] TRACE Qyu:_processJobs() {"started":true,"running":0,"remaining":[],"readyToRunJobs":0}
[QYU-LOG] TRACE Qyu:_processJob() {"started":true,"running":0,"remaining":[],"readyToRunJobs":0}

0 comments on commit 6815e43

Please sign in to comment.