Skip to content

Commit e270f0c

Browse files
committed
Breaking changes to API. Have types of consumers now.
Readme not updated.
1 parent d6bc13f commit e270f0c

File tree

3 files changed

+118
-23
lines changed

3 files changed

+118
-23
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "job-queue",
3-
"version": "0.0.3",
3+
"version": "0.0.4",
44
"description": "A queueing system to schedule jobs and have multiple rate-limited consumers.",
55
"main": "lib/job-queue.js",
66
"scripts": {

src/job-queue-test.coffee

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,28 @@ assert = require "assert"
44

55
vows.describe "job-queue"
66
.addBatch
7-
"Test 1":
7+
"With no consumers or jobs":
88
topic: new JobQueue
99
"No consumers": (jobQueue) ->
1010
assert.deepEqual jobQueue.consumers, []
1111
"No pending jobs": (jobQueue) ->
1212
assert.equal jobQueue.pendingJobs, 0
13-
"Test 2":
14-
topic: ->
15-
makeConsumer = (consumerId) ->
16-
(job) ->
17-
job.process consumerId
18-
jobQueue = new JobQueue [1..5].map(makeConsumer), 5, 100
19-
jobQueue.addConsumers [6..10].map(makeConsumer), 8, 200
13+
"With MovingWindowRateLimitedConsumer and 500 jobs":
14+
topic: ->
15+
jobQueue = new JobQueue [1..5].map (consumerId) -> new JobQueue.MovingWindowRateLimitedConsumer ((job) -> job.process consumerId), 5, 100
16+
jobQueue.addConsumers [6..10].map (consumerId) -> new JobQueue.MovingWindowRateLimitedConsumer ((job) -> job.process consumerId), 8, 200
2017
for jobId in [1..500] then do (jobId) ->
2118
jobQueue.enqueue
2219
id: jobId
2320
process: (consumerId) ->
2421
jobQueue
25-
"Initially":
22+
"initially":
2623
topic: (jobQueue) -> jobQueue
2724
"10 consumers": (jobQueue) ->
2825
assert.equal jobQueue.consumers.length, 10
2926
"500 pending jobs": (jobQueue) ->
3027
assert.equal jobQueue.pendingJobs, 500
31-
"After 10 ms":
28+
"after 10 ms":
3229
topic: (jobQueue) ->
3330
setTimeout =>
3431
@callback null, jobQueue
@@ -38,7 +35,7 @@ vows.describe "job-queue"
3835
assert.equal jobQueue.consumers.length, 10
3936
"435 pending jobs": (jobQueue) ->
4037
assert.equal jobQueue.pendingJobs, 435
41-
"After 110 ms":
38+
"after 110 ms":
4239
topic: (jobQueue) ->
4340
setTimeout =>
4441
@callback null, jobQueue
@@ -48,7 +45,7 @@ vows.describe "job-queue"
4845
assert.equal jobQueue.consumers.length, 10
4946
"410 pending jobs": (jobQueue) ->
5047
assert.equal jobQueue.pendingJobs, 410
51-
"After 160 ms":
48+
"after 160 ms":
5249
topic: (jobQueue) ->
5350
setTimeout =>
5451
@callback null, jobQueue
@@ -58,7 +55,7 @@ vows.describe "job-queue"
5855
assert.equal jobQueue.consumers.length, 10
5956
"410 pending jobs": (jobQueue) ->
6057
assert.equal jobQueue.pendingJobs, 410
61-
"After 210 ms":
58+
"after 210 ms":
6259
topic: (jobQueue) ->
6360
setTimeout =>
6461
@callback null, jobQueue
@@ -68,7 +65,7 @@ vows.describe "job-queue"
6865
assert.equal jobQueue.consumers.length, 10
6966
"345 pending jobs": (jobQueue) ->
7067
assert.equal jobQueue.pendingJobs, 345
71-
"After 510 ms":
68+
"after 510 ms":
7269
topic: (jobQueue) ->
7370
setTimeout =>
7471
@callback null, jobQueue
@@ -78,7 +75,100 @@ vows.describe "job-queue"
7875
assert.equal jobQueue.consumers.length, 10
7976
"230 pending jobs": (jobQueue) ->
8077
assert.equal jobQueue.pendingJobs, 230
81-
"After 1010 ms":
78+
"after 1010 ms":
79+
topic: (jobQueue) ->
80+
setTimeout =>
81+
@callback null, jobQueue
82+
, 1010
83+
undefined
84+
"10 consumers": (jobQueue) ->
85+
assert.equal jobQueue.consumers.length, 10
86+
"0 pending jobs": (jobQueue) ->
87+
assert.equal jobQueue.pendingJobs, 0
88+
"With CustomRateLimitedConsumer and 500 jobs":
89+
topic: ->
90+
jobQueue = new JobQueue [1..5].map (consumerId) ->
91+
counter = 1
92+
new JobQueue.CustomRateLimitedConsumer [
93+
(job) ->
94+
job.process consumerId
95+
->
96+
d = new Date
97+
d.setUTCMilliseconds d.getUTCMilliseconds() + (counter++) * 1.5
98+
d
99+
]...
100+
jobQueue.addConsumers [6..10].map (consumerId) ->
101+
counter = 1
102+
new JobQueue.CustomRateLimitedConsumer [
103+
(job) ->
104+
job.process consumerId
105+
->
106+
d = new Date
107+
d.setUTCMilliseconds d.getUTCMilliseconds() + (counter++) * 2.5
108+
d
109+
]...
110+
for jobId in [1..500] then do (jobId) ->
111+
jobQueue.enqueue
112+
id: jobId
113+
process: (consumerId) ->
114+
jobQueue
115+
"initially":
116+
topic: (jobQueue) -> jobQueue
117+
"10 consumers": (jobQueue) ->
118+
assert.equal jobQueue.consumers.length, 10
119+
"500 pending jobs": (jobQueue) ->
120+
assert.equal jobQueue.pendingJobs, 500
121+
"after 10 ms":
122+
topic: (jobQueue) ->
123+
setTimeout =>
124+
@callback null, jobQueue
125+
, 10
126+
undefined
127+
"10 consumers": (jobQueue) ->
128+
assert.equal jobQueue.consumers.length, 10
129+
">= 480 pending jobs": (jobQueue) ->
130+
assert jobQueue.pendingJobs >= 480
131+
"after 110 ms":
132+
topic: (jobQueue) ->
133+
setTimeout =>
134+
@callback null, jobQueue
135+
, 110
136+
undefined
137+
"10 consumers": (jobQueue) ->
138+
assert.equal jobQueue.consumers.length, 10
139+
">= 410 pending jobs": (jobQueue) ->
140+
assert jobQueue.pendingJobs >= 410
141+
"after 160 ms":
142+
topic: (jobQueue) ->
143+
setTimeout =>
144+
@callback null, jobQueue
145+
, 160
146+
undefined
147+
"10 consumers": (jobQueue) ->
148+
assert.equal jobQueue.consumers.length, 10
149+
">= 380 pending jobs": (jobQueue) ->
150+
assert jobQueue.pendingJobs >= 380
151+
"after 210 ms":
152+
topic: (jobQueue) ->
153+
setTimeout =>
154+
@callback null, jobQueue
155+
, 210
156+
undefined
157+
"10 consumers": (jobQueue) ->
158+
assert.equal jobQueue.consumers.length, 10
159+
">= 350 pending jobs": (jobQueue) ->
160+
assert jobQueue.pendingJobs >= 350
161+
"after 510 ms":
162+
topic: (jobQueue) ->
163+
setTimeout =>
164+
@callback null, jobQueue
165+
, 510
166+
undefined
167+
"10 consumers": (jobQueue) ->
168+
assert.equal jobQueue.consumers.length, 10
169+
">= 150 pending jobs": (jobQueue) ->
170+
assert jobQueue.pendingJobs >= 150
171+
"after 1010 ms":
82172
topic: (jobQueue) ->
83173
setTimeout =>
84174
@callback null, jobQueue
@@ -90,8 +180,6 @@ vows.describe "job-queue"
90180
assert.equal jobQueue.pendingJobs, 0
91181
.export module
92182

93-
###
94183
process.on "uncaughtException", (err) ->
95184
console.error "Caught exception: " + err
96-
process.removeAllListeners "uncaughtException"
97-
###
185+
process.removeAllListeners "uncaughtException"

src/job-queue.coffee

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,24 +3,24 @@ class JobQueue
33
@consumers = []
44
@pendingJobs = 0
55
@addConsumers arguments...
6-
addConsumers: (consumes = [], limit, period) =>
7-
@consumers.push consumes.map((x) -> new Consumer x, limit, period)...
6+
addConsumers: (consumers = []) =>
7+
@consumers.push consumers...
88
enqueue: (jobs...) =>
99
jobs.forEach (job) =>
1010
sts =
1111
(@consumers
1212
.map (x) -> consumer: x, timestamp: x.getNextTimestamp()
1313
.sort (x, y) -> x.timestamp - y.timestamp
1414
)[0]
15-
sts.consumer.timestamps.push sts.timestamp
15+
sts.consumer.usedTimestamp? sts.timestamp
1616
setTimeout =>
1717
@pendingJobs--
1818
sts.consumer.consume job
1919
, sts.timestamp - new Date
2020
@pendingJobs++
2121
@pendingJobs
2222

23-
class Consumer
23+
class MovingWindowRateLimitedConsumer
2424
constructor: (@consume, @limit, @period) ->
2525
@timestamps = []
2626
getNextTimestamp: =>
@@ -34,5 +34,12 @@ class Consumer
3434
nextTimestamp = new Date @timestamps[@timestamps.length - @limit]
3535
nextTimestamp.setUTCMilliseconds nextTimestamp.getUTCMilliseconds() + @period
3636
nextTimestamp
37+
usedTimestamp: (timestamp) =>
38+
@timestamps.push timestamp
3739

40+
class CustomRateLimitedConsumer
41+
constructor: (@consume, @getNextTimestamp) ->
42+
43+
JobQueue.MovingWindowRateLimitedConsumer = MovingWindowRateLimitedConsumer
44+
JobQueue.CustomRateLimitedConsumer = CustomRateLimitedConsumer
3845
module.exports = JobQueue

0 commit comments

Comments
 (0)