Skip to content

Conversation

@Sobes76rus
Copy link
Contributor

@Sobes76rus Sobes76rus commented May 10, 2023

max_prefetch argument for receiver
In Celery there are worker_prefetch_multiplier
Very useful in some cases

@codecov-commenter
Copy link

codecov-commenter commented May 10, 2023

Codecov Report

Merging #127 (1806979) into develop (e66f3aa) will decrease coverage by 3.13%.
The diff coverage is 53.43%.

❗ Your organization is not using the GitHub App Integration. As a result you may experience degraded service beginning May 15th. Please install the Github App Integration for your organization. Read more.

@@             Coverage Diff             @@
##           develop     #127      +/-   ##
===========================================
- Coverage    67.62%   64.50%   -3.13%     
===========================================
  Files           37       40       +3     
  Lines          942     1062     +120     
===========================================
+ Hits           637      685      +48     
- Misses         305      377      +72     
Impacted Files Coverage Δ
taskiq/cli/watcher.py 0.00% <ø> (ø)
taskiq/cli/worker/process_manager.py 0.00% <0.00%> (ø)
taskiq/cli/worker/run.py 0.00% <0.00%> (ø)
taskiq/receiver/params_parser.py 96.87% <ø> (ø)
taskiq/result_backends/dummy.py 100.00% <ø> (ø)
taskiq/utils.py 72.72% <25.00%> (-27.28%) ⬇️
taskiq/middlewares/prometheus_middleware.py 26.78% <26.78%> (ø)
taskiq/brokers/zmq_broker.py 45.71% <28.57%> (+5.17%) ⬆️
taskiq/abc/broker.py 78.49% <60.00%> (-6.90%) ⬇️
taskiq/receiver/receiver.py 91.93% <96.00%> (ø)
... and 13 more

... and 1 file with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@Sobes76rus Sobes76rus changed the title feat: receiver max_prefetch feat: receiver max_prefetch argument May 10, 2023
Copy link
Member

@s3rius s3rius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks great. I like this idea. Before merging it, we need to change API for brokers so this feature will become actually useful.

In async context it's a bit more complicated, because you can spawn async tasks that run actual taskiq tasks, and prefetch feature would reduce execution guarantees. For example. Imagine broker that acks tasks right after yield. It means when somebody iterates over messages from listen async generator, every time he wants to get the next message, we ack previous one. Like this:

async def listen():
	async for message in self.read_channel():
		yield message.data
		await message.ack()

What if I create semaphore that's going to execute 3 tasks asynchronously? It's going to acks two previous tasks even if they were not completed. Currently we're thinking how we can surpass this issue. We introduced a semaphore to reduce amount of data we could possibly loose and now I'm thinking about new broker api that returns not bytes, but some sort of an abstraction with ack function, that we can call when task is done.

I would wait for this ackable messages feature before rolling out this one. What do you think?

@s3rius
Copy link
Member

s3rius commented May 10, 2023

Thank you very much for your contribution. It's really valuable for us.

@Sobes76rus
Copy link
Contributor Author

Sobes76rus commented May 10, 2023

Im working on nats jetstream broker and backend
Ack method i implemented as post_save middleware, works fine
My broker implements function which returns me original message with ack method
Then in middleware i just use it

@Sobes76rus
Copy link
Contributor Author

So i dont think u need to change API

@s3rius
Copy link
Member

s3rius commented May 10, 2023

Okay. Now I see your point. Then, please change default value and we merge it into develop.

@s3rius s3rius merged commit e67dab3 into taskiq-python:develop May 10, 2023
@Sobes76rus Sobes76rus deleted the receiver branch May 10, 2023 22:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants