Skip to content

Commit

Permalink
add rejectErrorsOnPush option + better comments
Browse files Browse the repository at this point in the history
  • Loading branch information
adrienjoly committed Oct 11, 2017
1 parent eebc613 commit 5ec4381
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
8 changes: 3 additions & 5 deletions RateLimiter.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const MAKE_RECENT_JOB_CHECKER = () => {
* Counts jobs per second to provide stats and commit to rating limit.
* @fires RateLimiter#stats
* @fires RateLimiter#drain
* @fires RateLimiter#avail
*/
class RateLimiter extends EventEmitter {

Expand All @@ -23,7 +22,6 @@ class RateLimiter extends EventEmitter {
* @param {number} opts.rateLimit - Maximum number of jobs to be run per second. If `null`, jobs will be run sequentially.
* @param {number} opts.statsInterval - interval for emitting `stats`, in ms
* @param {SimpleNodeLogger} opts.log - instance of simple-node-logger (optional)
*/
constructor(opts) {
super(opts);
Expand All @@ -41,7 +39,6 @@ class RateLimiter extends EventEmitter {
* @private
*/
_cleanRecentJobs() {
//console.log(now, 'cleaned', this.recentJobs.filter(date => now - date <= ONE_SECOND));
return this.recentJobs.filter(MAKE_RECENT_JOB_CHECKER());
}

Expand Down Expand Up @@ -118,9 +115,10 @@ class RateLimiter extends EventEmitter {
canRunMore() {
if (this.opts.rateLimit === null) {
return this.running === 0; // run jobs sequentially, without applying rate limit
} else {
const nbJobsEndedDuringLastSecond = this._cleanRecentJobs().length;
return this.running + nbJobsEndedDuringLastSecond < this.opts.rateLimit;
}
const nbJobsEndedDuringLastSecond = this._cleanRecentJobs().length;
return this.running + nbJobsEndedDuringLastSecond < this.opts.rateLimit;
}

/**
Expand Down
21 changes: 14 additions & 7 deletions qyu.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ const DEFAULT_JOB_OPTIONS = {
priority: LOWEST_PRIO, // low job priority by default, when calling push()
};

// useful to get stack traces from UnhandledPromiseRejectionWarning errors
if (process.env.CATCH_UNHANDLED_REJECTIONS) {
process.on('unhandledRejection', r => console.error(r));
}

var nextJobId = 0; // global job counter, used to generate unique ids

/**
Expand All @@ -30,6 +35,7 @@ class Qyu extends EventEmitter {
* @param {number} opts.rateLimit - Maximum number of jobs to be run per second. If `null`, jobs will be run sequentially.
* @param {number} opts.statsInterval - interval for emitting `stats`, in ms
* @param {SimpleNodeLogger} opts.log - instance of simple-node-logger (optional)
* @param {boolean} rejectErrorsOnPush - if true, push()'s premise will reject in case of job error
*/
constructor(opts) {
super(opts);
Expand All @@ -38,7 +44,6 @@ class Qyu extends EventEmitter {
this.log.trace('Qyu:constructor() ', opts);
this.jobs = []; // unsorted array of { job, opts } objects
this.started = false; // turns to `true` when client called `start()`
// NOTE: could use `Symbol` to prevent properties from being accessed/mutated externally
this.rateLimiter = new RateLimiter(this.opts);
this.rateLimiter.on('stats', (stats) => {
this.log.trace('Qyu ⚡️ stats ', stats);
Expand All @@ -49,7 +54,7 @@ class Qyu extends EventEmitter {
* @property {number} nbJobsPerSecond - number of jobs that are processed per second
*/
this.emit('stats', stats);
this._processJobs();
this._processJobs(); // will run a job if possible
});
}

Expand Down Expand Up @@ -108,7 +113,9 @@ class Qyu extends EventEmitter {
if (withError) {
const failObj = Object.assign(jobObj, { error: jobResultOrError });
this._error(failObj);
//job.pushPromise.reject(failObj); // TODO: uncomment if push() should resolve in case of error
if (this.opts.rejectErrorsOnPush) {
job.pushPromise.reject(failObj);
}
} else {
const doneObj = Object.assign(jobObj, { jobResult: jobResultOrError });
this._done(doneObj);
Expand Down Expand Up @@ -161,6 +168,10 @@ class Qyu extends EventEmitter {
} while (this._readyToRunJobs());
}

/**
* emits drain and disables the rate limiter if there are no more jobs to process.
* @private
*/
_drainIfNoMore() {
this.log.trace('Qyu:_drainIfNoMore() ', {
started: this.started,
Expand Down Expand Up @@ -237,7 +248,3 @@ function qyu(opts) {
}

module.exports = qyu;

// useful to get stack traces from UnhandledPromiseRejectionWarning errors:
//process.on('unhandledRejection', r => console.error(r));
// see https://github.com/nodejs/node/issues/9523#issuecomment-259303079

0 comments on commit 5ec4381

Please sign in to comment.