
PulsarListener does not recover after it has startup failure #816

Closed
eljefe6a opened this issue Aug 23, 2024 · 16 comments
Labels
type: feature A new feature or enhancement
Comments

@eljefe6a

I'm hitting an issue where the consumption randomly stops working. I haven't seen any pattern to it. I'm using spring-pulsar 1.1.0.

I've spent the past 4 hours trying to figure out why data wasn't being saved to the database. It turned out that the spring-pulsar consumers weren't receiving data. I added logging to the @PulsarListener-annotated method to log the object on the first line of code. It still didn't log anything. I added more logging to the method, and it started working. I restarted the container (with no code changes), and it stopped logging. I restarted again with no code changes, and it started logging again.

The methods are consuming Avro topics. The data is being produced, and I verified with pulsar-admin that new messages are arriving. There are five different methods in that class annotated with @PulsarListener. With this bug, some methods will receive messages and others won't. No exceptions are being logged, except some initial exclusive consumer exceptions while the other container is being terminated, which stop being logged after startup. The methods use both topicPattern and topics. The process is running in a K8s container.

Is there any known limitation or bug that I'm hitting? It makes no sense that the methods stop receiving data with no code changes.

@onobc
Collaborator

onobc commented Aug 23, 2024

Hi @eljefe6a,

There are five different methods in that class that are @PulsarListener listener annotated

Are the 5 listeners consuming from the same topic? Do you want each listener (method) to consume every message, or should messages be shared across the listeners?

It sounds like all methods are using the same subscription in an exclusive mode - which won't work and could explain the random behavior.

Are you specifying subscriptionType and/or subscriptionName on the @PulsarListener annotation? If not specified, the default type is exclusive.
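For illustration, a hypothetical listener that sets both attributes explicitly (topic, subscription, and handler names here are made up, not from the reporter's app):

```java
// Hedged example: with SubscriptionType.Shared, multiple consumers can
// attach to the same subscription. The default Exclusive type allows
// only one at a time, so a second consumer on the same subscription
// fails with "Exclusive consumer is already connected".
@PulsarListener(
        topics = "order-events",
        subscriptionName = "order-events-sub",
        subscriptionType = SubscriptionType.Shared)
void listenToOrders(OrderEvent event) {
    // handle the message
}
```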

Anytime I start mentioning subscriptions, I always do myself a favor and look at the nice diagram from the Pulsar docs that details the flexible subscription model.

Can you share the 5 methods (just signatures and listener annotations)?

Thanks

@eljefe6a
Author

They are consuming five different topics. Each one is using a different subscription name.

Here are two examples:

@PulsarListener(
        subscriptionName = "${spring.pulsar.client.configurations.subscription-prefix}-OrderInfoWebserviceListener-sub",
        topicPattern = "${spring.pulsar.client.configurations.league-topic-pattern}",
        schemaType = SchemaType.AVRO)

@PulsarListener(
        subscriptionName = "${spring.pulsar.client.configurations.subscription-prefix}-GameInfoWebserviceListener-sub",
        topics = { "${spring.pulsar.client.configurations.game-info-topic}" },
        schemaType = SchemaType.AVRO)

@onobc
Collaborator

onobc commented Aug 24, 2024

The methods are consuming an Avro topic

I took this to mean they were on the same topic. I am glad that they are not on the same topic though as that would be a strange use of the listener :)

Your listeners above look fine. I can't reproduce this locally. Can you do the following:

  • Pare down the number of listeners: start with 1 and see if it works, add a 2nd and see if it works, and so on until you find where it breaks.
  • Provide client and server logs from around the time this happens.
  • If nothing comes of that, provide a simple app that reproduces the behavior.

I wish I had a quick fix/answer for you but I can't reproduce it.

Thanks

@eljefe6a
Author

I think I've narrowed it down. It looks like the PulsarListener doesn't work correctly after it hits an exclusive consumer exception on startup, like:

org.springframework.pulsar.PulsarException: {"errorMsg":"Exclusive consumer is already connected","reqId":2154963840234886721, "remote":"dns/ip", "local":"/100.100.193.80:44716"}
	at org.springframework.pulsar.core.DefaultPulsarConsumerFactory.createConsumer(DefaultPulsarConsumerFactory.java:109) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.<init>(DefaultPulsarMessageListenerContainer.java:303) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer.doStart(DefaultPulsarMessageListenerContainer.java:128) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer.start(AbstractPulsarMessageListenerContainer.java:103) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer.doStart(ConcurrentPulsarMessageListenerContainer.java:100) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.pulsar.listener.AbstractPulsarMessageListenerContainer.start(AbstractPulsarMessageListenerContainer.java:103) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.pulsar.config.GenericListenerEndpointRegistry.startIfNecessary(GenericListenerEndpointRegistry.java:234) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.pulsar.config.GenericListenerEndpointRegistry.start(GenericListenerEndpointRegistry.java:186) ~[spring-pulsar-1.1.0.jar!/:1.1.0]
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:288) ~[spring-context-6.1.8.jar!/:6.1.8]
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:471) ~[spring-context-6.1.8.jar!/:6.1.8]
	at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:260) ~[spring-context-6.1.8.jar!/:6.1.8]
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:205) ~[spring-context-6.1.8.jar!/:6.1.8]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:981) ~[spring-context-6.1.8.jar!/:6.1.8]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:627) ~[spring-context-6.1.8.jar!/:6.1.8]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.3.0.jar!/:3.3.0]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754) ~[spring-boot-3.3.0.jar!/:3.3.0]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:456) ~[spring-boot-3.3.0.jar!/:3.3.0]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:335) ~[spring-boot-3.3.0.jar!/:3.3.0]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1363) ~[spring-boot-3.3.0.jar!/:3.3.0]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1352) ~[spring-boot-3.3.0.jar!/:3.3.0]
	at ai.alphabet.webservices.WebservicesApplication.main(WebservicesApplication.java:78) ~[!/:0.2.0]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:91) ~[webservices.jar:0.2.0]
	at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:53) ~[webservices.jar:0.2.0]
	at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:58) ~[webservices.jar:0.2.0]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ConsumerBusyException: {"errorMsg":"Exclusive consumer is already connected","reqId":2154963840234886721, "remote":"pb1-pc-96dccc53.aws-use2-production-snci-pool-kid.streamnative.aws.snio.cloud/18.223.234.31:6651", "local":"/100.100.193.80:44716"}
	at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1093) ~[pulsar-client-api-3.2.3.jar!/:3.2.3]
	at org.apache.pulsar.client.impl.ConsumerBuilderImpl.subscribe(ConsumerBuilderImpl.java:103) ~[pulsar-client-all-3.2.3.jar!/:3.2.3]
	at org.springframework.pulsar.core.DefaultPulsarConsumerFactory.createConsumer(DefaultPulsarConsumerFactory.java:106) ~[spring-pulsar-1.1.0.jar!/:1.1.0]

I've tested this a few times. When the exclusive consumer exception doesn't happen, the consumption starts and works fine. When the exception happens, the consumption doesn't start.

@eljefe6a
Author

I've run it several more times, and I've confirmed the issue. The "Exclusive consumer is already connected" error seems to prevent the consumer from receiving messages. The Spring Boot process is running in K8s.

The container restart process guarantees that two instances of the same process are running at once: one provisions and starts while the other waits and is then deprovisioned.

If I manually stop the container, wait, and then start, the process won't receive an exclusive consumer error and will always receive the messages.

@onobc onobc changed the title from "Randomly Stops Consuming" to "PulsarListener does not recover after it has startup failure" Aug 24, 2024
@onobc
Collaborator

onobc commented Aug 24, 2024

Nice sleuthing @eljefe6a!

Yeah, the listeners are currently not very robust during startup: if they fail to start, they do not retry. I have noted your use case in the related feature request and we will make sure it is included in the solve. We now have 2 users asking for this, so we will look into bumping the priority.

Thanks for the detailed analysis - super helpful.

@onobc
Collaborator

onobc commented Aug 24, 2024

I am going to close this now as a duplicate of #445.

@onobc onobc closed this as completed Aug 24, 2024
@onobc onobc reopened this Aug 25, 2024
@onobc
Collaborator

onobc commented Aug 25, 2024

After some thought, we have re-opened this issue to focus on the "Connection Retries" behavior described there.

@onobc onobc added the type: feature A new feature or enhancement label Aug 25, 2024
@onobc onobc added this to the 1.2.0-M2 milestone Aug 25, 2024
@eljefe6a
Author

Thanks for picking this up. If you go the retry route without some kind of notify/exit on failure, that won't fully solve the problem. Is there a way for it to continue retrying in the background in a while loop so that there isn't a state where the consumer doesn't connect again?

@onobc
Collaborator

onobc commented Aug 26, 2024

Is there a way for it to continue retrying in the background in a while loop so that there isn't a state where the consumer doesn't connect again?

Yep, planning on using spring-retry, but in another thread. The first attempt will be synchronous; if it fails, we will retry asynchronously every N units (configurable).
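Not the actual spring-pulsar implementation, but a plain-Java sketch of that strategy (synchronous first attempt, then fixed-delay retries on a background scheduler; all names here are made up):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class StartupRetrier {

    static <T> CompletableFuture<T> start(Supplier<T> action, long delayMs, int maxRetries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        try {
            // First attempt is synchronous, matching the described plan.
            result.complete(action.get());
        } catch (RuntimeException firstFailure) {
            // On failure, keep retrying every delayMs on a background thread.
            ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
            scheduleRetry(action, result, pool, delayMs, maxRetries, firstFailure);
            result.whenComplete((v, e) -> pool.shutdown());
        }
        return result;
    }

    private static <T> void scheduleRetry(Supplier<T> action, CompletableFuture<T> result,
            ScheduledExecutorService pool, long delayMs, int retriesLeft, Throwable lastError) {
        if (retriesLeft <= 0) {
            result.completeExceptionally(lastError); // give up after max retries
            return;
        }
        pool.schedule(() -> {
            try {
                result.complete(action.get());
            } catch (RuntimeException e) {
                scheduleRetry(action, result, pool, delayMs, retriesLeft - 1, e);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

In the real feature, spring-retry's RetryTemplate would replace the hand-rolled scheduling here; the sketch only shows the sync-then-async shape.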

@onobc
Collaborator

onobc commented Aug 26, 2024

@eljefe6a I also thought of a possible workaround for now. You could append an id to the subscription name, which would in effect make the 2 processes use different subscriptions. Below is an example using a random number (which is not ideal):

@PulsarListener(topics = "my-topic", subscriptionName = "my-sub-#{T(java.lang.Math).random()}")

  1. Do you have an id for the process that you could use for this?
  2. Both processes would be processing the same data; could this be a problem for your scenario?
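A sketch of one possibility for the id in question 1 (untested; assumes the listener runs in K8s, where the HOSTNAME environment variable holds the pod name by default):

```java
// Hypothetical variant of the workaround above: suffix the subscription
// name with the pod name via SpEL, so each process deterministically
// gets its own subscription. Topic and handler names are illustrative.
@PulsarListener(
        topics = "my-topic",
        subscriptionName = "my-sub-#{systemEnvironment['HOSTNAME']}")
void listen(MyEvent event) {
    // handle the message
}
```

As with the random-number version, each process would then receive its own copy of every message, so question 2 still applies.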

@eljefe6a
Author

The workaround I'm doing now is operational. I'm waiting for the other container to stop before starting the new one.

onobc added a commit to onobc/spring-pulsar that referenced this issue Sep 1, 2024
Previously, when a listener container failed to start, it
would only log the exception. This commit introduces
`StartupFailurePolicy` that allows listener containers to CONTINUE,
STOP, RETRY when an error is encountered on startup.

See spring-projects#445
See spring-projects#816
onobc added a commit that referenced this issue Sep 6, 2024
Previously, when a listener container failed to start, it
would only log the exception. This commit introduces
`StartupFailurePolicy` that allows listener containers to CONTINUE,
STOP, RETRY when an error is encountered on startup.

See #445
See #816
@onobc
Collaborator

onobc commented Sep 6, 2024

Hi @eljefe6a, the ability to configure a startup failure policy has just been merged. The follow-up is to add Spring Boot config props to tune the policy. By default, it is set to fail fast.

When set to RETRY, the default is 3 attempts with a fixed 10sec delay between attempts. In your case you will likely want to specify a custom retry template bean to account for the time it takes your pods to come online etc..

We will be following up with some docs on how to configure this with and without Spring Boot.
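For example, a custom spring-retry RetryTemplate with a longer fixed delay might look like the following (the exact bean name and wiring spring-pulsar expects are not shown in this thread, so treat this as a sketch):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.support.RetryTemplate;

@Configuration
class PulsarStartupRetryConfig {

    // Sketch: retry up to 10 times with a 30-second fixed delay, to give
    // the old pod time to release its exclusive subscription.
    @Bean
    RetryTemplate pulsarStartupRetryTemplate() {
        return RetryTemplate.builder()
                .maxAttempts(10)
                .fixedBackoff(30_000) // milliseconds
                .build();
    }
}
```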

@eljefe6a
Author

eljefe6a commented Sep 6, 2024

Awesome. Thanks for making this change so quickly.

For posterity and those who encounter this issue later, does this restart issue affect consumers that have successfully been created on startup but hit a connectivity issue later in the process' lifecycle?

@onobc
Collaborator

onobc commented Sep 6, 2024

Awesome. Thanks for making this change so quickly.

You are more than welcome.

For posterity and those who encounter this issue later, does this restart issue affect consumers that have successfully been created on startup but hit a connectivity issue later in the process' lifecycle?

It does not handle post-startup failures. Once started, the listener thread sits in a loop and attempts a batchReceive on the consumer. Under some failure conditions the Pulsar client seems able to succeed on a subsequent receive, but it is not guaranteed.

That would be another resiliency feature targeted post-startup.
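A simplified sketch of the receive loop described above (not the actual DefaultPulsarMessageListenerContainer code; the consumer types are from the Pulsar client API, and the handle method is made up):

```java
// Simplified sketch: once started, the listener thread just loops on
// batchReceive and dispatches. If the consumer breaks irrecoverably
// here, nothing re-creates it; that is the separate post-startup
// resiliency gap mentioned in the comment above.
while (running) {
    Messages<byte[]> batch = consumer.batchReceive(); // may block, may throw
    for (Message<byte[]> msg : batch) {
        handle(msg);
        consumer.acknowledge(msg);
    }
}
```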

@onobc
Collaborator

onobc commented Sep 12, 2024

Closing with the docs commit 8ed6689

@onobc onobc closed this as completed Sep 12, 2024