diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java index f3d2be1682..edd11e3eff 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java @@ -139,14 +139,17 @@ private void poll() { if (this.isPollInFlight.compareAndSet(false, true)) { logger.debug("Polling for records {}", getConsumerVerticleContext().getLoggingKeyValue()); - this.consumer.poll(POLLING_TIMEOUT).onSuccess(records -> vertx.runOnContext(v -> this.recordsHandler(records))).onFailure(t -> { - if (this.closed.get()) { - // The failure might have been caused by stopping the consumer, so we just ignore it - return; - } - isPollInFlight.set(false); - exceptionHandler(t); - }); + this.consumer + .poll(POLLING_TIMEOUT) + .onSuccess(records -> vertx.runOnContext(v -> this.recordsHandler(records))) + .onFailure(t -> { + if (this.closed.get()) { + // The failure might have been caused by stopping the consumer, so we just ignore it + return; + } + isPollInFlight.set(false); + exceptionHandler(t); + }); } } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java index 4ec682d740..1b43eb79ce 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java @@ -97,15 +97,18 @@ private synchronized void poll() { return; } if (this.isPollInFlight.compareAndSet(false, true)) { - this.consumer.poll(POLL_TIMEOUT).onSuccess(records -> vertx.runOnContext(v -> this.handleRecords(records))).onFailure(cause -> { - isPollInFlight.set(false); - logger.error( - "Failed to poll messages {}", - getConsumerVerticleContext().getLoggingKeyValue(), - cause); - // Wait before retrying. - vertx.setTimer(BACKOFF_DELAY_MS, t -> poll()); - }); + this.consumer + .poll(POLL_TIMEOUT) + .onSuccess(records -> vertx.runOnContext(v -> this.handleRecords(records))) + .onFailure(cause -> { + isPollInFlight.set(false); + logger.error( + "Failed to poll messages {}", + getConsumerVerticleContext().getLoggingKeyValue(), + cause); + // Wait before retrying. + vertx.setTimer(BACKOFF_DELAY_MS, t -> poll()); + }); } }