-
Notifications
You must be signed in to change notification settings - Fork 0
/
Queue.ts
59 lines (45 loc) · 1.31 KB
/
Queue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* Sends your verify email
*
* @author Faiz A. Farooqui <faiz@geekyants.com>
*/
import * as kue from 'kue';
import Locals from './Locals';
import Log from '../middlewares/Log';
class Queue {
public jobs: any;
constructor() {
this.jobs = kue.createQueue({
prefix: Locals.config().redisPrefix,
redis: {
port: Locals.config().redisHttpPort,
host: Locals.config().redisHttpHost,
db: Locals.config().redisDB
}
});
this.jobs
.on('job enqueue', (_id, _type) => Log.info(`Queue :: #${_id} Processing of type '${_type}'`))
.on('job complete', (_id) => this.removeProcessedJob(_id));
}
public dispatch (_jobName: string, _args: object, _callback: Function): void {
this.jobs.create(_jobName, _args).save();
this.process(_jobName, 3, _callback);
}
private removeProcessedJob (_id): void {
Log.info(`Queue :: #${_id} Processed`);
kue.Job.get(_id, (_err, _job) => {
if (_err) { return; }
_job.remove((_err) => {
if (_err) { throw _err; }
Log.info(`Queue :: #${_id} Removed Processed Job`);
});
});
}
private process (_jobName: string, _count: number, _callback: Function): void {
this.jobs.process(_jobName, _count, (_job, _done) => {
_done(); // Notifies KUE about the completion of the job!
_callback(_job.data);
});
}
}
export default new Queue;