Skip to content

Commit

Permalink
IGNITE-19910 Add consumer poll timeout to kafka-cdc (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivandasch authored Jun 13, 2024
1 parent dc3e83a commit 9b79fe6
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcS
streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
"The Kafka partitions upper bound must be greater than lower bound.");
A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
A.ensure(streamerCfg.getKafkaConsumerPollTimeout() >= 0, "The Kafka consumer poll timeout cannot be negative.");
A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must me greater than zero.");
A.ensure(
streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
Expand Down Expand Up @@ -175,6 +176,7 @@ protected void runAppliers() {
caches,
streamerCfg.getMaxBatchSize(),
streamerCfg.getKafkaRequestTimeout(),
streamerCfg.getKafkaConsumerPollTimeout(),
metaUpdr,
stopped
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
/** The maximum time to complete Kafka related requests, in milliseconds. */
private final long kafkaReqTimeout;

/** Consumer poll timeout in milliseconds. */
private final long consumerPollTimeout;

/** Metadata updater. */
private final KafkaToIgniteMetadataUpdater metaUpdr;

Expand All @@ -132,6 +135,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
* @param caches Cache ids.
* @param maxBatchSize Maximum batch size.
* @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
* @param consumerPollTimeout Consumer poll timeout in milliseconds.
* @param metaUpdr Metadata updater.
* @param stopped Stopped flag.
*/
Expand All @@ -145,6 +149,7 @@ public KafkaToIgniteCdcStreamerApplier(
Set<Integer> caches,
int maxBatchSize,
long kafkaReqTimeout,
long consumerPollTimeout,
KafkaToIgniteMetadataUpdater metaUpdr,
AtomicBoolean stopped
) {
Expand All @@ -155,6 +160,7 @@ public KafkaToIgniteCdcStreamerApplier(
this.kafkaPartTo = kafkaPartTo;
this.caches = caches;
this.kafkaReqTimeout = kafkaReqTimeout;
this.consumerPollTimeout = consumerPollTimeout;
this.metaUpdr = metaUpdr;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
Expand Down Expand Up @@ -213,7 +219,7 @@ public KafkaToIgniteCdcStreamerApplier(
* @param cnsmr Data consumer.
*/
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout));

if (log.isInfoEnabled()) {
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
/** Default maximum time to complete Kafka related requests, in milliseconds. */
public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;

/** Default kafka consumer poll timeout. */
public static final long DFLT_KAFKA_CONSUMER_POLL_TIMEOUT = 3_000L;

/** Default {@link #threadCnt} value. */
public static final int DFLT_THREAD_CNT = 16;

Expand All @@ -57,6 +60,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
/** The maximum time to complete Kafka related requests, in milliseconds. */
private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;

/** Timeout of kafka consumer poll */
private long kafkaConsumerPollTimeout = DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;

/** Metadata consumer group. */
private String metadataCnsmrGrp;

Expand Down Expand Up @@ -171,6 +177,22 @@ public void setKafkaRequestTimeout(long kafkaReqTimeout) {
this.kafkaReqTimeout = kafkaReqTimeout;
}

/**
* @return The kafka consumer poll timeout in milliseconds.
*/
public long getKafkaConsumerPollTimeout() {
return kafkaConsumerPollTimeout;
}

/**
* Sets the kafka consumer poll timeout in milliseconds.
*
* @param timeout Timeout value.
*/
public void setKafkaConsumerPollTimeout(long timeout) {
this.kafkaConsumerPollTimeout = timeout;
}

/**
* @return Metadata topic name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, OffsetCommit
/** The maximum time to complete Kafka related requests, in milliseconds. */
private final long kafkaReqTimeout;

/** Consumer poll timeout. */
private final long consumerPollTimeout;

/** */
private final KafkaConsumer<Void, byte[]> cnsmr;

Expand Down Expand Up @@ -90,6 +93,7 @@ public KafkaToIgniteMetadataUpdater(
) {
this.ctx = ctx;
this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
this.consumerPollTimeout = streamerCfg.getKafkaConsumerPollTimeout();
this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);

Properties kafkaProps = new Properties();
Expand Down Expand Up @@ -141,7 +145,7 @@ public synchronized void updateMetadata() {
}

while (true) {
ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout));

if (recs.count() == 0) {
if (log.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.ignite.cdc.kafka;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
Expand All @@ -27,6 +29,12 @@

/** Tests load {@link KafkaToIgniteCdcStreamer} from Srping xml file. */
public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
/** Constant to reference from xml config. */
public static final int TEST_KAFKA_CONSUMER_POLL_TIMEOUT = 2000;

/** Constant to reference from xml config. */
public static final int TEST_KAFKA_REQUEST_TIMEOUT = 6000;

/** */
@Test
public void testLoadConfig() throws Exception {
Expand Down Expand Up @@ -85,6 +93,36 @@ public void testLoadIgniteClientConfig() throws Exception {
assertNotNull(streamer);
}

/** Tests setting timeout properties of kafka to ignite loaders. */
@Test
public void testLoadTimeoutProperties() throws Exception {
Stream.of(
new String[] {
"loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml",
"Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."},
new String[] {
"loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml",
"Ouch! Argument is invalid: The Kafka request timeout cannot be negative."
},
new String[] {
"loader/kafka-to-ignite-invalid-poll-timeout.xml",
"Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."},
new String[] {
"loader/kafka-to-ignite-invalid-request-timeout.xml",
"Ouch! Argument is invalid: The Kafka request timeout cannot be negative."
}
).forEach(args -> assertThrows(null, () -> loadKafkaToIgniteStreamer(args[0]), IllegalArgumentException.class, args[1]));

Stream.<AbstractKafkaToIgniteCdcStreamer>of(
loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-correct.xml"),
loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml")
).forEach(streamer -> {
assertNotNull(streamer);
assertEquals(TEST_KAFKA_CONSUMER_POLL_TIMEOUT, streamer.streamerCfg.getKafkaConsumerPollTimeout());
assertEquals(TEST_KAFKA_REQUEST_TIMEOUT, streamer.streamerCfg.getKafkaRequestTimeout());
});
}

/** */
@Test
public void testInitSpringContextOnce() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
Expand Down Expand Up @@ -184,6 +185,7 @@ private KafkaToIgniteCdcStreamerConfiguration streamerConfiguration() {
cfg.setKafkaPartsFrom(0);
cfg.setKafkaPartsTo(DFLT_PARTS);
cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
cfg.setKafkaConsumerPollTimeout(DFLT_KAFKA_CONSUMER_POLL_TIMEOUT);

return cfg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@
</property>
</bean>

<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaRequestTimeout" ref="requestTimeout" />
<property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="cdcEnabled" value="true" />
<property name="persistenceEnabled" value="true" />
</bean>
</property>
</bean>
</property>
</bean>

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaConsumerPollTimeout" value="-1"/>
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="cdcEnabled" value="true" />
<property name="persistenceEnabled" value="true" />
</bean>
</property>
</bean>
</property>
</bean>

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaRequestTimeout" value="-1"/>
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@
</property>
</bean>

<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaRequestTimeout" ref="requestTimeout" />
<property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
<property name="addresses">
<list>
<value>127.0.0.1:10800</value>
</list>
</property>
</bean>

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaConsumerPollTimeout" value="-1"/>
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
</beans>
Loading

0 comments on commit 9b79fe6

Please sign in to comment.