diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java index 479a1db4..ccf3f804 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java @@ -17,8 +17,9 @@ package org.apache.ignite.cdc; -import java.util.HashMap; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.function.BooleanSupplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -36,10 +37,10 @@ public abstract class AbstractCdcEventsApplier { private final int maxBatchSize; /** Update batch. */ - private final Map updBatch = new HashMap<>(); + private final SortedMap updBatch = new TreeMap<>(this::compareKey); /** Remove batch. */ - private final Map rmvBatch = new HashMap<>(); + private final SortedMap rmvBatch = new TreeMap<>(this::compareKey); /** */ private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch); @@ -109,13 +110,12 @@ public int apply(Iterable evts) throws IgniteCheckedException { * @param applyUpd Apply update batch flag supplier. * @param applyRmv Apply remove batch flag supplier. * @return Number of applied events. - * @throws IgniteCheckedException In case of error. */ private int applyIf( int cacheId, BooleanSupplier applyUpd, BooleanSupplier applyRmv - ) throws IgniteCheckedException { + ) { int evtsApplied = 0; if (applyUpd.getAsBoolean()) { @@ -151,6 +151,14 @@ private boolean isApplyBatch(Map map, K key) { /** @return Key. */ protected abstract K toKey(CdcEvent evt); + /** + * Compares keys hash codes only, because bytes might not be available. + * If hash codes are equal it put {@code key2} to next batch, see {@link #isApplyBatch)}. + */ + private int compareKey(Object key1, Object key2) { + return Integer.compare(key1.hashCode(), key2.hashCode()); + } + /** @return Value. */ protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion ver); diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java index 9eddeea7..9a5e18c7 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java @@ -25,6 +25,7 @@ import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; @@ -93,6 +94,7 @@ import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; import static org.apache.ignite.testframework.GridTestUtils.runAsync; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; +import static org.junit.Assume.assumeTrue; /** */ @RunWith(Parameterized.class) @@ -310,6 +312,61 @@ public void testActivePassiveReplication() throws Exception { } } + /** Test that CDC instances don't lock each other while streaming mixed keys. */ + @Test + public void testConcurrentMixedKeys() throws Exception { + assumeTrue(atomicity == TRANSACTIONAL); + + for (IgniteEx ign: F.asList(srcCluster[0], destCluster[0])) { + ign.createCache(new CacheConfiguration() + .setName(ACTIVE_PASSIVE_CACHE) + .setAtomicityMode(atomicity) + .setBackups(backups) + .setCacheMode(mode)); + } + + List> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE); + + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + int cnt = 0; + + // Setup bound for keys to increase probability to stream same keys through each CDC instance. + // The value should not be small to force CDC generates batches for putAllConflict call. + int keysCnt = 20; + + while (cnt++ < 10_000) { + srcCluster[rnd.nextInt(2)] + .cache(ACTIVE_PASSIVE_CACHE) + .put(new TestKey(rnd.nextInt(keysCnt), null), rnd.nextInt()); + + if (cnt % 1_000 == 0) + System.out.println("Load count = " + cnt); + } + + // Check that all data received. + assertTrue(waitForCondition(() -> { + IgniteCache srcCache = srcCluster[0].cache(ACTIVE_PASSIVE_CACHE); + IgniteCache destCache = destCluster[0].cache(ACTIVE_PASSIVE_CACHE); + + for (int i = 0; i < keysCnt; i++) { + Integer srcVal = srcCache.get(new TestKey(i, null)); + Integer destVal = destCache.get(new TestKey(i, null)); + + if (srcVal == null || !srcVal.equals(destVal)) + return false; + } + + return true; + + }, getTestTimeout())); + } + finally { + for (IgniteInternalFuture fut : futs) + fut.cancel(); + } + } + /** Replication with complex SQL key. Data inserted via SQL. */ @Test public void testActivePassiveReplicationComplexKeyWithSQL() throws Exception { diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java index 048721a4..b6d42e24 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; import org.apache.ignite.spi.systemview.view.SystemView; +import org.jetbrains.annotations.Nullable; import org.junit.Test; import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT; @@ -47,7 +48,7 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { List> futs = new ArrayList<>(); for (int i = 0; i < srcCluster.length; i++) - futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache)); + futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache, "ignite-to-ignite-src-" + i)); return futs; } @@ -56,11 +57,15 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { @Override protected List> startActiveActiveCdc() { List> futs = new ArrayList<>(); - for (int i = 0; i < srcCluster.length; i++) - futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE)); + for (int i = 0; i < srcCluster.length; i++) { + futs.add(igniteToIgnite( + srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-src-" + i)); + } - for (int i = 0; i < destCluster.length; i++) - futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE)); + for (int i = 0; i < destCluster.length; i++) { + futs.add(igniteToIgnite( + destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-dest-" + i)); + } return futs; } @@ -81,13 +86,15 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { * @param destCfg Ignite destination cluster configuration. * @param dest Ignite destination cluster. * @param cache Cache name to stream to kafka. + * @param threadName Thread to run CDC instance. * @return Future for Change Data Capture application. */ protected IgniteInternalFuture igniteToIgnite( IgniteConfiguration srcCfg, IgniteConfiguration destCfg, IgniteEx[] dest, - String cache + String cache, + @Nullable String threadName ) { return runAsync(() -> { CdcConfiguration cdcCfg = new CdcConfiguration(); @@ -117,7 +124,7 @@ protected IgniteInternalFuture igniteToIgnite( cdcs.add(cdc); cdc.run(); - }); + }, threadName); } /** */ diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java index 96f1d242..927a7b25 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java @@ -112,7 +112,8 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest { IgniteConfiguration igniteCfg, String topic, String metadataTopic, - String cache + String cache, + String threadName ) { Map params = new HashMap<>(); @@ -127,7 +128,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest { params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT)); return runAsync( - () -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)}) + () -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)}), threadName ); } @@ -139,7 +140,8 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest { IgniteConfiguration igniteCfg, IgniteEx[] dest, int partFrom, - int partTo + int partTo, + String threadName ) { Map params = new HashMap<>(); @@ -174,7 +176,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest { params.put(METRIC_REG_NAME, DFLT_METRICS_REG_NAME + "-" + igniteCfg.getIgniteInstanceName()); return runAsync( - () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig(cfg, params)}) + () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig(cfg, params)}), threadName ); } diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java index 05c5154f..a56b2941 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java @@ -104,8 +104,11 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { List> futs = new ArrayList<>(); - for (IgniteEx ex : srcCluster) - futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache)); + for (IgniteEx ex : srcCluster) { + int idx = getTestIgniteInstanceIndex(ex.name()); + + futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, "ignite-src-to-kafka-" + idx)); + } for (int i = 0; i < destCluster.length; i++) { futs.add(kafkaToIgnite( @@ -115,7 +118,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { destClusterCliCfg[i], destCluster, i * (DFLT_PARTS / 2), - (i + 1) * (DFLT_PARTS / 2) + (i + 1) * (DFLT_PARTS / 2), + "kafka-to-ignite-dest-" + i )); } @@ -126,11 +130,19 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { @Override protected List> startActiveActiveCdc() { List> futs = new ArrayList<>(); - for (IgniteEx ex : srcCluster) - futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE)); + for (IgniteEx ex : srcCluster) { + int idx = getTestIgniteInstanceIndex(ex.name()); + + futs.add(igniteToKafka( + ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE, "ignite-src-to-kafka-" + idx)); + } - for (IgniteEx ex : destCluster) - futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE)); + for (IgniteEx ex : destCluster) { + int idx = getTestIgniteInstanceIndex(ex.name()); + + futs.add(igniteToKafka( + ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE, "ignite-dest-to-kafka-" + idx)); + } futs.add(kafkaToIgnite( ACTIVE_ACTIVE_CACHE, @@ -139,7 +151,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { destClusterCliCfg[0], destCluster, 0, - DFLT_PARTS + DFLT_PARTS, + "kafka-to-ignite-src" )); futs.add(kafkaToIgnite( @@ -149,7 +162,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { srcClusterCliCfg[0], srcCluster, 0, - DFLT_PARTS + DFLT_PARTS, + "kafka-to-ignite-dest" )); return futs; @@ -247,7 +261,8 @@ protected IgniteInternalFuture igniteToKafka( IgniteConfiguration igniteCfg, String topic, String metadataTopic, - String cache + String cache, + String threadName ) { return runAsync(() -> { IgniteToKafkaCdcStreamer cdcCnsmr = new IgniteToKafkaCdcStreamer() @@ -270,7 +285,7 @@ protected IgniteInternalFuture igniteToKafka( cdcs.add(cdc); cdc.run(); - }); + }, threadName); } /** @@ -286,7 +301,8 @@ protected IgniteInternalFuture kafkaToIgnite( IgniteConfiguration igniteCfg, IgniteEx[] dest, int fromPart, - int toPart + int toPart, + String threadName ) { KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration(); @@ -315,7 +331,7 @@ protected IgniteInternalFuture kafkaToIgnite( kafkaStreamers.add(streamer); - return runAsync(streamer); + return runAsync(streamer, threadName); } /** */