Skip to content

Commit

Permalink
IGNITE-23388 Use SortedMap for collecting CdcEvents (#291)
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim authored Oct 18, 2024
1 parent 5b4f9de commit fcfb076
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

package org.apache.ignite.cdc;

import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
Expand All @@ -36,10 +37,10 @@ public abstract class AbstractCdcEventsApplier<K, V> {
private final int maxBatchSize;

/** Update batch. */
private final Map<K, V> updBatch = new HashMap<>();
private final SortedMap<K, V> updBatch = new TreeMap<>(this::compareKey);

/** Remove batch. */
private final Map<K, GridCacheVersion> rmvBatch = new HashMap<>();
private final SortedMap<K, GridCacheVersion> rmvBatch = new TreeMap<>(this::compareKey);

/** */
private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
Expand Down Expand Up @@ -109,13 +110,12 @@ public int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
* @param applyUpd Apply update batch flag supplier.
* @param applyRmv Apply remove batch flag supplier.
* @return Number of applied events.
* @throws IgniteCheckedException In case of error.
*/
private int applyIf(
int cacheId,
BooleanSupplier applyUpd,
BooleanSupplier applyRmv
) throws IgniteCheckedException {
) {
int evtsApplied = 0;

if (applyUpd.getAsBoolean()) {
Expand Down Expand Up @@ -151,6 +151,14 @@ private boolean isApplyBatch(Map<K, ?> map, K key) {
/** @return Key. */
protected abstract K toKey(CdcEvent evt);

/**
* Compares keys hash codes only, because bytes might not be available.
* If hash codes are equal it put {@code key2} to next batch, see {@link #isApplyBatch)}.
*/
private int compareKey(Object key1, Object key2) {
return Integer.compare(key1.hashCode(), key2.hashCode());
}

/** @return Value. */
protected abstract V toValue(int cacheId, CdcEvent evt, GridCacheVersion ver);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand Down Expand Up @@ -93,6 +94,7 @@
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.junit.Assume.assumeTrue;

/** */
@RunWith(Parameterized.class)
Expand Down Expand Up @@ -310,6 +312,61 @@ public void testActivePassiveReplication() throws Exception {
}
}

/** Test that CDC instances don't lock each other while streaming mixed keys. */
@Test
public void testConcurrentMixedKeys() throws Exception {
assumeTrue(atomicity == TRANSACTIONAL);

for (IgniteEx ign: F.asList(srcCluster[0], destCluster[0])) {
ign.createCache(new CacheConfiguration<TestKey, Integer>()
.setName(ACTIVE_PASSIVE_CACHE)
.setAtomicityMode(atomicity)
.setBackups(backups)
.setCacheMode(mode));
}

List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);

try {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int cnt = 0;

// Setup bound for keys to increase probability to stream same keys through each CDC instance.
// The value should not be small to force CDC generates batches for putAllConflict call.
int keysCnt = 20;

while (cnt++ < 10_000) {
srcCluster[rnd.nextInt(2)]
.cache(ACTIVE_PASSIVE_CACHE)
.put(new TestKey(rnd.nextInt(keysCnt), null), rnd.nextInt());

if (cnt % 1_000 == 0)
System.out.println("Load count = " + cnt);
}

// Check that all data received.
assertTrue(waitForCondition(() -> {
IgniteCache<TestKey, Integer> srcCache = srcCluster[0].cache(ACTIVE_PASSIVE_CACHE);
IgniteCache<TestKey, Integer> destCache = destCluster[0].cache(ACTIVE_PASSIVE_CACHE);

for (int i = 0; i < keysCnt; i++) {
Integer srcVal = srcCache.get(new TestKey(i, null));
Integer destVal = destCache.get(new TestKey(i, null));

if (srcVal == null || !srcVal.equals(destVal))
return false;
}

return true;

}, getTestTimeout()));
}
finally {
for (IgniteInternalFuture<?> fut : futs)
fut.cancel();
}
}

/** Replication with complex SQL key. Data inserted via SQL. */
@Test
public void testActivePassiveReplicationComplexKeyWithSQL() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT;
Expand All @@ -47,7 +48,7 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();

for (int i = 0; i < srcCluster.length; i++)
futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache));
futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache, "ignite-to-ignite-src-" + i));

return futs;
}
Expand All @@ -56,11 +57,15 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
@Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();

