Skip to content

Commit

Permalink
IGNITE-18372 Add CDC metrics to KafkaToIgniteStreamer - Fixes #277.
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
  • Loading branch information
Maksim Davydov authored and alex-plekhanov committed Jul 16, 2024
1 parent e84b926 commit c3813f7
Show file tree
Hide file tree
Showing 13 changed files with 596 additions and 74 deletions.
30 changes: 25 additions & 5 deletions docs/_docs/cdc/change-data-capture-extensions.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ image:images/CDC-design.svg[]

== Metrics

[cols="25%,75%",opts="header"]
|===
|Name |Description
| `EventsCount` | Count of messages applied to destination cluster.
| `LastEventTime` | Timestamp of last applied event.
| `LastEventTime` | Timestamp of last applied event to destination cluster.
| `TypesCount` | Count of binary types events applied to destination cluster.
| `MappingsCount` | Count of mappings events applied to destination cluster
|===

== CDC replication using Kafka
Expand Down Expand Up @@ -78,11 +81,15 @@ NOTE: Instances of `ignite-cdc.sh` with configured streamer should be started on

=== IgniteToKafkaCdcStreamer Metrics

[cols="30%,70%",opts="header"]
|===
|Name |Description
| `EventsCount` | Count of messages applied to destination cluster.
| `LastEventTime` | Timestamp of last applied event.
| `BytesSent` | Number of bytes send to Kafka.
| `EventsCount` | Count of messages sent to Kafka.
| `LastEventTime` | Timestamp of last sent event to Kafka.
| `TypesCount` | Count of binary types events sent to Kafka.
| `MappingsCount` | Count of mappings events sent to Kafka.
| `BytesSent` | Count of bytes sent to Kafka.
| `MarkersCount` | Count of metadata markers sent to Kafka.
|===

=== `kafka-to-ignite.sh` application
Expand Down Expand Up @@ -122,7 +129,7 @@ Kafka to ignite configuration file should contain the following beans that will
. `java.util.Properties` bean with the name `kafkaProperties`: Single Kafka consumer configuration.
. `org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration` bean: Options specific to `kafka-to-ignite.sh` application.

[cols="20%,45%,35%",opts="header"]
[cols="25%,45%,30%",opts="header"]
|===
|Name |Description | Default value
| `caches` | Set of cache names to replicate. | null
Expand All @@ -132,6 +139,19 @@ Kafka to ignite configuration file should contain the following beans that will
| `kafkaRequestTimeout` | Kafka request timeout in milliseconds. | `3000`
| `maxBatchSize` | Maximum number of events to be sent to destination cluster in a single batch. | 1024
| `threadCount` | Count of threads to proceed consumers. Each thread poll records from dedicated partitions in round-robin manner. | 16
|`metricRegistryName`| Name for metric registry. `org.apache.metricRegistryName.cdc.applier` | cdc-kafka-to-ignite
|===

=== Metrics

[cols="35%,65%",opts="header"]
|===
|Name |Description
| `EventsReceivedCount` | Count of events received from Kafka.
| `LastEventReceivedTime` | Timestamp of last received event from Kafka.
| `EventsSentCount` | Count of events sent to destination cluster.
| `LastBatchSentTime` | Timestamp of last sent batch to the destination cluster.
| `MarkersCount` | Count of metadata markers received from Kafka.
|===

==== Logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,28 @@
*/
public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
/** */
public static final String EVTS_CNT = "EventsCount";
public static final String EVTS_SENT_CNT = "EventsCount";

/** */
public static final String TYPES_CNT = "TypesCount";
public static final String EVTS_SENT_CNT_DESC = "Count of messages applied to destination cluster";

/** */
public static final String MAPPINGS_CNT = "MappingsCount";
public static final String TYPES_SENT_CNT = "TypesCount";

/** */
public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";
public static final String TYPES_SENT_CNT_DESC = "Count of binary types events applied to destination cluster";

/** */
public static final String TYPES_CNT_DESC = "Count of received binary types events";
public static final String MAPPINGS_SENT_CNT = "MappingsCount";

/** */
public static final String MAPPINGS_CNT_DESC = "Count of received mappings events";
public static final String MAPPINGS_SENT_CNT_DESC = "Count of mappings events applied to destination cluster";

