Behaviour of noack() - events always being acknowledged #189
Comments
I am also experiencing this behaviour. I am currently using this solution. It would be cool if we could have it implemented.
@davidfarrugia When you're using this solution, what is the behavior of the agent when a message is not acknowledged? I am expecting the agent to continuously re-process the latest unacknowledged event, but I'm not seeing that; instead the event is processed once and never again. I am trying to use this code to manually acknowledge requests depending on the response code of an HTTP request, so that the HTTP request can be retried until it succeeds:

```python
async def test_noack_agent(stream):
    async for events in stream.noack().noack_take(1, within=1):
        for event in events:
            app.logger.info("this should repeat indefinitely")
```

However, I don't see the logging message being repeated. Any advice? My understanding is that the event should be continuously reprocessed as long as it is not acknowledged. Am I misunderstanding something? Thank you very much for the help.
Hey @karlokr-p, I am a developer working with @davidfarrugia. The implementation presented does not handle the repetition; you would need to handle the repetition in your own code, before the yield.
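The snippet that followed here did not survive this capture. As a self-contained sketch of the pattern being described (retry the side effect in a loop, and only flag the event as acked once it succeeds), using plain asyncio and illustrative names rather than Faust itself:

```python
import asyncio

class Event:
    """Stand-in for a stream event; only the `acked` flag matters here."""
    def __init__(self, value):
        self.value = value
        self.acked = False

async def process_with_retry(event, handle, backoff=0.5):
    """Retry `handle(event)` until it succeeds, then mark the event acked.

    `handle` is whatever side effect must succeed before acking,
    e.g. an HTTP POST.
    """
    while not event.acked:
        try:
            await handle(event)
            event.acked = True  # success: stop retrying; caller can now yield
        except Exception:
            await asyncio.sleep(backoff)  # back off, then retry

# Demo: a handler that fails twice before succeeding.
attempts = 0

async def flaky(event):
    global attempts
    attempts += 1
    if attempts < 3:
        raise RuntimeError("transient failure")

ev = Event(b"payload")
asyncio.run(process_with_retry(ev, flaky, backoff=0))
print(attempts, ev.acked)  # 3 True
```

In the actual Faust agent, the body of the loop would also call `await stream.ack(event)` alongside setting `event.acked`, as shown in the full example later in this thread.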
Hi @zerafachris, thank you so much for the help, this solved my issue!
@zerafachris Hello, sorry for the noise... does it work using …
@ostetsenko were you able to get a working solution for noack streams? It still seems to acknowledge the event even when it is not marked as acknowledged.
Hi @ostetsenko and @mishranik, sorry I didn't see your messages sooner! Here's some boilerplate code showing our solution:

```python
@app.agent(some_topic)
async def broadcast_message(stream):
    """Broadcast message"""
    async for events in stream.noack().noack_take(10, within=1):
        for event in events:
            message = get_message(event.value)
            url = get_url(event.value)
            while not event.acked:
                try:
                    timeout = aiohttp.ClientTimeout(total=settings.DEFAULT_HTTP_TIMEOUT_S)
                    async with aiohttp.ClientSession(json_serialize=ujson.dumps, timeout=timeout) as session:
                        async with session.post(
                            url=url,
                            json=message,
                        ) as response:
                            if response.status == 200:
                                app.logger.info("Message sent successfully")
                                await stream.ack(event)
                                event.acked = True
                                yield event.value
                            elif response.status == 400:
                                app.logger.error("Bad request, continue")
                                await stream.ack(event)
                                event.acked = True
                            else:
                                # back off 0.5s so we do not send too many requests at once
                                await asyncio.sleep(0.5)
                                app.logger.warning(f"Received HTTP {response.status} from {url}. Retrying")
                                yield event.value
                except Exception as error:
                    app.logger.error(f"Unknown error ({error}), try again")
                    yield event.value
                    await asyncio.sleep(1)
            yield event.value
```

This has worked for us in testing and production.
@karlokr-p Looks good. What happens with …
For me, the …
Checklist

- I have verified the issue exists against the `master` branch of Faust.

Steps to reproduce
I am running an agent similar to the following:
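The agent snippet itself is missing from this capture; based on the description and the linked repository, it presumably resembled something like the following (the broker URL, topic name, and value layout are illustrative, not the author's exact code):

```python
import faust

app = faust.App("retry-experiment", broker="kafka://localhost:9092")
topic = app.topic("test-topic")

@app.agent(topic)
async def process(stream):
    async for events in stream.noack().noack_take(1, within=1):
        for event in events:
            message_id = event.value["id"]
            if message_id % 2 != 0:
                # expectation: event is never acked, so it should be re-processed
                raise ValueError(f"id {message_id} is not divisible by 2")
            await stream.ack(event)  # ack only even ids
            yield event.value
```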
This is slightly different from what I intend to run in production, but in essence it's the same. In the above example I am manually sending messages to the topic and only want to ack a message if its `id` is divisible by 2. You can see the full code for this test scenario here: https://github.com/panevo/faust-retry-experiment
Expected behavior

I am expecting any message whose `id` is not divisible by 2 to raise an exception and not be acknowledged, and thus to be re-processed by the agent, since the offset should not change.

Actual behavior
All messages are always acknowledged. An exception is raised for `id`s not divisible by 2, but the message is not retried by the agent.

Versions

- Python version: 3.7
- Faust version: `faust-streaming==0.6.4` (also tried with `faust-streaming==0.6.9`)
- Operating system: Linux
- Kafka version: docker image `confluentinc/cp-kafka:latest`