Skip to content

Commit

Permalink
IGNITE-22536 Added conflict resolver plugin metrics (#278)
Browse files Browse the repository at this point in the history
  • Loading branch information
maksaska authored Jul 4, 2024
1 parent 49cec55 commit f7e4e3f
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 27 deletions.
13 changes: 13 additions & 0 deletions docs/_docs/cdc/change-data-capture-extensions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ Conflict resolution field should contain user provided monotonically increasing
. If `conflictResolveField` if provided then field values comparison used to determine order.
. Conflict resolution failed. Update will be ignored.

=== Conflict Resolver Metrics

The Ignite's built-in `CacheVersionConflictResolverPluginProvider` provides the following metrics:

[cols="35%,65%",opts="header"]
|===
|Name |Description
| `AcceptedCount` | Count of accepted entries.
| `RejectedCount` | Count of rejected entries.
|===

These metrics are registered under `conflict-resolver` registry for each node configured with this plugin.

=== Configuration example
Configuration is done via Ignite node plugin:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.lang.IgniteFuture;

/**
Expand All @@ -30,6 +31,9 @@
* @see CacheVersionConflictResolver
*/
public class CacheConflictResolutionManagerImpl<K, V> implements CacheConflictResolutionManager<K, V> {
/** Conflict resolver metrics registry name. */
public static final String CONFLICT_RESOLVER_METRICS_REGISTRY_NAME = "conflict-resolver";

/** Logger. */
private IgniteLogger log;

Expand Down Expand Up @@ -72,20 +76,24 @@ public CacheConflictResolutionManagerImpl(
@Override public CacheVersionConflictResolver conflictResolver() {
CacheVersionConflictResolver rslvr;

MetricRegistryImpl mreg = cctx.grid().context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);

if (resolver != null)
rslvr = resolver;
else if (conflictResolverLog.isDebugEnabled()) {
rslvr = new DebugCacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
conflictResolverLog
conflictResolverLog,
mreg
);
}
else {
rslvr = new CacheVersionConflictResolverImpl(
clusterId,
conflictResolveField,
conflictResolverLog
conflictResolverLog,
mreg
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -42,6 +44,18 @@
* </ul>
*/
public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver {
/** Accepted entries count name. */
public static final String ACCEPTED_EVENTS_CNT = "AcceptedCount";

/** Accepted entries count description. */
public static final String ACCEPTED_EVENTS_CNT_DESC = "Count of accepted entries";

/** Rejected entries count name. */
public static final String REJECTED_EVENTS_CNT = "RejectedCount";

/** Rejected entries count description. */
public static final String REJECTED_EVENTS_CNT_DESC = "Count of rejected entries";

/**
* Cluster id.
*/
Expand All @@ -66,17 +80,32 @@ public class CacheVersionConflictResolverImpl implements CacheVersionConflictRes
@GridToStringInclude
protected final boolean conflictResolveFieldEnabled;

/** Accepted entries count. */
private final LongAdderMetric acceptedCnt;

/** Rejected entries count. */
private final LongAdderMetric rejectedCnt;

/**
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
* @param mreg Metric registry.
*/
public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
public CacheVersionConflictResolverImpl(
byte clusterId,
String conflictResolveField,
IgniteLogger log,
MetricRegistryImpl mreg
) {
this.clusterId = clusterId;
this.conflictResolveField = conflictResolveField;
this.log = log;

conflictResolveFieldEnabled = conflictResolveField != null;

acceptedCnt = mreg.longAdderMetric(ACCEPTED_EVENTS_CNT, ACCEPTED_EVENTS_CNT_DESC);
rejectedCnt = mreg.longAdderMetric(REJECTED_EVENTS_CNT, REJECTED_EVENTS_CNT_DESC);
}

/** {@inheritDoc} */
Expand All @@ -90,10 +119,14 @@ public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveFi

boolean useNew = isUseNew(ctx, oldEntry, newEntry);

if (useNew)
if (useNew) {
res.useNew();
else
acceptedCnt.increment();
}
else {
res.useOld();
rejectedCnt.increment();
}

return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.util.typedef.internal.S;

/** Debug aware resolver. */
Expand All @@ -28,9 +29,15 @@ public class DebugCacheVersionConflictResolverImpl extends CacheVersionConflictR
* @param clusterId Data center id.
* @param conflictResolveField Field to resolve conflicts.
* @param log Logger.
* @param mreg Metric registry.
*/
public DebugCacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
super(clusterId, conflictResolveField, log);
public DebugCacheVersionConflictResolverImpl(
byte clusterId,
String conflictResolveField,
IgniteLogger log,
MetricRegistryImpl mreg
) {
super(clusterId, conflictResolveField, log, mreg);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -49,6 +51,9 @@
import static java.util.Collections.singletonMap;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl.CONFLICT_RESOLVER_METRICS_REGISTRY_NAME;
import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.ACCEPTED_EVENTS_CNT;
import static org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl.REJECTED_EVENTS_CNT;

/**
* Cache conflict operations test.
Expand Down Expand Up @@ -76,22 +81,25 @@ public static Collection<?> parameters() {
}

/** */
private static IgniteCache<String, ConflictResolvableTestData> cache;
private static final byte FIRST_CLUSTER_ID = 1;

/** */
private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
private static final byte SECOND_CLUSTER_ID = 2;

/** */
private static IgniteEx client;
private static final byte THIRD_CLUSTER_ID = 3;

/** */
private static final byte FIRST_CLUSTER_ID = 1;
private IgniteCache<String, ConflictResolvableTestData> cache;

/** */
private static final byte SECOND_CLUSTER_ID = 2;
private IgniteInternalCache<BinaryObject, BinaryObject> cachex;

/** */
private static final byte THIRD_CLUSTER_ID = 3;
private IgniteEx client;

/** */
private IgniteEx ign;

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
Expand All @@ -104,24 +112,14 @@ public static Collection<?> parameters() {
return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
}

/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrid(1);

client = startClientGrid(2);
}

/** {@inheritDoc} */
@Override protected void afterTestsStopped() {
cache = null;
cachex = null;
client = null;
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

ign = startGrid(1);

client = startClientGrid(2);

if (cachex == null || cachex.configuration().getAtomicityMode() != cacheMode) {
if (cachex != null)
client.cache(DEFAULT_CACHE_NAME).destroy();
Expand All @@ -133,6 +131,11 @@ public static Collection<?> parameters() {
}
}

/** {@inheritDoc} */
@Override protected void afterTest() {
stopAllGrids();
}

/** Tests that regular cache operations works with the conflict resolver when there is no update conflicts. */
@Test
public void testSimpleUpdates() {
Expand Down Expand Up @@ -197,6 +200,8 @@ public void testUpdatesReorderFromOtherCluster() throws Exception {
// Remove with the higher topVer should succeed.
putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true);

checkMetrics(4, 8);

key = key("UpdateClusterUpdateReorder3", otherClusterId);

int topVer = 1;
Expand All @@ -207,12 +212,16 @@ public void testUpdatesReorderFromOtherCluster() throws Exception {
putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);

checkMetrics(5, 10);

// Remove with the equal or lower nodeOrder should ignored.
removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);

// Remove with the higher nodeOrder should succeed.
putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true);

checkMetrics(6, 12);
}

/** Tests cache operations for entry replicated from another cluster. */
Expand Down Expand Up @@ -334,4 +343,15 @@ private String key(String key, byte otherClusterId) {
protected String conflictResolveField() {
return null;
}

/** Checks metrics for conflict resolver. */
protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
MetricRegistryImpl mreg = ign.context().metric().registry(CONFLICT_RESOLVER_METRICS_REGISTRY_NAME);

assertNotNull(mreg.findMetric(ACCEPTED_EVENTS_CNT));
assertNotNull(mreg.findMetric(REJECTED_EVENTS_CNT));

assertEquals(acceptedCnt, ((LongAdderMetric)mreg.findMetric(ACCEPTED_EVENTS_CNT)).value());
assertEquals(rejectedCnt, ((LongAdderMetric)mreg.findMetric(REJECTED_EVENTS_CNT)).value());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ private static final class LwwConflictResolver implements CacheVersionConflictRe
return res;
}
}

/** {@inheritDoc} */
@Override protected void checkMetrics(int acceptedCnt, int rejectedCnt) {
// No op.
}
}

0 comments on commit f7e4e3f

Please sign in to comment.