feat: receiver max_prefetch argument #127
Conversation
Codecov Report
@@ Coverage Diff @@
## develop #127 +/- ##
===========================================
- Coverage 67.62% 64.50% -3.13%
===========================================
Files 37 40 +3
Lines 942 1062 +120
===========================================
+ Hits 637 685 +48
- Misses 305 377 +72
... and 1 file with indirect coverage changes
s3rius left a comment
It looks great, and I like this idea. Before merging it, we need to change the API for brokers so this feature becomes actually useful.
In an async context it's a bit more complicated, because you can spawn async tasks that run the actual taskiq tasks, and the prefetch feature would reduce execution guarantees. For example, imagine a broker that acks a task right after yielding it. It means that when somebody iterates over messages from the listen async generator, every time they fetch the next message, we ack the previous one. Like this:
async def listen():
    async for message in self.read_channel():
        yield message.data
        await message.ack()

What if I create a semaphore that executes 3 tasks concurrently? It will ack the two previous tasks even if they have not completed. Currently we're thinking about how to overcome this issue. We introduced a semaphore to reduce the amount of data we could possibly lose, and now I'm thinking about a new broker API that returns not raw bytes, but some sort of abstraction with an ack function that we can call when the task is done.
I would wait for this ackable messages feature before rolling out this one. What do you think?
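For illustration only, here is a minimal sketch of what such an ackable-message abstraction might look like; the AckableMessage name and its fields are hypothetical, not a confirmed taskiq API:

from dataclasses import dataclass
from typing import Awaitable, Callable

@dataclass
class AckableMessage:
    # Raw task payload plus a callback that the worker calls
    # only after the task has actually finished.
    data: bytes
    ack: Callable[[], Awaitable[None]]

async def listen(self):
    async for message in self.read_channel():
        # Nothing is acked here; acking is deferred to whoever
        # consumes the message and runs the task.
        yield AckableMessage(data=message.data, ack=message.ack)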
Thank you very much for your contribution. It's really valuable to us.
I'm working on a NATS JetStream broker and backend.
So I don't think you need to change the API.
Okay, now I see your point. Then please change the default value and we'll merge it into develop.
max_prefetch argument for receiver
In Celery there is worker_prefetch_multiplier. Very useful in some cases.
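As a rough sketch of the idea (not the actual code from this PR; run_one and execute_task are placeholder names), the receiver can size an asyncio semaphore with max_prefetch to cap how many messages are pulled from the broker while earlier ones are still being processed:

import asyncio

async def run_receiver(broker, max_prefetch: int = 4) -> None:
    # At most max_prefetch messages are taken from the broker
    # before previously taken ones finish processing.
    prefetch_sem = asyncio.Semaphore(max_prefetch)

    async def run_one(message: bytes) -> None:
        try:
            await execute_task(message)  # placeholder for actual task execution
        finally:
            prefetch_sem.release()

    async for message in broker.listen():
        await prefetch_sem.acquire()
        asyncio.create_task(run_one(message))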