graceful shutdown NPE when using kafka consumer with stork #41658
Description
Describe the bug
I have a Kafka consumer that calls a REST service through a Stork URL.
Under reasonable load, the REST call fails during shutdown with an NPE in the StorkClientRequestFilter: "io.smallrye.stork.Stork.getInstance()" is null.
It looks like application shutdown does not wait for in-flight messages to finish processing before shutting down parts of the application.
Data integrity is preserved, but some messages are sent to the dead letter topic, which delays processing even though another replica could have handled them.
Expected behavior
In an ideal situation, upon shutdown, the application should:
- stop accepting new workload
- let in-flight activity finish (schedulers, REST endpoints, messaging consumers)
- shut down all parts of the application
- exit
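The ordering in the list above can be sketched in plain Java. This is only an illustration of the expected sequence, not how Quarkus or Stork implement shutdown; the class `GracefulShutdownSketch`, its methods, and the `dependency` field (a stand-in for something like the Stork instance) are all hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: not Quarkus or Stork code.
public class GracefulShutdownSketch {

    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private final List<String> events = new CopyOnWriteArrayList<>();
    // Stands in for a shared dependency that in-flight work still needs.
    private volatile Object dependency = new Object();

    /** Submits a unit of work; returns false once shutdown has begun. */
    public boolean submit(String id, long workMillis) {
        try {
            workers.execute(() -> {
                try {
                    Thread.sleep(workMillis); // simulate processing time
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    events.add("interrupted:" + id);
                    return;
                }
                // The work needs the dependency at the very end of processing.
                Object dep = dependency;
                events.add(dep != null ? "done:" + id : "failed:" + id);
            });
            return true;
        } catch (RejectedExecutionException e) {
            return false; // step 1 already happened: no new workload
        }
    }

    /** Runs the shutdown sequence; returns true if all in-flight work finished in time. */
    public boolean shutdown(long timeoutMillis) {
        workers.shutdown(); // 1. do not accept new workload
        boolean drained;
        try {
            // 2. let in-flight activity finish
            drained = workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            drained = false;
        }
        if (!drained) {
            workers.shutdownNow(); // give up on stragglers after the timeout
        }
        dependency = null; // 3. only now release shared parts of the application
        events.add("dependency-closed");
        return drained; // 4. the caller can exit
    }

    public List<String> events() {
        return List.copyOf(events);
    }

    public static void main(String[] args) {
        GracefulShutdownSketch app = new GracefulShutdownSketch();
        for (int i = 0; i < 3; i++) {
            app.submit("msg-" + i, 100);
        }
        boolean drained = app.shutdown(5_000);
        System.out.println("drained=" + drained + " events=" + app.events());
        System.out.println("accepted after shutdown: " + app.submit("late", 10));
    }
}
```

The point of the sketch is the order of steps 2 and 3: the shared dependency is released only after in-flight work has drained (or the timeout has expired). The NPE reported here is what one would expect if step 3 ran before step 2.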
Actual behavior
we see the following exception:
2024-07-03 10:37:15,144 ERROR [org.jbo.res.rea.cli.imp.StorkClientRequestFilter] (quarkus-virtual-thread-10) Error selecting service instance for serviceName: greeting-service [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "io.smallrye.stork.Stork.getService(String)" because the return value of "io.smallrye.stork.Stork.getInstance()" is null
at org.jboss.resteasy.reactive.client.impl.StorkClientRequestFilter.filter(StorkClientRequestFilter.java:38)
at org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestFilter.filter(ResteasyReactiveClientRequestFilter.java:21)
at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:25)
at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:10)
at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.invokeHandler(AbstractResteasyReactiveContext.java:231)
at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:285)
at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:275)
at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.method(AsyncInvokerImpl.java:215)
at org.jboss.resteasy.reactive.client.impl.InvocationBuilderImpl.method(InvocationBuilderImpl.java:313)
at org.acme.IGreetingResource$$QuarkusRestClientInterface.hello(Unknown Source)
at org.acme.IGreetingResource$$CDIWrapper.hello(Unknown Source)
at org.acme.IGreetingResource$$CDIWrapper_ClientProxy.hello(Unknown Source)
at org.acme.PriceConsumer.consume(PriceConsumer.java:29)
at org.acme.PriceConsumer_ClientProxy.consume(Unknown Source)
at org.acme.PriceConsumer_SmallRyeMessagingInvoker_consume_ff7ab742c6cdb0956d1e3afc1d8ed7fd43c75ad0.invoke(Unknown Source)
at io.smallrye.reactive.messaging.providers.AbstractMediator.lambda$invokeBlocking$15(AbstractMediator.java:190)
at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier.subscribe(UniCreateFromDeferredSupplier.java:25)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
at io.quarkus.virtual.threads.ContextPreservingExecutorService$ContextPreservingRunnable.run(ContextPreservingExecutorService.java:45)
at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
and messages may be sent to the DLT.
How to Reproduce?
create a new application with additional deps:
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-stork</artifactId>
</dependency>
<dependency>
    <groupId>io.smallrye.stork</groupId>
    <artifactId>stork-service-discovery-static-list</artifactId>
</dependency>
add properties file:
quarkus.http.port=18080
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.outgoing.prices-out.connector=smallrye-kafka
mp.messaging.incoming.prices.topic=mytopic
mp.messaging.outgoing.prices-out.topic=mytopic
quarkus.rest-client.greeting.url=stork://greeting-service
#quarkus.rest-client.greeting.url=http://localhost:18080
quarkus.stork.greeting-service.service-discovery.type=static
quarkus.stork.greeting-service.service-discovery.address-list=localhost:18080
quarkus.stork.greeting-service.load-balancer.type=round-robin
a producer:
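import java.time.Duration;
import java.util.Random;

import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices.
        // It emits a price every 100 ms.
        return Multi.createFrom().ticks().every(Duration.ofMillis(100))
                .map(x -> random.nextDouble());
    }
}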
a consumer:
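import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.reactive.messaging.annotations.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
public class PriceConsumer {

    Logger log = LoggerFactory.getLogger(PriceConsumer.class);

    // IGreetingResource is the REST client interface (not shown in this report).
    @Inject
    @RestClient
    IGreetingResource greetingResource;

    @Incoming("prices")
    @RunOnVirtualThread
    @Blocking(ordered = false)
    public void consume(double price) throws InterruptedException {
        log.info("receive " + price);
        // Simulate slow processing so that messages are still in flight at shutdown.
        Thread.sleep(1000);
        log.info("greeting => " + greetingResource.hello());
    }
}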
start the application with quarkus:dev
after startup, press s
you should see multiple exceptions.
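When reproducing, it may be worth checking whether the existing shutdown-related settings change the outcome. The fragment below is an untested assumption, not a confirmed workaround: `quarkus.shutdown.timeout` and the Kafka connector's `graceful-shutdown` attribute are real configuration keys, but whether they affect the order in which Stork and the messaging consumers are stopped has not been verified here, and the `30s` value is arbitrary.

```properties
# Assumption: untested against this reproducer; values are illustrative.
# Enables a graceful shutdown period for in-flight work.
quarkus.shutdown.timeout=30s
# Graceful shutdown of the Kafka channel (to my knowledge this already defaults to true).
mp.messaging.incoming.prices.graceful-shutdown=true
```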
Output of uname -a or ver
No response
Output of java -version
No response
Quarkus version or git rev
3.12
Build tool (i.e. output of mvnw --version or gradlew --version)
No response
Additional information
No response
Status
Out of scope