Skip to content

Commit

Permalink
IGNITE-21285 Data entries looped in CDC if ExpirePolicy is set (#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
NSAmelchev authored Feb 21, 2024
1 parent 1854a17 commit ee9488d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

Expand Down Expand Up @@ -89,30 +88,9 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi
) {
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

boolean expireExists = oldEntry.ttl() != CU.TTL_ETERNAL
|| newEntry.ttl() != CU.TTL_ETERNAL
|| oldEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL
|| newEntry.expireTime() != CU.EXPIRE_TIME_ETERNAL;

boolean useNew = isUseNew(ctx, oldEntry, newEntry);

if (expireExists) {
if (newEntry.expireTime() > oldEntry.expireTime()) {
res.merge(
useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
newEntry.ttl(),
newEntry.expireTime()
);
}
else {
res.merge(
useNew ? newEntry.value(ctx) : oldEntry.value(ctx),
oldEntry.ttl(),
oldEntry.expireTime()
);
}
}
else if (useNew)
if (useNew)
res.useNew();
else
res.useOld();
Expand Down Expand Up @@ -140,8 +118,18 @@ protected <K, V> boolean isUseNew(
if (oldEntry.isStartVersion()) // Entry absent (new entry).
return true;

if (oldEntry.dataCenterId() == newEntry.dataCenterId())
return newEntry.version().compareTo(oldEntry.version()) > 0; // New version from the same cluster.
if (oldEntry.dataCenterId() == newEntry.dataCenterId()) {
int cmp = newEntry.version().compareTo(oldEntry.version());

// Ignite sets the expire time to zero on backups for transaction caches.
// If CDC is running in onlyPrimary=false mode, then the updates from backups may be applied first.
// In this case, a new entry from the primary node should be used to set the expiration time.
// See GridDistributedTxRemoteAdapter#commitIfLocked
if (cmp == 0)
return newEntry.expireTime() > oldEntry.expireTime();

return cmp > 0; // New version from the same cluster.
}

if (conflictResolveFieldEnabled) {
Object oldVal = oldEntry.value(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import javax.cache.expiry.ExpiryPolicy;
import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
Expand All @@ -49,11 +50,18 @@
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
Expand Down Expand Up @@ -258,6 +266,8 @@ private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
checkNoLocalUpdatesOnPassiveCluster();

stopAllGrids();

cleanPersistenceDir();
Expand Down Expand Up @@ -573,6 +583,34 @@ private List<List<?>> executeSql(IgniteEx node, String sqlText, Object... args)
return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
}

/** */
private void checkNoLocalUpdatesOnPassiveCluster() throws IgniteCheckedException {
if (srcCluster[0].cache(ACTIVE_PASSIVE_CACHE) == null)
return;

assertTrue(hasLocalUpdates(srcCluster));
assertFalse(hasLocalUpdates(destCluster));
}

/** @return {@code True} if cluster has local updates. */
private boolean hasLocalUpdates(IgniteEx[] cluster) throws IgniteCheckedException {
for (IgniteEx srv : cluster) {
WALIterator iter = srv.context().cache().context().wal().replay(null,
(type, ptr) -> type == WALRecord.RecordType.DATA_RECORD_V2);

for (IgniteBiTuple<WALPointer, WALRecord> t : iter) {
Collection<DataEntry> locUpdates = F.view(((DataRecord)t.get2()).writeEntries(),
e -> e.cacheId() == CU.cacheId(ACTIVE_PASSIVE_CACHE),
e -> !(e.writeVersion() instanceof GridCacheVersionEx));

if (!locUpdates.isEmpty())
return true;
}
}

return false;
}

/** @return Destination cluster host addresses. */
protected String[] hostAddresses(IgniteEx[] dest) {
String[] addrs = new String[dest.length];
Expand Down

0 comments on commit ee9488d

Please sign in to comment.