Skip to content

Commit 1aab58f

Browse files
committed
Added Queue Lifespan functionality
1 parent 30e4a3d commit 1aab58f

File tree

2 files changed

+522
-3
lines changed

2 files changed

+522
-3
lines changed

Models/Queue.js

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,19 @@ export class Queue {
128128
* queue.start() will return early with a false boolean value instead
129129
* of running multiple queue processing loops concurrently.
130130
*
131+
* Lifespan can be passed to start() in order to run the queue for a specific amount of time before stopping.
132+
* This is useful, as an example, for OS background tasks which typically are time limited.
133+
*
134+
* NOTE: If lifespan is set, only jobs with a timeout property at least 500ms less than remaining lifespan will be processed
135+
* during queue processing lifespan. This is to buffer for the small amount of time required to query Realm for suitable
136+
* jobs, and to mark such jobs as complete or failed when job finishes processing.
137+
*
138+
* IMPORTANT: Jobs with timeout set to 0 that run indefinitely will not be processed if the queue is running with a lifespan.
139+
*
140+
* @param lifespan {number} - If lifespan is passed, the queue will start up and run for lifespan ms, then queue will be stopped.
131141
* @return {boolean|undefined} - False if queue is already started. Otherwise nothing is returned when queue finishes processing.
132142
*/
133-
async start() {
143+
async start(lifespan = 0) {
134144

135145
// If queue is already running, don't fire up concurrent loop.
136146
if (this.status == 'active') {
@@ -139,7 +149,18 @@ export class Queue {
139149

140150
this.status = 'active';
141151

142-
let concurrentJobs = await this.getConcurrentJobs();
152+
// Get jobs to process
153+
const startTime = Date.now();
154+
let lifespanRemaining = null;
155+
let concurrentJobs = [];
156+
157+
if (lifespan !== 0) {
158+
lifespanRemaining = lifespan - (Date.now() - startTime);
159+
lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case.
160+
concurrentJobs = await this.getConcurrentJobs(lifespanRemaining);
161+
} else {
162+
concurrentJobs = await this.getConcurrentJobs();
163+
}
143164

144165
while (this.status == 'active' && concurrentJobs.length) {
145166

@@ -153,7 +174,13 @@ export class Queue {
153174
await Promise.all(processingJobs.map(promiseReflect));
154175

155176
// Get next batch of jobs.
156-
concurrentJobs = await this.getConcurrentJobs();
177+
if (lifespan !== 0) {
178+
lifespanRemaining = lifespan - (Date.now() - startTime);
179+
lifespanRemaining = (lifespanRemaining === 0) ? -1 : lifespanRemaining; // Handle exactly zero lifespan remaining edge case.
180+
concurrentJobs = await this.getConcurrentJobs(lifespanRemaining);
181+
} else {
182+
concurrentJobs = await this.getConcurrentJobs();
183+
}
157184

158185
}
159186

0 commit comments

Comments
 (0)