Skip to content

Commit

Permalink
fix: only increment the current_concurrency counter if the job max …
Browse files Browse the repository at this point in the history
…`max_concurrency` set

Seems to have been a small oversight when popping idempotent jobs
from the queue on Redis.

Fixes: Issue NicolasLM#15
  • Loading branch information
nisimond committed Mar 26, 2024
1 parent b236f46 commit 0aaefa9
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
2 changes: 1 addition & 1 deletion spinach/brokers/redis_scripts/get_jobs_from_queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ repeat
-- job is idempotent, must track if it's running
redis.call('hset', running_jobs_key, job["id"], job_json)
-- If tracking concurrency, bump the current value.
if max_concurrency ~= -1 then
if max_concurrency ~= nil and max_concurrency ~= -1 then
redis.call('hincrby', current_concurrency_key, job['task_name'], 1)
end
end
Expand Down
14 changes: 14 additions & 0 deletions tests/test_redis_brokers.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,20 @@ def test_cant_exceed_max_concurrency(broker):
queued = broker._r.lpop(broker._to_namespaced('foo_queue'))
assert json.loads(queued.decode())['id'] == str(job2.id)

def test_does_not_set_concurrency_key_when_no_max_concurrency(broker):
job = Job(
CONCURRENT_TASK_NAME, 'foo_queue', datetime.now(timezone.utc), 1,
# kwargs help with debugging but are not part of the test.
task_kwargs=dict(name='job'),
)
broker.enqueue_jobs([job])
returned_jobs = broker.get_jobs_from_queue('foo_queue', 2)
assert returned_jobs[0].task_kwargs == dict(name='job')
current = broker._r.hget(
broker._to_namespaced(CURRENT_CONCURRENCY_KEY), CONCURRENT_TASK_NAME
)
assert current is None


def test_get_jobs_from_queue_returns_all_requested(broker):
# If a job is not returned because it was over concurrency limits,
Expand Down

0 comments on commit 0aaefa9

Please sign in to comment.