diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java index 5e12cc43..18de8cb6 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -153,26 +154,16 @@ protected void runAppliers() { streamerCfg ); - int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom(); - int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom; - int threadCnt = streamerCfg.getThreadCount(); - - int partPerApplier = kafkaParts / threadCnt; - - for (int i = 0; i < threadCnt; i++) { - int from = i * partPerApplier; - int to = (i + 1) * partPerApplier; - - if (i == threadCnt - 1) - to = kafkaParts; + int cntr = 0; + for (T2<Integer, Integer> parts : kafkaPartitions(streamerCfg)) { KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier( () -> eventsApplier(), log, kafkaProps, streamerCfg.getTopic(), - kafkaPartsFrom + from, - kafkaPartsFrom + to, + parts.get1(), // kafkaPartFrom + parts.get2(), // kafkaPartTo caches, streamerCfg.getMaxBatchSize(), streamerCfg.getKafkaRequestTimeout(), @@ -181,7 +172,7 @@ protected void runAppliers() { stopped ); - addAndStart("applier-thread-" + i, applier); + addAndStart("applier-thread-" + cntr++, applier); } try { @@ -197,6 +188,32 @@ protected void runAppliers() { } } + /** + * Calculates Kafka partition ranges per applier thread. 
+ * @param streamerCfg {@link KafkaToIgniteCdcStreamerConfiguration}. + * @return List of pairs defining partition ranges for each applier thread. + */ + public static List<T2<Integer, Integer>> kafkaPartitions(KafkaToIgniteCdcStreamerConfiguration streamerCfg) { + List<T2<Integer, Integer>> parts = new ArrayList<>(); + + int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom(); + int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom; + int threadCnt = streamerCfg.getThreadCount(); + + while (kafkaParts > 0) { + int partPerApplier = kafkaParts / threadCnt + (kafkaParts % threadCnt > 0 ? 1 : 0); + + kafkaParts -= partPerApplier; + --threadCnt; + + parts.add(new T2<>(kafkaPartsFrom, kafkaPartsFrom + partPerApplier)); + + kafkaPartsFrom += partPerApplier; + } + + return parts; + } + /** Adds applier to {@link #appliers} and starts thread with it. */ private void addAndStart(String threadName, T applier) { appliers.add(applier); diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java index d113bba7..3dab9c79 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java @@ -168,6 +168,10 @@ public KafkaToIgniteCdcStreamerApplier( /** {@inheritDoc} */ @Override public void run() { + if (log.isInfoEnabled()) + log.info("Kafka to Ignite applier started [topic=" + topic + ", partFrom=" + kafkaPartFrom + + ", partTo=" + kafkaPartTo + "]."); + applier = applierSupplier.get(); try { diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java new file mode 100644 index 00000000..88dbf28f --- /dev/null +++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgnitePartitionDistributionTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc.kafka; + +import java.util.List; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.apache.ignite.cdc.kafka.AbstractKafkaToIgniteCdcStreamer.kafkaPartitions; +import static org.apache.ignite.internal.util.lang.GridFunc.t; + +/** Tests Kafka partition ranges assignment to the multiple applier threads. 
*/ +public class KafkaToIgnitePartitionDistributionTest extends GridCommonAbstractTest { + /** */ + private final KafkaToIgniteCdcStreamerConfiguration streamerCfg = new KafkaToIgniteCdcStreamerConfiguration(); + + /** */ + @Test + public void testKafkaPartitions() { + doTest(3, 0, 8, asList(t(0, 3), t(3, 6), t(6, 8))); + doTest(4, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 8))); + doTest(5, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 7), t(7, 8))); + + doTest(3, 3, 11, asList(t(3, 6), t(6, 9), t(9, 11))); + doTest(4, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 11))); + doTest(5, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 10), t(10, 11))); + + doTest(3, 1, 4, asList(t(1, 2), t(2, 3), t(3, 4))); + } + + /** + * @param threadCnt Applier threads count. + * @param expParts List of expected partition ranges. + */ + private void doTest(int threadCnt, int partFrom, int partTo, List<IgniteBiTuple<Integer, Integer>> expParts) { + streamerCfg.setThreadCount(threadCnt); + streamerCfg.setKafkaPartsFrom(partFrom); + streamerCfg.setKafkaPartsTo(partTo); + + assertEquals(expParts, kafkaPartitions(streamerCfg)); + } +}