Skip to content

graceful shutdown NPE when using kafka consumer with stork #41658

Open
@vsevel

Description

Describe the bug

I have a kafka consumer calling a rest service using a stork url.
under reasonable load, the rest call fails with a NPE in the StorkClientRequestFilter: "io.smallrye.stork.Stork.getInstance()" is null
it sounds like the shutdown of the application does not wait for in flight messages to finish processing, before shutting down parts of the application.
the integrity of data is preserved, but some messages are sent to the dead letter topic, delaying our ability to quickly process them, when there is actually another replica that could have processed them.

Expected behavior

in a ideal situation, upon shutdown, we should have:

  • do not accept new workload
  • let in flight activity finish (schedulers, rest endpoints, messaging consumers)
  • shut down all parts of the application
  • exit

Actual behavior

we see the following exception:

2024-07-03 10:37:15,144 ERROR [org.jbo.res.rea.cli.imp.StorkClientRequestFilter] (quarkus-virtual-thread-10) Error selecting service instance for serviceName: greeting-service [Error Occurred After Shutdown]: java.lang.NullPointerException: Cannot invoke "io.smallrye.stork.Stork.getService(String)" because the return value of "io.smallrye.stork.Stork.getInstance()" is null
        at org.jboss.resteasy.reactive.client.impl.StorkClientRequestFilter.filter(StorkClientRequestFilter.java:38)
        at org.jboss.resteasy.reactive.client.spi.ResteasyReactiveClientRequestFilter.filter(ResteasyReactiveClientRequestFilter.java:21)
        at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:25)
        at org.jboss.resteasy.reactive.client.handlers.ClientRequestFilterRestHandler.handle(ClientRequestFilterRestHandler.java:10)
        at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.invokeHandler(AbstractResteasyReactiveContext.java:231)
        at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)
        at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:285)
        at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.performRequestInternal(AsyncInvokerImpl.java:275)
        at org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl.method(AsyncInvokerImpl.java:215)
        at org.jboss.resteasy.reactive.client.impl.InvocationBuilderImpl.method(InvocationBuilderImpl.java:313)
        at org.acme.IGreetingResource$$QuarkusRestClientInterface.hello(Unknown Source)
        at org.acme.IGreetingResource$$CDIWrapper.hello(Unknown Source)
        at org.acme.IGreetingResource$$CDIWrapper_ClientProxy.hello(Unknown Source)
        at org.acme.PriceConsumer.consume(PriceConsumer.java:29)
        at org.acme.PriceConsumer_ClientProxy.consume(Unknown Source)
        at org.acme.PriceConsumer_SmallRyeMessagingInvoker_consume_ff7ab742c6cdb0956d1e3afc1d8ed7fd43c75ad0.invoke(Unknown Source)
        at io.smallrye.reactive.messaging.providers.AbstractMediator.lambda$invokeBlocking$15(AbstractMediator.java:190)
        at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromDeferredSupplier.subscribe(UniCreateFromDeferredSupplier.java:25)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
        at io.quarkus.virtual.threads.ContextPreservingExecutorService$ContextPreservingRunnable.run(ContextPreservingExecutorService.java:45)
        at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314)
        at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

and messages may be sent to the DLT.

How to Reproduce?

create a new application with additional deps:

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-rest-client</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-messaging-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-stork</artifactId>
    </dependency>
    <dependency>
      <groupId>io.smallrye.stork</groupId>
      <artifactId>stork-service-discovery-static-list</artifactId>
    </dependency>

add properties file:

quarkus.http.port=18080

mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.outgoing.prices-out.connector=smallrye-kafka

mp.messaging.incoming.prices.topic=mytopic
mp.messaging.outgoing.prices-out.topic=mytopic

quarkus.rest-client.greeting.url=stork://greeting-service
#quarkus.rest-client.greeting.url=http://localhost:18080

quarkus.stork.greeting-service.service-discovery.type=static
quarkus.stork.greeting-service.service-discovery.address-list=localhost:18080
quarkus.stork.greeting-service.load-balancer.type=round-robin

a producer:

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices-out")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofMillis(100))
                .map(x -> random.nextDouble());
    }
}

a consumer:

@ApplicationScoped
public class PriceConsumer {

    Logger log = LoggerFactory.getLogger(PriceConsumer.class);

    @Inject
    @RestClient
    IGreetingResource greetingResource;

    @Incoming("prices")
    @RunOnVirtualThread
    @Blocking(ordered = false)
    public void consume(double price) throws InterruptedException {
        log.info("receive " + price);
        Thread.sleep(1000);
        log.info("greeting => " + greetingResource.hello());
    }

}

start the application with quarkus:dev
after startup, press s
you should see multiple exceptions.

Output of uname -a or ver

No response

Output of java -version

No response

Quarkus version or git rev

3.12

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    • Status

      Out of scope

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions