Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-19910 Add consumer poll timeout to kafka-cdc #276

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcS
streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
"The Kafka partitions upper bound must be greater than lower bound.");
A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
A.ensure(streamerCfg.getKafkaConsumerPollTimeout() >= 0, "The Kafka consumer poll timeout cannot be negative.");
A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must me greater than zero.");
A.ensure(
streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
Expand Down Expand Up @@ -175,6 +176,7 @@ protected void runAppliers() {
caches,
streamerCfg.getMaxBatchSize(),
streamerCfg.getKafkaRequestTimeout(),
streamerCfg.getKafkaConsumerPollTimeout(),
metaUpdr,
stopped
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
/** The maximum time to complete Kafka related requests, in milliseconds. */
private final long kafkaReqTimeout;

/** Consumer poll timeout. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's mention in javadocs (especially in public config) that the timeout is measured in "milliseconds", in the same way it is for kafkaReqTimeout.

private final long consumerPollTimeout;

/** Metadata updater. */
private final KafkaToIgniteMetadataUpdater metaUpdr;

Expand All @@ -132,6 +135,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
* @param caches Cache ids.
* @param maxBatchSize Maximum batch size.
* @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
* @param consumerPollTimeout Consumer poll timeout.
* @param metaUpdr Metadata updater.
* @param stopped Stopped flag.
*/
Expand All @@ -145,6 +149,7 @@ public KafkaToIgniteCdcStreamerApplier(
Set<Integer> caches,
int maxBatchSize,
long kafkaReqTimeout,
long consumerPollTimeout,
KafkaToIgniteMetadataUpdater metaUpdr,
AtomicBoolean stopped
) {
Expand All @@ -155,6 +160,7 @@ public KafkaToIgniteCdcStreamerApplier(
this.kafkaPartTo = kafkaPartTo;
this.caches = caches;
this.kafkaReqTimeout = kafkaReqTimeout;
this.consumerPollTimeout = consumerPollTimeout;
this.metaUpdr = metaUpdr;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
Expand Down Expand Up @@ -213,7 +219,7 @@ public KafkaToIgniteCdcStreamerApplier(
* @param cnsmr Data consumer.
*/
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout));

if (log.isInfoEnabled()) {
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
/** Default maximum time to complete Kafka related requests, in milliseconds. */
public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;

/** Default kafka consumer poll timeout. */
public static final long DFLT_KAFKA_CONSUMER_POLL_TIMEOUT = 3_000L;

/** Default {@link #threadCnt} value. */
public static final int DFLT_THREAD_CNT = 16;

Expand All @@ -57,6 +60,9 @@ public class KafkaToIgniteCdcStreamerConfiguration {
/** The maximum time to complete Kafka related requests, in milliseconds. */
private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;

/** Timeout of kafka consumer poll */
private long kafkaConsumerPollTimeout = DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;

/** Metadata consumer group. */
private String metadataCnsmrGrp;

Expand Down Expand Up @@ -171,6 +177,22 @@ public void setKafkaRequestTimeout(long kafkaReqTimeout) {
this.kafkaReqTimeout = kafkaReqTimeout;
}

/**
* @return The kafka consumer poll timeout.
*/
public long getKafkaConsumerPollTimeout() {
return kafkaConsumerPollTimeout;
}

/**
* Sets the kafka consumer poll timeout.
*
* @param timeout Timeout value.
*/
public void setKafkaConsumerPollTimeout(long timeout) {
this.kafkaConsumerPollTimeout = timeout;
}

/**
* @return Metadata topic name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable, OffsetCommit
/** The maximum time to complete Kafka related requests, in milliseconds. */
private final long kafkaReqTimeout;

/** Consumer poll timeout. */
private final long consumerPollTimeout;

/** */
private final KafkaConsumer<Void, byte[]> cnsmr;

Expand Down Expand Up @@ -90,6 +93,7 @@ public KafkaToIgniteMetadataUpdater(
) {
this.ctx = ctx;
this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
this.consumerPollTimeout = streamerCfg.getKafkaConsumerPollTimeout();
this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);

Properties kafkaProps = new Properties();
Expand Down Expand Up @@ -141,7 +145,7 @@ public synchronized void updateMetadata() {
}

while (true) {
ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(consumerPollTimeout));

if (recs.count() == 0) {
if (log.isDebugEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.ignite.cdc.kafka;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
Expand All @@ -27,6 +29,12 @@

/** Tests load {@link KafkaToIgniteCdcStreamer} from Srping xml file. */
public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
/** Constant to reference from xml config. */
public static final int TEST_KAFKA_CONSUMER_POLL_TIMEOUT = 2000;

/** Constant to reference from xml config. */
public static final int TEST_KAFKA_REQUEST_TIMEOUT = 6000;

/** */
@Test
public void testLoadConfig() throws Exception {
Expand Down Expand Up @@ -85,6 +93,35 @@ public void testLoadIgniteClientConfig() throws Exception {
assertNotNull(streamer);
}

@Test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed javadoc for the test

public void testLoadTimeoutProperties() throws Exception {
Stream.of(
new String[] {
"loader/thin/kafka-to-ignite-client-invalid-poll-timeout.xml",
"Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."},
new String[] {
"loader/thin/kafka-to-ignite-client-invalid-request-timeout.xml",
"Ouch! Argument is invalid: The Kafka request timeout cannot be negative."
},
new String[] {
"loader/kafka-to-ignite-invalid-poll-timeout.xml",
"Ouch! Argument is invalid: The Kafka consumer poll timeout cannot be negative."},
new String[] {
"loader/kafka-to-ignite-invalid-request-timeout.xml",
"Ouch! Argument is invalid: The Kafka request timeout cannot be negative."
}
).forEach(args -> assertThrows(null, () -> loadKafkaToIgniteStreamer(args[0]), IllegalArgumentException.class, args[1]));

Stream.<AbstractKafkaToIgniteCdcStreamer>of(
loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-correct.xml"),
loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml")
).forEach(streamer -> {
assertNotNull(streamer);
assertEquals(TEST_KAFKA_CONSUMER_POLL_TIMEOUT, streamer.streamerCfg.getKafkaConsumerPollTimeout());
assertEquals(TEST_KAFKA_REQUEST_TIMEOUT, streamer.streamerCfg.getKafkaRequestTimeout());
});
}

/** */
@Test
public void testInitSpringContextOnce() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
Expand Down Expand Up @@ -184,6 +185,7 @@ private KafkaToIgniteCdcStreamerConfiguration streamerConfiguration() {
cfg.setKafkaPartsFrom(0);
cfg.setKafkaPartsTo(DFLT_PARTS);
cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
cfg.setKafkaConsumerPollTimeout(DFLT_KAFKA_CONSUMER_POLL_TIMEOUT);

return cfg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,16 @@
</property>
</bean>

<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaRequestTimeout" ref="requestTimeout" />
<property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="cdcEnabled" value="true" />
<property name="persistenceEnabled" value="true" />
</bean>
</property>
</bean>
</property>
</bean>

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaConsumerPollTimeout" value="-1"/>
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="cdcEnabled" value="true" />
<property name="persistenceEnabled" value="true" />
</bean>
</property>
</bean>
</property>
</bean>

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaRequestTimeout" value="-1"/>
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@
</property>
</bean>

<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_CONSUMER_POLL_TIMEOUT" id="consumerPollTimeout" />
<util:constant static-field="org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest.TEST_KAFKA_REQUEST_TIMEOUT" id="requestTimeout" />

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaRequestTimeout" ref="requestTimeout" />
<property name="kafkaConsumerPollTimeout" ref="consumerPollTimeout" />
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
<property name="addresses">
<list>
<value>127.0.0.1:10800</value>
</list>
</property>
</bean>

<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
<property name="topic" value="ignite" />
<property name="metadataTopic" value="ignite-metadata" />
<property name="kafkaPartsFrom" value="0" />
<property name="kafkaPartsTo" value="16" />
<property name="kafkaConsumerPollTimeout" value="-1"/>
</bean>

<util:properties id="kafkaProperties" location="loader/kafka.properties" />
</beans>
Loading