Upgrade from 0.34.0 to 0.36.0 causes consumer to stop consuming but keep running #638
Comments
Thanks for reporting it, we'll investigate further. Can you provide some snippet of your code? What consumer are you using? Stream, base? the properties, any logs, etc. |
Confirming that I too encountered a similar issue, which resulted in significant increases in idle time being reported across the board, and some instances of what appeared to be an executor stall (almost as if some blocking calls were on the async path). I happened to be using a producer for this, with no consumers on my particular service, though I do use a long publish batching window of 1 second. Downgrading from |
@zhrebicek @neoeinstein The event API does allow some transient errors to be propagated that the callback API did not. @scanterog brought this up to upstream here: confluentinc/librdkafka#4493 . Is it possible that your code might have not handled these explicitly, leading to either the consumer or producer becoming idle? This may manifest with |
We tested internally today the |
On our side, we are using I did also confirm that we did not see any errors bubble up out of the Edit to confirm the properties we set:
bootstrap.servers=<broker>
client.id=<identifier>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.username=<username>
sasl.password=<password>
request.required.acks=-1
queue.buffering.max.ms=1000
On a side note, I do have an internal strongly-typed convention for adding these to the ClientConfig if that has any interest to you. We have logging set to Our rdkafka dependency is set up to statically link |
We use
client.id: <id>
enable.auto.commit: true
enable.auto.offset.store: false
enable.partition.eof: false
linger.ms: 100
enable.idempotence: true
@davidblewett We can look into explicitly handling the errors you mention, but we basically construct a new consumer on any error.
loop {
    let consumer = Arc::new(try_create_consumer_from_config(
        kafka_config.consumer.clone(),
    )?);
    ...
    consumer.subscribe(&topics)?;
    match run(
        ... (consumer.stream ... inside -> anyhow::Result<()>)
    )
    .await
    {
        Ok(()) => break Ok(()),
        Err(error) => warn!(?error, "Processor failed"),
    }
}
But all we have is this on rebalance
Edit: I see one of these couple of days back, but only one instance of it and I am not even sure it happened on
|
One addition we had no issues for producers ( Thanks a lot for looking into this! |
I went ahead and yanked the |
I'm surprised we're seeing a working case of the producer with @zhrebicek and a non working one with @neoeinstein. I wonder what the differences would be here. @zhrebicek I see you're setting @neoeinstein can you provide an exact snippet of what you're doing? I wasn't able to reproduce it as well with the |
@scanterog We do it like this
consumer
    .stream()
    .take_until(token.wait_for_shutdown())
    .err_into::<anyhow::Error>()
    // Rest of pipe working with messages.
    // Last step before storing offsets is publishing converted data to other topic.
    .try_buffered(converter_config.max_in_flight)
    .try_for_each(|message| {
        let span = message.span();
        let consumer = consumer.clone();
        async move {
            consumer.store_offset(message.topic(), message.partition(), message.offset())?;
            Ok(())
        }.instrument(span)
    })
    .await |
@zhrebicek yeah, that translates to stopping the stream entirely when an error is encountered. If a Kafka cluster is under high load, you will most likely have a lot of request retries that are bubbled up as transient errors. That condition is usually persistent, which means the consumer makes no progress at all and you end up recreating the consumer constantly. The only thing that does not add up to me here is that I'd still expect the error to be logged every time the stream stops and the consumer is recreated. Are you logging these events? One potential solution is to stop forwarding those transient errors to the app (as with the callback API), but sadly rdkafka does not provide any API to identify them, so it would require scanning the rdkafka code base and filtering those codes on our side. |
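The difference between ending the stream on any error and skipping transient ones can be sketched with a std-only model. This is a minimal sketch under stated assumptions, not rust-rdkafka's API: `ConsumeError`, its `Transient`/`Fatal` split, and `drain` are hypothetical names, and which real error codes would count as transient is exactly the open question raised above.

```rust
/// Hypothetical error type; rust-rdkafka does not expose this split.
#[derive(Debug, Clone, PartialEq)]
enum ConsumeError {
    /// Assumed recoverable, e.g. a request the client will retry on its own.
    Transient(String),
    /// Assumed unrecoverable; the caller should rebuild the consumer.
    Fatal(String),
}

/// Processes items until the input ends or a fatal error occurs.
/// Transient errors are logged, counted, and skipped instead of ending the loop.
/// Returns the processed offsets and the number of skipped transient errors.
fn drain<I>(items: I) -> Result<(Vec<u64>, usize), ConsumeError>
where
    I: IntoIterator<Item = Result<u64, ConsumeError>>,
{
    let mut processed = Vec::new();
    let mut skipped = 0;
    for item in items {
        match item {
            Ok(offset) => processed.push(offset),
            Err(ConsumeError::Transient(reason)) => {
                // Log and keep going, so one retryable error does not stop consumption.
                eprintln!("transient error, continuing: {reason}");
                skipped += 1;
            }
            // Anything else ends the loop and is handed back to the caller.
            Err(fatal) => return Err(fatal),
        }
    }
    Ok((processed, skipped))
}
```

With a `try_for_each`-style pipeline, by contrast, the first `Err` of either kind ends the whole stream, which is the behaviour described in the comment above.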
@scanterog we are logging all the errors, and there are no subsequent errors after the thing is stuck, so it should not be recreating in a loop. From what I remember there were at most 3 consecutive errors like this that would cause recreation of it all per pod, and even with that, other pods did not get stuck. |
@zhrebicek I see. We haven't been able to reproduce this so far. We have had tests running for over 5 days and we have tried several operations on the cluster (rolling restarts, broker replacements, ungraceful shutdown). I'll try to set up a service using the pattern you've shared before and see whether we can get closer to understanding what's going on. |
@scanterog If I would be able to reproduce on One note, maybe set high replication factor, so it has more work with |
@zhrebicek no worries! Totally get that. What RF are you using, and what Kafka version? |
@scanterog |
@zhrebicek would it be possible to run a shadow service (one that sends all reads to /dev/null) with this version and debug level enabled (same for rdkafka logs) in the environment where you're getting this issue? Sadly we aren't able to reproduce this issue, so there must be something specific we're missing. @neoeinstein this also applies to the FutureProducer. If you can provide that information, it would be extremely useful for the project! Thanks both! |
I will see what time allows me to try. |
Thanks @zhrebicek ! appreciate that |
Same problem here, took a few hours to diagnose that this was the issue. |
@Ten0 until we have a way of reproducing the issue, we can't work on a fix. That commit does change the underlying implementation, but aside from some errors bubbling up that didn't before, there shouldn't have been a change in behavior. It is also the same mechanism that the Confluent Kafka libraries use. This change has been in use and seeing a lot of traffic, and we haven't observed the reported issue. Are you sure you aren't accidentally swallowing some exceptions? We need a self-contained example that triggers the behavior to investigate further. |
Unfortunately that is going to be too hard to extract for us as well :( Our code looks basically like this:
let stream: rdkafka::MessageStream<_> = stream_consumer.stream();
loop {
    let msg = stream.next().await?;
    // do stuff with the message
}
The librdkafka parameters are:
session.timeout.ms = "6000"
group.id = "some_group_id"
enable.auto.commit = "true"
auto.commit.interval.ms = "1000"
enable.auto.offset.store = "false"
enable.partition.eof = "false"
statistics.interval.ms = "1000"
We are handling ~300k lines/s dispatched on several servers.
Yes I am sure that we aren't accidentally dropping any error either returned by this library or printed to
From what I read in this topic there have been 3 different people encountering this precise issue (same blockage, same memory increase...) since this precise release: @zhrebicek @neoeinstein and myself. I still believe it is time to acknowledge that this issue indeed does exist and has a strong impact despite a minimal reproduction not being established yet.
|
I'm not sure it is related, but upgrading to 0.36.0 makes topic deletion hang forever, if you have uncommitted messages |
I think I was able to reproduce the issue locally. Running high-level consumer with following timing-related settings:
After calling My guess is that there's something wrong with the underlying On a side note, another issue I noticed is that since 0.36.2 the consumer starts printing verbose log messages if the |
Minimal example to reproduce the issue (can be included as-is in the crate's high-level consumer integration tests):
On v0.35.0 this completes w/o issues, while on v0.36.2 it gets stuck after the first message w/o any error. You can also try lowering the timeout of 5s, on my machine it still fails on 0.36.2 with a timeout of 1s and works with 500ms. |
Interestingly, the issue seems to be directly related to If I bump it from 500 to 5000ms, the consumer keeps working for any poll delays at or below 5000ms and fails for values above. |
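The observations in the last two comments fit a simple threshold rule, sketched below. This only models what was reported for 0.36.2 and is not taken from the library: `reported_outcome` and `configured` are hypothetical names, `configured` stands in for the setting whose name is not shown above, and the sketch assumes the "timeout" in the repro is the delay between polls.

```rust
use std::time::Duration;

/// Outcome of one repro run as described in the thread.
#[derive(Debug, PartialEq)]
enum Outcome {
    KeepsConsuming,
    Stalls,
}

/// Models the reported observation only: on 0.36.2 the consumer stalls when
/// the delay between polls is above the configured value, and keeps working
/// when the delay is at or below it.
fn reported_outcome(poll_delay: Duration, configured: Duration) -> Outcome {
    if poll_delay > configured {
        Outcome::Stalls
    } else {
        Outcome::KeepsConsuming
    }
}
```

Under this rule, a 1 s delay against a 500 ms setting stalls and a 500 ms delay does not, which matches the numbers reported for the repro; raising the setting to 5000 ms moves the boundary to 5 s.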
I just ran into this issue; might I suggest yanking all variants of the 0.36 branch until a proper patch lands? This will prevent others from unknowingly having issues. |
I don't think that's how yank is supposed to be used, and I am sure it will cause even more problems for existing users. One example is libraries that depend on 0.36.
|
Libraries that depend on 0.36 might get a warning. But people starting a new project or upgrading their dependency stacks might introduce this new bug in their software. As said above, I've noticed similar behaviour using 0.36.2 in two separate projects that use a `StreamConsumer`. Getting some memory leaks after a long period of time (first leak started after 10 days of runtime). Feel free to tell me if you want some information about my setup (configuration, topics setup, ...). image.png <https://github.com/fede1024/rust-rdkafka/assets/7684550/530696b7-fcb2-4c58-b1a6-b1405a1cabef> Will roll back to 0.35.0 to see if it fixes the issue. |
If I want to depend on a library that depends on 0.36, I cannot do that anymore, and some (if not most) libraries' CI does not check in their lockfiles. So I do think yanking will 100% break somebody's build when it doesn't have to. If you want to fix the onboarding experience you could re-publish 0.34 as 0.37.
|
I'll mention that 0.35.0 is what we consider the last-known-good version of From an ecosystem perspective, I lean toward the yank. If a library is forcing you onto 0.36, then that library is forcing you to use a version of Of note, lib.rs indicates that the following libraries are depending on the 0.36.x line of Crates that have some general usage, with prior version supporting 0.35
New crates that don't have support for 0.35 [only ever depended on 0.36]:
Crates that have earlier versions that support 0.35 [but little general usage]
Depended on only as a dev-dependency |
I think that's the point of contention. We use 0.36 just fine and the maintainers have trouble reproducing the issue. It's not universally broken. |
If yanking is considered to be too problematic, then perhaps reverting the event API work and publishing a patch version of 0.36 would be more appropriate. Several users have reported production issues with the current release, and it would be good stewardship to prevent others from dealing with that headache. |
There seems to be a minimal repro above as well now so maybe fixing it could also be an option. |
We're seeing the same problem with |
…fede1024/rust-rdkafka#638 (comment) for more details. No visible performance differences.
* Use topic partition queue splitting feature from rdkafka. This allows to split the main consumer queue into subqueues, such that we can spawn a subtask for each topic-partition tuple. This roughly gives us 8-10x improvement in throughput (on my machine).
* Bump to rust-rdkafka 0.35, the latest release that seems to be ok. See fede1024/rust-rdkafka#638 (comment) for more details. No visible performance differences.
* Add restate.kafka_ingress.requests.total metric. This is useful to graph the consumption rate.
* Feedback
* Plumb task center from constructor
* Make sure we cancel child tasks when we're shutting down
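The "subtask per topic-partition tuple" idea from the commit message above can be illustrated with a std-only model. This is a sketch under stated assumptions: it does not call rdkafka's partition queue splitting, it uses threads and channels in place of async subtasks, and `Record` and `dispatch` are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a consumed record.
#[derive(Debug, Clone)]
struct Record {
    topic: String,
    partition: i32,
    offset: i64,
}

/// Routes each record to a worker dedicated to its (topic, partition) pair,
/// spawning the worker on first use. Returns the offsets each worker saw.
fn dispatch(records: Vec<Record>) -> HashMap<(String, i32), Vec<i64>> {
    let mut workers: HashMap<(String, i32), (Sender<Record>, JoinHandle<Vec<i64>>)> =
        HashMap::new();

    for record in records {
        let key = (record.topic.clone(), record.partition);
        let (sender, _) = workers.entry(key).or_insert_with(|| {
            let (tx, rx) = mpsc::channel::<Record>();
            // One worker per partition, so ordering within a partition is kept.
            let handle =
                thread::spawn(move || rx.iter().map(|r| r.offset).collect::<Vec<i64>>());
            (tx, handle)
        });
        sender.send(record).expect("worker exited early");
    }

    // Dropping a sender closes its channel, which lets that worker finish.
    workers
        .into_iter()
        .map(|(key, (sender, handle))| {
            drop(sender);
            (key, handle.join().expect("worker panicked"))
        })
        .collect()
}
```

Partitions are processed independently of each other here, which is where a throughput gain of the kind the commit message reports would come from; the sketch itself makes no performance claim.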
Now that this landed, can you test against the latest |
there are many other bugfixes in master, it seems a release is warranted anyway |
Is it confirmed that #666 fixes this issue, or is that still an open question? |
It's an open question specifically since nobody confirmed here that it solves the issue. |
Perhaps a release could be cut to make it easier to test and provide feedback on the potential fix? |
I can compile some of our services against main and let them run in our acceptance env for a while, but we were unable to properly reproduce the issue on demand. It just seemed to happen every now and then. So not sure how representative that test would be... |
Just read the repro suggestion a couple of comments ago, can play with that as well I guess. |
As expected reproducing it in our stack proved to be quite difficult. But when running the integration test provided by @pi-xel I can confirm that this test fails when using version 0.36.2 and succeeds when using either 0.35.0 or master. So to the best of my knowledge I would say this issue is resolved using #666 and can be closed. Since there are more fixes in master (especially #686 which we now hit consistently), it would be great if we could get a new release 🙏🏻 |
I released |
As the title says, after upgrading from 0.34.0 to 0.36.0 we started getting Kafka consumers that would stop consuming messages, start leaking memory, and not leave the consumer group. Maybe it has to do with the change to use the event interface, but that is just a wild guess.
Disclaimer: We have been running on 0.34.0 for half a day without it reproducing, whereas it happened many times with 0.36.0. I will report back if it still happens on 0.34.0 and the origin is elsewhere.