for (int i = 0; i < srcCluster.length; i++)
futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE));
for (int i = 0; i < srcCluster.length; i++) {
futs.add(igniteToIgnite(
srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-src-" + i));
}

for (int i = 0; i < destCluster.length; i++)
futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE));
for (int i = 0; i < destCluster.length; i++) {
futs.add(igniteToIgnite(
destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-dest-" + i));
}

return futs;
}
Expand All @@ -81,13 +86,15 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
* @param destCfg Ignite destination cluster configuration.
* @param dest Ignite destination cluster.
* @param cache Cache name to stream to kafka.
* @param threadName Thread to run CDC instance.
* @return Future for Change Data Capture application.
*/
protected IgniteInternalFuture<?> igniteToIgnite(
IgniteConfiguration srcCfg,
IgniteConfiguration destCfg,
IgniteEx[] dest,
String cache
String cache,
@Nullable String threadName
) {
return runAsync(() -> {
CdcConfiguration cdcCfg = new CdcConfiguration();
Expand Down Expand Up @@ -117,7 +124,7 @@ protected IgniteInternalFuture<?> igniteToIgnite(
cdcs.add(cdc);

cdc.run();
});
}, threadName);
}

/** */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
IgniteConfiguration igniteCfg,
String topic,
String metadataTopic,
String cache
String cache,
String threadName
) {
Map<String, String> params = new HashMap<>();

Expand All @@ -127,7 +128,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));

return runAsync(
() -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)})
() -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)}), threadName
);
}

Expand All @@ -139,7 +140,8 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
IgniteConfiguration igniteCfg,
IgniteEx[] dest,
int partFrom,
int partTo
int partTo,
String threadName
) {
Map<String, String> params = new HashMap<>();

Expand Down Expand Up @@ -174,7 +176,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
params.put(METRIC_REG_NAME, DFLT_METRICS_REG_NAME + "-" + igniteCfg.getIgniteInstanceName());

return runAsync(
() -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig(cfg, params)})
() -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig(cfg, params)}), threadName
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {

List<IgniteInternalFuture<?>> futs = new ArrayList<>();

for (IgniteEx ex : srcCluster)
futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache));
for (IgniteEx ex : srcCluster) {
int idx = getTestIgniteInstanceIndex(ex.name());

futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, "ignite-src-to-kafka-" + idx));
}

for (int i = 0; i < destCluster.length; i++) {
futs.add(kafkaToIgnite(
Expand All @@ -115,7 +118,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
destClusterCliCfg[i],
destCluster,
i * (DFLT_PARTS / 2),
(i + 1) * (DFLT_PARTS / 2)
(i + 1) * (DFLT_PARTS / 2),
"kafka-to-ignite-dest-" + i
));
}

Expand All @@ -126,11 +130,19 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
@Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();

for (IgniteEx ex : srcCluster)
futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE));
for (IgniteEx ex : srcCluster) {
int idx = getTestIgniteInstanceIndex(ex.name());

futs.add(igniteToKafka(
ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE, "ignite-src-to-kafka-" + idx));
}

for (IgniteEx ex : destCluster)
futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE));
for (IgniteEx ex : destCluster) {
int idx = getTestIgniteInstanceIndex(ex.name());

futs.add(igniteToKafka(
ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE, "ignite-dest-to-kafka-" + idx));
}

futs.add(kafkaToIgnite(
ACTIVE_ACTIVE_CACHE,
Expand All @@ -139,7 +151,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
destClusterCliCfg[0],
destCluster,
0,
DFLT_PARTS
DFLT_PARTS,
"kafka-to-ignite-src"
));

futs.add(kafkaToIgnite(
Expand All @@ -149,7 +162,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
srcClusterCliCfg[0],
srcCluster,
0,
DFLT_PARTS
DFLT_PARTS,
"kafka-to-ignite-dest"
));

return futs;
Expand Down Expand Up @@ -247,7 +261,8 @@ protected IgniteInternalFuture<?> igniteToKafka(
IgniteConfiguration igniteCfg,
String topic,
String metadataTopic,
String cache
String cache,
String threadName
) {
return runAsync(() -> {
IgniteToKafkaCdcStreamer cdcCnsmr = new IgniteToKafkaCdcStreamer()
Expand All @@ -270,7 +285,7 @@ protected IgniteInternalFuture<?> igniteToKafka(
cdcs.add(cdc);

cdc.run();
});
}, threadName);
}

/**
Expand All @@ -286,7 +301,8 @@ protected IgniteInternalFuture<?> kafkaToIgnite(
IgniteConfiguration igniteCfg,
IgniteEx[] dest,
int fromPart,
int toPart
int toPart,
String threadName
) {
KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration();

Expand Down Expand Up @@ -315,7 +331,7 @@ protected IgniteInternalFuture<?> kafkaToIgnite(

kafkaStreamers.add(streamer);

return runAsync(streamer);
return runAsync(streamer, threadName);
}

/** */
Expand Down

0 comments on commit fcfb076

Please sign in to comment.