/** */
public static final String LAST_EVT_TIME = "LastEventTime";
public static final String LAST_EVT_SENT_TIME = "LastEventTime";

/** */
public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster";

/** Handle only primary entry flag. */
private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
Expand Down Expand Up @@ -109,10 +109,10 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {

MetricRegistryImpl mreg = (MetricRegistryImpl)reg;

this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
this.typesCnt = mreg.longMetric(TYPES_SENT_CNT, TYPES_SENT_CNT_DESC);
this.mappingsCnt = mreg.longMetric(MAPPINGS_SENT_CNT, MAPPINGS_SENT_CNT_DESC);
this.lastEvtTs = mreg.longMetric(LAST_EVT_SENT_TIME, LAST_EVT_SENT_TIME_DESC);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ abstract class AbstractKafkaToIgniteCdcStreamer implements Runnable {
/** */
protected IgniteLogger log;

/** CDC kafka to ignite metrics */
private KafkaToIgniteMetrics metrics;

/**
* @param kafkaProps Kafka properties.
* @param streamerCfg Streamer configuration.
Expand Down Expand Up @@ -127,7 +130,18 @@ public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcS

ackAsciiLogo(log);

runx();
metrics = KafkaToIgniteMetrics.startMetrics(log, streamerCfg);

try {
runx();
}
finally {
if (metrics != null)
metrics.stopMetrics();

if (log.isInfoEnabled())
log.info("Ignite Change Data Capture Application stopped.");
}
}
catch (Exception e) {
throw new IgniteException(e);
Expand Down Expand Up @@ -161,15 +175,13 @@ protected void runAppliers() {
() -> eventsApplier(),
log,
kafkaProps,
streamerCfg.getTopic(),
streamerCfg,
parts.get1(), // kafkaPartFrom
parts.get2(), // kafkaPartTo
caches,
streamerCfg.getMaxBatchSize(),
streamerCfg.getKafkaRequestTimeout(),
streamerCfg.getKafkaConsumerPollTimeout(),
metaUpdr,
stopped
stopped,
metrics
);

addAndStart("applier-thread-" + cntr++, applier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,6 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.MAPPINGS_CNT;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.MAPPINGS_CNT_DESC;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.TYPES_CNT;
import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.TYPES_CNT_DESC;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
Expand All @@ -87,14 +79,44 @@
*/
@IgniteExperimental
public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Default value for the flag that indicates whether entries only from primary nodes should be handled. */
public static final boolean DFLT_IS_ONLY_PRIMARY = false;
/** */
public static final String EVTS_SENT_CNT = "EventsCount";

/** */
public static final String EVTS_SENT_CNT_DESC = "Count of messages sent to Kafka";

/** */
public static final String TYPES_SENT_CNT = "TypesCount";

/** */
public static final String TYPES_SENT_CNT_DESC = "Count of binary types events sent to Kafka";

/** */
public static final String MAPPINGS_SENT_CNT = "MappingsCount";

/** */
public static final String MAPPINGS_SENT_CNT_DESC = "Count of mappings events sent to Kafka";

/** */
public static final String LAST_EVT_SENT_TIME = "LastEventTime";

/** */
public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last sent event to Kafka";

/** Bytes sent metric name. */
public static final String BYTES_SENT = "BytesSent";
public static final String BYTES_SENT_CNT = "BytesSent";

/** Bytes sent metric description. */
public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent.";
public static final String BYTES_SENT_DESC = "Count of bytes sent to Kafka";

/** Count of metadata markers sent name. */
public static final String MARKERS_SENT_CNT = "MarkersCount";

/** Count of metadata markers sent description. */
public static final String MARKERS_SENT_CNT_DESC = "Count of metadata markers sent to Kafka";

/** Default value for the flag that indicates whether entries only from primary nodes should be handled. */
public static final boolean DFLT_IS_ONLY_PRIMARY = false;

/** Metadata updater marker. */
public static final byte[] META_UPDATE_MARKER = U.longToBytes(0xC0FF1E);
Expand Down Expand Up @@ -148,6 +170,9 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Count of sent mappings. */
protected AtomicLongMetric mappingsCnt;

/** Count of sent markers. */
protected AtomicLongMetric markersCnt;

/** */
private List<Future<RecordMetadata>> futs;

Expand Down Expand Up @@ -239,7 +264,7 @@ private void sendMetaUpdatedMarkers() {
sendAll(
IntStream.range(0, kafkaParts).iterator(),
p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER),
evtsCnt
markersCnt
);

if (log.isDebugEnabled())
Expand Down Expand Up @@ -328,11 +353,12 @@ private <T> void sendOneBatch(

MetricRegistryImpl mreg = (MetricRegistryImpl)reg;

this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
this.lastMsgTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
this.bytesSnt = mreg.longMetric(BYTES_SENT, BYTES_SENT_DESCRIPTION);
this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
this.lastMsgTs = mreg.longMetric(LAST_EVT_SENT_TIME, LAST_EVT_SENT_TIME_DESC);
this.bytesSnt = mreg.longMetric(BYTES_SENT_CNT, BYTES_SENT_DESC);
this.typesCnt = mreg.longMetric(TYPES_SENT_CNT, TYPES_SENT_CNT_DESC);
this.mappingsCnt = mreg.longMetric(MAPPINGS_SENT_CNT, MAPPINGS_SENT_CNT_DESC);
this.markersCnt = mreg.longMetric(MARKERS_SENT_CNT, MARKERS_SENT_CNT_DESC);

futs = new ArrayList<>(maxBatchSz);
}
Expand Down Expand Up @@ -428,7 +454,7 @@ public IgniteToKafkaCdcStreamer setKafkaProperties(Properties kafkaProps) {

/**
* Sets the maximum time to complete Kafka related requests, in milliseconds.
*
*
* @param kafkaReqTimeout Timeout value.
* @return {@code this} for chaining.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,45 +125,45 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
/** Cdc events applier. */
private AbstractCdcEventsApplier applier;

/** CDC kafka to ignite metrics */
private final KafkaToIgniteMetrics metrics;

/**
* @param applierSupplier Cdc events applier supplier.
* @param log Logger.
* @param kafkaProps Kafka properties.
* @param topic Topic name.
* @param streamerCfg Streamer config.
* @param kafkaPartFrom Read from partition.
* @param kafkaPartTo Read to partition.
* @param caches Cache ids.
* @param maxBatchSize Maximum batch size.
* @param kafkaReqTimeout The maximum time to complete Kafka related requests, in milliseconds.
* @param consumerPollTimeout Consumer poll timeout in milliseconds.
* @param metaUpdr Metadata updater.
* @param stopped Stopped flag.
* @param metrics CDC Kafka to Ignite metrics.
*/
public KafkaToIgniteCdcStreamerApplier(
Supplier<AbstractCdcEventsApplier> applierSupplier,
IgniteLogger log,
Properties kafkaProps,
String topic,
KafkaToIgniteCdcStreamerConfiguration streamerCfg,
int kafkaPartFrom,
int kafkaPartTo,
Set<Integer> caches,
int maxBatchSize,
long kafkaReqTimeout,
long consumerPollTimeout,
KafkaToIgniteMetadataUpdater metaUpdr,
AtomicBoolean stopped
AtomicBoolean stopped,
KafkaToIgniteMetrics metrics
) {
this.applierSupplier = applierSupplier;
this.kafkaProps = kafkaProps;
this.topic = topic;
this.topic = streamerCfg.getTopic();
this.kafkaPartFrom = kafkaPartFrom;
this.kafkaPartTo = kafkaPartTo;
this.caches = caches;
this.kafkaReqTimeout = kafkaReqTimeout;
this.consumerPollTimeout = consumerPollTimeout;
this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
this.consumerPollTimeout = streamerCfg.getKafkaConsumerPollTimeout();
this.metaUpdr = metaUpdr;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
this.metrics = metrics;
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -233,7 +233,10 @@ private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedExce
);
}

applier.apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
int msgsSntCur = applier.apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));

if (msgsSntCur > 0)
metrics.addSentEvents(msgsSntCur);

cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
}
Expand All @@ -250,9 +253,13 @@ private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]>
if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
metaUpdr.updateMetadata();

metrics.incrementMarkers();

return false;
}

metrics.incrementReceivedEvents();

return F.isEmpty(caches) || caches.contains(rec.key());
}

Expand Down
Loading

0 comments on commit c3813f7

Please sign in to comment.