
Feat(Stream): Use redis stream #504


Open

wants to merge 1 commit into main
Conversation

@saber-solooki commented Jul 10, 2025

This PR was inspired by two earlier PRs, #451 and #492 (great job!). I also took a look at this library to get some ideas (great library!).

I decided to open another PR for Redis streams because I saw room for improvement in those PRs, which I'll try to explain here:

#451 tries to use Redis streams but still relies on _pool_iteration, which effectively reuses the old polling implementation without gaining anything; I'm not sure which stream feature is actually being exercised there.
It does have some nice ideas, though, like a flag that lets the user decide whether to use streams or not.

The second PR uses streams more accurately, but it contains some implementation ideas I couldn't follow. It also assumes the user only ever wants streams, since it removes the old implementation from the enqueue path entirely.

The other thing I couldn't get behind was registering three separate tasks in the worker:

```python
asyncio.ensure_future(self.run_delayed_queue_poller()),
asyncio.ensure_future(self.run_stream_reader()),
asyncio.ensure_future(self.run_idle_consumer_cleanup()),
```

run_delayed_queue_poller continually reads jobs from the plain queue and moves them into the stream. If you look closely, there is logic that keeps putting the task into the queue, only to read it back here and put it into the stream, which is redundant work.
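To make the redundancy concrete, here is a minimal sketch of what such a poller effectively does: scheduled jobs sit in a sorted set scored by due time, and each poll reads them back and re-writes them into the stream. The function, key names, and field names below are illustrative assumptions, not arq's actual code.

```python
import asyncio
import time


async def poll_delayed_queue(redis, delayed_key: str, stream_key: str) -> int:
    """Move every currently-due job from the sorted set into the stream.

    ``redis`` is any client exposing ``zrangebyscore``/``xadd``/``zrem``
    (e.g. a redis.asyncio.Redis instance); key names are hypothetical.
    """
    now_ms = int(time.time() * 1000)
    # Jobs scheduled for the future sit in a sorted set scored by due time.
    due = await redis.zrangebyscore(delayed_key, min=0, max=now_ms)
    for job_id in due:
        # XADD publishes the job to the stream; ZREM removes it from the
        # delayed set so it is enqueued exactly once.
        await redis.xadd(stream_key, {"job_id": job_id})
        await redis.zrem(delayed_key, job_id)
    return len(due)
```

Every job is written to one structure only to be read back and written to another on the next poll, which is the overhead being criticized here.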

run_stream_reader is the method that actually reads tasks from the stream.
run_idle_consumer_cleanup tries to remove idle consumers from Redis. Imagine multiple workers all running this loop: they race each other and issue redundant cleanup operations against Redis, which I believe we can avoid. Instead, I remove the consumer in the close method, like this part of the library does. We need to clean up consumers at all because a fresh consumer name is generated every time a worker starts, and I can't see an easy way to generate a consistently unique consumer name across restarts.
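A sketch of the shutdown-time cleanup described above, assuming a redis-py-style async client. The class, key, and group names are hypothetical, not arq's real API; the point is that the consumer name is random per worker start, so the worker deletes its own consumer on a clean close instead of every worker periodically scanning for idle ones.

```python
import asyncio
import uuid


class StreamConsumer:
    """Hypothetical per-worker stream consumer with cleanup on close."""

    def __init__(self, redis, stream_key: str, group: str):
        self.redis = redis
        self.stream_key = stream_key
        self.group = group
        # A fresh random name each time the worker starts: there is no
        # easy way to derive a stable unique identity across restarts.
        self.consumer = f"worker-{uuid.uuid4().hex}"

    async def close(self) -> None:
        # XGROUP DELCONSUMER removes this consumer from the group so dead
        # names don't accumulate; done once at shutdown rather than by a
        # periodic cleanup task racing across all workers.
        await self.redis.xgroup_delconsumer(self.stream_key, self.group, self.consumer)
```

The trade-off: a worker killed without a clean shutdown still leaves a stale consumer behind, which is where an occasional idle-consumer sweep would otherwise help.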

I also removed _unclaim_job from the implementation. As I understand it, its purpose was to unclaim a job that a consumer tried to fetch while it was already taken by another consumer. But preventing exactly that scenario is the whole point of using Redis streams with consumer groups; it shouldn't happen unless something is wrong with Redis itself.
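To illustrate why unclaiming shouldn't be needed: with XREADGROUP and the special `>` ID, Redis delivers each new stream entry to exactly one consumer in the group, so two consumers never receive the same fresh entry. A minimal sketch of such a reader, with assumed (not arq's actual) names:

```python
import asyncio


async def read_stream_jobs(redis, stream_key: str, group: str, consumer: str):
    """Read a batch of new entries for this consumer and acknowledge them.

    ``redis`` is any client exposing ``xreadgroup``/``xack`` (e.g. a
    redis.asyncio.Redis instance); key and group names are hypothetical.
    """
    # ">" requests only entries never delivered to any consumer in this
    # group; Redis hands each entry to exactly one consumer.
    batches = await redis.xreadgroup(group, consumer, {stream_key: ">"}, count=10)
    jobs = []
    for _stream, entries in batches:
        for entry_id, fields in entries:
            jobs.append(fields)
            # XACK removes the entry from this consumer's pending list
            # once it has been processed.
            await redis.xack(stream_key, group, entry_id)
    return jobs
```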

As a final note, I also made the logic more robust in a few places, for example by guarding stream-specific paths with `if self.use_stream`.

It's worth mentioning that we have been running this implementation in our production environment for almost two months, and everything works as expected. The benefits and metric improvements closely match those reported in this PR.

Thanks again @RB387 and @ajac-zero for your great work. If you're still interested in the topic, I'd appreciate your review 🙏

If the maintainers are happy with the approach, I'll add the remaining tests to reach 100% coverage of my changes.

@codecov-commenter commented Jul 10, 2025

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

Attention: Patch coverage is 50.00000% with 74 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| arq/worker.py | 44.85% | 51 Missing and 8 partials ⚠️ |
| arq/connections.py | 35.71% | 7 Missing and 2 partials ⚠️ |
| arq/jobs.py | 75.00% | 3 Missing and 2 partials ⚠️ |
| arq/utils.py | 50.00% | 1 Missing ⚠️ |

❗ Your organization needs to install the Codecov GitHub app to enable full functionality.

```diff
@@            Coverage Diff             @@
##             main     #504      +/-   ##
==========================================
- Coverage   96.27%   90.30%   -5.98%
==========================================
  Files          11       12       +1
  Lines        1074     1196     +122
  Branches      209      223      +14
==========================================
+ Hits         1034     1080      +46
- Misses         19       83      +64
- Partials       21       33      +12
```
| Files with missing lines | Coverage Δ |
|---|---|
| arq/constants.py | 100.00% <100.00%> (ø) |
| arq/lua_script.py | 100.00% <100.00%> (ø) |
| arq/utils.py | 98.78% <50.00%> (-1.22%) ⬇️ |
| arq/jobs.py | 95.45% <75.00%> (-2.71%) ⬇️ |
| arq/connections.py | 83.04% <35.71%> (-7.03%) ⬇️ |
| arq/worker.py | 87.60% <44.85%> (-9.57%) ⬇️ |

Continue to review full report in Codecov by Sentry.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 7a911f3...ab1a21a. Read the comment docs.

