Skip to content

Commit

Permalink
IGNITE-22403 Fixed partition ranges assignment for appliers of KafkaToIgnite streamers (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
maksaska authored Jul 10, 2024
1 parent f7e4e3f commit 5bd39e8
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -153,26 +154,16 @@ protected void runAppliers() {
streamerCfg
);

int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
int threadCnt = streamerCfg.getThreadCount();

int partPerApplier = kafkaParts / threadCnt;

for (int i = 0; i < threadCnt; i++) {
int from = i * partPerApplier;
int to = (i + 1) * partPerApplier;

if (i == threadCnt - 1)
to = kafkaParts;
int cntr = 0;

for (T2<Integer, Integer> parts : kafkaPartitions(streamerCfg)) {
KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
() -> eventsApplier(),
log,
kafkaProps,
streamerCfg.getTopic(),
kafkaPartsFrom + from,
kafkaPartsFrom + to,
parts.get1(), // kafkaPartFrom
parts.get2(), // kafkaPartTo
caches,
streamerCfg.getMaxBatchSize(),
streamerCfg.getKafkaRequestTimeout(),
Expand All @@ -181,7 +172,7 @@ protected void runAppliers() {
stopped
);

addAndStart("applier-thread-" + i, applier);
addAndStart("applier-thread-" + cntr++, applier);
}

try {
Expand All @@ -197,6 +188,32 @@ protected void runAppliers() {
}
}

/**
 * Calculates Kafka partition ranges per applier thread.
 * <p>
 * The configured span between {@code getKafkaPartsFrom()} and {@code getKafkaPartsTo()} is cut into
 * consecutive ranges: the upper bound of each range is the lower bound of the next one. On every step the
 * partitions that are still unassigned are divided by the threads that are still unassigned, rounding up,
 * so range sizes differ by at most one and the larger ranges come first.
 * <p>
 * A range is never empty, therefore the result may contain fewer ranges than the configured thread count
 * when there are fewer partitions than threads. The result is empty when the upper bound does not exceed
 * the lower bound.
 *
 * @param streamerCfg {@link KafkaToIgniteCdcStreamerConfiguration}.
 * @return List of pairs defining partition ranges for each applier thread.
 * @throws IllegalArgumentException If there are partitions to distribute but the thread count is not positive.
 */
public static List<T2<Integer, Integer>> kafkaPartitions(KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
    List<T2<Integer, Integer>> parts = new ArrayList<>();

    int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
    int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
    int threadCnt = streamerCfg.getThreadCount();

    // Without this guard a zero thread count fails with a bare ArithmeticException and a negative one
    // yields negative range sizes, so the loop below would never terminate normally.
    if (kafkaParts > 0 && threadCnt <= 0)
        throw new IllegalArgumentException("Thread count must be positive [threadCnt=" + threadCnt + ']');

    while (kafkaParts > 0) {
        // Ceiling of (remaining partitions / remaining threads).
        int partPerApplier = kafkaParts / threadCnt + (kafkaParts % threadCnt > 0 ? 1 : 0);

        kafkaParts -= partPerApplier;
        --threadCnt;

        parts.add(new T2<>(kafkaPartsFrom, kafkaPartsFrom + partPerApplier));

        // The next range starts where the current one ends.
        kafkaPartsFrom += partPerApplier;
    }

    return parts;
}

/** Adds applier to {@link #appliers} and starts thread with it. */
private <T extends AutoCloseable & Runnable> void addAndStart(String threadName, T applier) {
appliers.add(applier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ public KafkaToIgniteCdcStreamerApplier(

/** {@inheritDoc} */
@Override public void run() {
if (log.isInfoEnabled())
log.info("Kafka to Ignite applier started [topic=" + topic + ", partFrom=" + kafkaPartFrom +
", partTo=" + kafkaPartTo + "].");

applier = applierSupplier.get();

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.cdc.kafka;

import java.util.List;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static java.util.Arrays.asList;
import static org.apache.ignite.cdc.kafka.AbstractKafkaToIgniteCdcStreamer.kafkaPartitions;
import static org.apache.ignite.internal.util.lang.GridFunc.t;

/** Verifies how Kafka partition ranges are split between several applier threads. */
public class KafkaToIgnitePartitionDistributionTest extends GridCommonAbstractTest {
    /** Streamer configuration reused by every check; its three inputs are overwritten before each use. */
    private final KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration();

    /** Checks the computed ranges for several thread counts and partition bounds. */
    @Test
    public void testKafkaPartitions() {
        // Bounds 0 and 8.
        checkRanges(3, 0, 8, asList(t(0, 3), t(3, 6), t(6, 8)));
        checkRanges(4, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 8)));
        checkRanges(5, 0, 8, asList(t(0, 2), t(2, 4), t(4, 6), t(6, 7), t(7, 8)));

        // Same span shifted to a non-zero lower bound.
        checkRanges(3, 3, 11, asList(t(3, 6), t(6, 9), t(9, 11)));
        checkRanges(4, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 11)));
        checkRanges(5, 3, 11, asList(t(3, 5), t(5, 7), t(7, 9), t(9, 10), t(10, 11)));

        // As many threads as the difference between the bounds.
        checkRanges(3, 1, 4, asList(t(1, 2), t(2, 3), t(3, 4)));
    }

    /**
     * Applies the given settings to the streamer configuration and compares the computed partition
     * ranges with the expected ones.
     *
     * @param threads Applier threads count.
     * @param from Value for the Kafka partitions lower bound setting.
     * @param to Value for the Kafka partitions upper bound setting.
     * @param expected List of expected partition ranges.
     */
    private void checkRanges(int threads, int from, int to, List<IgniteBiTuple<Integer, Integer>> expected) {
        cfg.setThreadCount(threads);
        cfg.setKafkaPartsFrom(from);
        cfg.setKafkaPartsTo(to);

        List<?> actual = kafkaPartitions(cfg);

        assertEquals(expected, actual);
    }
}

0 comments on commit 5bd39e8

Please sign in to comment.