Behaviour of noack() - events always being acknowledged #189

Closed
2 tasks done
karlokr-p opened this issue Sep 8, 2021 · 9 comments

Comments

@karlokr-p

karlokr-p commented Sep 8, 2021

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I am running an agent similar to the following:

```python
@app.agent(notification_topic)
async def downtime_notification(stream):
    async for event in stream.noack().events():

        logger.info(f"Notification received with id={event.value.id}, retry={event.value.retry}")

        if int(event.value.id) % 2 == 0:
            logger.info(f"Acknowledge {event.value.id}")
            await stream.ack(event)
        else:
            # Expectation: this event stays unacknowledged, so it should be processed again.
            raise Exception("not acknowledged")

        yield event
```

This is slightly different from what I intend to run in production, but in essence it's the same. In the example above I manually send messages to the topic and only want to ack a message if its id is divisible by 2.

You can see the full code for this test scenario here: https://github.com/panevo/faust-retry-experiment

Expected behavior

I expect any message whose id is not divisible by 2 to raise an exception and not be acknowledged, and therefore to be reprocessed by the agent, since the committed offset should not advance.

Actual behavior

All messages are always acknowledged. The exception is raised for ids not divisible by 2, but those events are never retried by the agent.
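A simplified model of the expected behaviour may help separate two different things: the offset that gets committed and the offset the consumer fetches next. This is a sketch under stated assumptions, not a description of Faust internals: it assumes the committed offset only advances past a contiguous run of acked offsets, and that the fetch position advances independently of acks (as it does for a Kafka consumer). `PartitionModel` and its methods are hypothetical names. Under these assumptions an unacked message holds back the commit, but the running consumer does not fetch it again; it is only re-delivered once consumption resumes from the committed offset, for example after a restart or rebalance.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionModel:
    """Toy model of one partition read by one consumer (hypothetical, illustrative only)."""

    messages: list
    position: int = 0   # next offset the consumer will fetch
    committed: int = 0  # offset consumption resumes from after a restart/rebalance
    acked: set = field(default_factory=set)

    def fetch(self):
        """Return (offset, message); the position advances whether or not anything is acked."""
        offset = self.position
        self.position += 1
        return offset, self.messages[offset]

    def ack(self, offset):
        """Record an ack; the commit only moves past a contiguous run of acked offsets."""
        self.acked.add(offset)
        while self.committed in self.acked:
            self.committed += 1

    def restart(self):
        """Simulate a restart or rebalance: resume fetching from the committed offset."""
        self.position = self.committed


# Ack only even ids, as in the agent above.
p = PartitionModel(messages=[0, 1, 2, 3])
for _ in range(4):
    offset, msg_id = p.fetch()
    if msg_id % 2 == 0:
        p.ack(offset)
```

After the loop the position is 4 but the commit is stuck at 1, so nothing is re-fetched until `p.restart()`, after which offset 1 is delivered again (and so is the already-acked offset 2, which is the at-least-once trade-off).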

Versions

  • Python version
    3.7

  • Faust version
    faust-streaming==0.6.4 (also tried faust-streaming==0.6.9)

  • Operating system
    Linux

  • Kafka version
    docker image: confluentinc/cp-kafka:latest

@davidfarrugia

I am also experiencing this behaviour. I am currently using this solution. It would be great if it could be implemented.

@karlokr-p
Author

karlokr-p commented Sep 30, 2021

@davidfarrugia When you use this solution, what does the agent do when a message is not acknowledged? I expected the agent to keep reprocessing the latest unacknowledged event, but that is not what I see: the event is processed once and never again.

I am trying to use this code to manually acknowledge messages depending on the response code of an HTTP request, so that the request can be retried until it succeeds. However, I can't get the noack_take() solution to work; events still seem to be acknowledged. I have tried the following simple code in my agent:

```python
async def test_noack_agent(stream):
    async for events in stream.noack().noack_take(1, within=1):
        for event in events:
            app.logger.info("this should repeat indefinitely")
```

but I don't see the log message repeating. Any advice? My understanding is that an event should be reprocessed continuously as long as it is not acknowledged. Am I misunderstanding something?

Thank you very much for the help

@zerafachris
Contributor

Hey @karlokr-p, I am a developer working with @davidfarrugia. The linked implementation does not handle repetition; you need to handle retries in your own code. You could do this before the yield:

  • collect the payload state/key;
  • if the state/key has not been acknowledged, try to reprocess it;
  • otherwise, yield event.value.
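The steps above can be sketched as a plain asyncio generator. This is a minimal, library-agnostic illustration rather than Faust code: it assumes events arrive as (key, value) pairs and that `handle` is a hypothetical coroutine returning True once a value has been processed. In a Faust agent, the point where `acked` becomes true is where you would call stream.ack(event). The `max_attempts` and `delay` parameters are additions that are not part of the description.

```python
import asyncio


async def process_with_retry(events, handle, max_attempts=None, delay=0.0):
    """Yield each value only after `handle` reports success for it.

    `events` is an async iterable of (key, value) pairs; `handle` is a
    hypothetical coroutine that returns True once the value is processed.
    """
    async for key, value in events:
        attempts = 0
        acked = False
        # Reprocess the same key/value until it has been acknowledged.
        while not acked:
            attempts += 1
            acked = await handle(key, value)
            if not acked:
                if max_attempts is not None and attempts >= max_attempts:
                    raise RuntimeError(f"giving up on key={key!r} after {attempts} attempts")
                await asyncio.sleep(delay)
        # Only acknowledged values are passed downstream.
        yield value
```

Leaving `max_attempts` as None retries forever, which matches the "retry until success" goal in this thread but can block the partition indefinitely on a permanently failing message.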

@karlokr-p
Author

Hi @zerafachris, thank you so much for the help, this solved my issue!

@ostetsenko
Contributor

@zerafachris
@karlokr-p
@mdrago98

Hello, sorry for the noise. Does it work with "async for event in stream.noack().events()" now?
And what does the implementation @zerafachris described above look like?

@mishranik

@ostetsenko were you able to get a working solution for noack streams? It still seems to acknowledge the event even when it has not been explicitly acknowledged.

@karlokr-p
Author

Hi @ostetsenko and @mishranik, sorry I didn't see your messages sooner!

Here's some boilerplate code showing our solution:

```python
import asyncio

import aiohttp
import ujson

# app, some_topic, settings, get_message and get_url are defined elsewhere in our project.


@app.agent(some_topic)
async def broadcast_message(stream):
    """Broadcast message"""
    async for events in stream.noack().noack_take(10, within=1):
        for event in events:

            message = get_message(event.value)
            url = get_url(event.value)

            # Keep retrying this event until it has been acknowledged.
            while not event.acked:
                try:
                    timeout = aiohttp.ClientTimeout(total=settings.DEFAULT_HTTP_TIMEOUT_S)
                    async with aiohttp.ClientSession(json_serialize=ujson.dumps, timeout=timeout) as session:
                        async with session.post(
                            url=url,
                            json=message,
                        ) as response:
                            if response.status == 200:
                                app.logger.info("Message sent successfully")
                                await stream.ack(event)
                                event.acked = True
                            elif response.status == 400:
                                # Retrying a bad request will not help, so ack it and move on.
                                app.logger.error("Bad request, continue")
                                await stream.ack(event)
                                event.acked = True
                            else:
                                app.logger.warning(f"Received HTTP {response.status} from {url}. Retrying")
                                await asyncio.sleep(0.5)  # back off so we do not send too many requests at once
                except Exception as error:
                    app.logger.error(f"Unknown error ({error!r}), trying again")
                    await asyncio.sleep(1)

            # Forward the value downstream exactly once, after the event has been acked.
            yield event.value
```

This has worked for us in testing and production
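The HTTP branches in the agent above can also be pulled into a small pure function, so the retry policy can be unit-tested without Kafka or a live endpoint. This is a sketch of that refactoring, not part of the posted solution; `Outcome` and `classify` are hypothetical names, not Faust or aiohttp APIs, and the mapping simply mirrors the branches in the agent (200 succeeds, 400 is acked without retrying, anything else is retried).

```python
from enum import Enum


class Outcome(Enum):
    SUCCESS = "success"  # ack the event; the request went through
    GIVE_UP = "give_up"  # ack the event anyway; retrying will not help
    RETRY = "retry"      # leave the event unacked and try again


def classify(status):
    """Map an HTTP status code to what the agent should do with the event."""
    if status == 200:
        return Outcome.SUCCESS
    if status == 400:
        return Outcome.GIVE_UP
    return Outcome.RETRY
```

With this split, the agent's while loop only has to call `classify(response.status)` and ack when the result is not `Outcome.RETRY`.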

@ostetsenko
Contributor

@karlokr-p Looks good. What happens with yield event.value? How and when will the event be reprocessed? Will it be reprocessed in the next iteration of the "async for events in stream.noack().noack_take(10, within=1):" loop?

@gpkc

gpkc commented Apr 6, 2023

For me, the noack solution works. However, to get unacknowledged events reprocessed I had to add the processing_guarantee="exactly_once" setting to the Faust app. I am not sure why this is needed, since that setting seems to only change the isolation level of the consumers, and I am not using Faust or transactions to produce events.


6 participants