From 9b79fe6611b8c9dd4f3d9c2323d4f547829e3270 Mon Sep 17 00:00:00 2001 From: Ivan Daschinskiy Date: Thu, 13 Jun 2024 11:48:54 +0300 Subject: [PATCH] IGNITE-19910 Add consumer poll timeout to kafka-cdc (#276) --- .../AbstractKafkaToIgniteCdcStreamer.java | 2 + .../KafkaToIgniteCdcStreamerApplier.java | 8 +++- ...KafkaToIgniteCdcStreamerConfiguration.java | 22 +++++++++ .../kafka/KafkaToIgniteMetadataUpdater.java | 6 ++- .../cdc/kafka/KafkaToIgniteLoaderTest.java | 38 +++++++++++++++ .../KafkaToIgniteMetadataUpdaterTest.java | 2 + .../loader/kafka-to-ignite-correct.xml | 5 ++ .../kafka-to-ignite-invalid-poll-timeout.xml | 48 +++++++++++++++++++ ...afka-to-ignite-invalid-request-timeout.xml | 48 +++++++++++++++++++ .../thin/kafka-to-ignite-client-correct.xml | 5 ++ ...-to-ignite-client-invalid-poll-timeout.xml | 43 +++++++++++++++++ ...-ignite-client-invalid-request-timeout.xml | 43 +++++++++++++++++ 12 files changed, 268 insertions(+), 2 deletions(-) create mode 100644 modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml create mode 100644 modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml create mode 100644 modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml create mode 100644 modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java index 2df4d204a..5e12cc43b 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java @@ -96,6 +96,7 @@ public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcS streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(), "The Kafka partitions upper bound must be greater than lower bound."); A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative."); + A.ensure(streamerCfg.getKafkaConsumerPollTimeout() >= 0, "The Kafka consumer poll timeout cannot be negative."); A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must me greater than zero."); A.ensure( streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(), @@ -175,6 +176,7 @@ protected void runAppliers() { caches, streamerCfg.getMaxBatchSize(), streamerCfg.getKafkaRequestTimeout(), + streamerCfg.getKafkaConsumerPollTimeout(), metaUpdr, stopped ); diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java index ceb688e94..d113bba73 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java @@ -107,6 +107,9 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable { /** The maximum time to complete Kafka related requests, in milliseconds. */ private final long kafkaReqTimeout; + /** Consumer poll timeout in milliseconds. */ + private final long consumerPollTimeout; + /** Metadata updater. */ private final KafkaToIgniteMetadataUpdater metaUpdr; @@ -132,6 +135,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable { * @param caches Cache ids. * @param maxBatchSize Maximum batch size. * @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds. + * @param consumerPollTimeout Consumer poll timeout in milliseconds. * @param metaUpdr Metadata updater. * @param stopped Stopped flag. */ @@ -145,6 +149,7 @@ public KafkaToIgniteCdcStreamerApplier( Set caches, int maxBatchSize, long kafkaReqTimeout, + long consumerPollTimeout, KafkaToIgniteMetadataUpdater metaUpdr, AtomicBoolean stopped ) { @@ -155,6 +160,7 @@ public KafkaToIgniteCdcStreamerApplier( this.kafkaPartTo = kafkaPartTo; this.caches = caches; this.kafkaReqTimeout = kafkaReqTimeout; + this.consumerPollTimeout = consumerPollTimeout; this.metaUpdr = metaUpdr; this.stopped = stopped; this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class); @@ -213,7 +219,7 @@ public KafkaToIgniteCdcStreamerApplier( * @param cnsmr Data consumer. */ private void poll(KafkaConsumer cnsmr) throws IgniteCheckedException { - ConsumerRecords recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout)); + ConsumerRecords recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout)); if (log.isInfoEnabled()) { log.info( diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java index 24675d437..a2e6e103d 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java @@ -33,6 +33,9 @@ public class KafkaToIgniteCdcStreamerConfiguration { /** Default maximum time to complete Kafka related requests, in milliseconds. */ public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L; + /** Default kafka consumer poll timeout. */ + public static final long DFLT_KAFKA_CONSUMER_POLL_TIMEOUT = 3_000L; + /** Default {@link #threadCnt} value. */ public static final int DFLT_THREAD_CNT = 16; @@ -57,6 +60,9 @@ public class KafkaToIgniteCdcStreamerConfiguration { /** The maximum time to complete Kafka related requests, in milliseconds. */ private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT; + /** Timeout of kafka consumer poll */ + private long kafkaConsumerPollTimeout = DFLT_KAFKA_CONSUMER_POLL_TIMEOUT; + /** Metadata consumer group. */ private String metadataCnsmrGrp; @@ -171,6 +177,22 @@ public void setKafkaRequestTimeout(long kafkaReqTimeout) { this.kafkaReqTimeout = kafkaReqTimeout; } + /** + * @return The kafka consumer poll timeout in milliseconds. + */ + public long getKafkaConsumerPollTimeout() { + return kafkaConsumerPollTimeout; + } + + /** + * Sets the kafka consumer poll timeout in milliseconds. + * + * @param timeout Timeout value. + */ + public void setKafkaConsumerPollTimeout(long timeout) { + this.kafkaConsumerPollTimeout = timeout; + } + /** * @return Metadata topic name. */ diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java index 97b5bfedf..236dc2f71 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java @@ -61,6 +61,9 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, OffsetCommit /** The maximum time to complete Kafka related requests, in milliseconds. */ private final long kafkaReqTimeout; + /** Consumer poll timeout. */ + private final long consumerPollTimeout; + /** */ private final KafkaConsumer cnsmr; @@ -90,6 +93,7 @@ public KafkaToIgniteMetadataUpdater( ) { this.ctx = ctx; this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout(); + this.consumerPollTimeout = streamerCfg.getKafkaConsumerPollTimeout(); this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class); Properties kafkaProps = new Properties(); @@ -141,7 +145,7 @@ public synchronized void updateMetadata() { } while (true) { - ConsumerRecords recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout)); + ConsumerRecords recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout)); if (recs.count() == 0) { if (log.isDebugEnabled()) diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java index 6938448bf..918c0f8a4 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java @@ -18,6 +18,8 @@ package org.apache.ignite.cdc.kafka; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -27,6 +29,12 @@ /** Tests load {@link KafkaToIgniteCdcStreamer} from Srping xml file. */ public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest { + /** Constant to reference from xml config. */ + public static final int TEST_KAFKA_CONSUMER_POLL_TIMEOUT = 2000; + + /** Constant to reference from xml config. */ + public static final int TEST_KAFKA_REQUEST_TIMEOUT = 6000; + /** */ @Test public void testLoadConfig() throws Exception { @@ -85,6 +93,36 @@ public void testLoadIgniteClientConfig() throws Exception { assertNotNull(streamer); } + /** Tests setting timeout properties of kafka to ignite loaders. */ + @Test + public void testLoadTimeoutProperties() throws Exception { + Stream.of( + new String[] { + "loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml", + "Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."}, + new String[] { + "loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml", + "Ouch! Argument is invalid: The Kafka request timeout cannot be negative." + }, + new String[] { + "loader/kafka-to-ignite-invalid-poll-timeout.xml", + "Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."}, + new String[] { + "loader/kafka-to-ignite-invalid-request-timeout.xml", + "Ouch! Argument is invalid: The Kafka request timeout cannot be negative." + } + ).forEach(args -> assertThrows(null, () -> loadKafkaToIgniteStreamer(args[0]), IllegalArgumentException.class, args[1])); + + Stream.of( + loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-correct.xml"), + loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml") + ).forEach(streamer -> { + assertNotNull(streamer); + assertEquals(TEST_KAFKA_CONSUMER_POLL_TIMEOUT, streamer.streamerCfg.getKafkaConsumerPollTimeout()); + assertEquals(TEST_KAFKA_REQUEST_TIMEOUT, streamer.streamerCfg.getKafkaRequestTimeout()); + }); + } + /** */ @Test public void testInitSpringContextOnce() throws Exception { diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java index 26b95d9ec..5dc764fea 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java @@ -41,6 +41,7 @@ import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka; import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties; import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait; +import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT; import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT; import static org.apache.ignite.testframework.GridTestUtils.assertThrows; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -184,6 +185,7 @@ private KafkaToIgniteCdcStreamerConfiguration streamerConfiguration() { cfg.setKafkaPartsFrom(0); cfg.setKafkaPartsTo(DFLT_PARTS); cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT); + cfg.setKafkaConsumerPollTimeout(DFLT_KAFKA_CONSUMER_POLL_TIMEOUT); return cfg; } diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml index b69342412..5b8655ab1 100644 --- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml +++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml @@ -36,11 +36,16 @@ + + + + + diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml new file mode 100644 index 000000000..724fe4437 --- /dev/null +++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-poll-timeout.xml @@ -0,0 +1,48 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml new file mode 100644 index 000000000..8723df25a --- /dev/null +++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-invalid-request-timeout.xml @@ -0,0 +1,48 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml index caa894888..ba5d179c2 100644 --- a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml +++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml @@ -31,11 +31,16 @@ + + + + + diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml new file mode 100644 index 000000000..cd0c78648 --- /dev/null +++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml @@ -0,0 +1,43 @@ + + + + + + + + + 127.0.0.1:10800 + + + + + + + + + + + + + + diff --git a/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml new file mode 100644 index 000000000..3255d7bd0 --- /dev/null +++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml @@ -0,0 +1,43 @@ + + + + + + + + + 127.0.0.1:10800 + + + + + + + + + + + + + +