Skip to content

Commit fa6e7ea

Browse files
authored
[FLINK-35245][cdc-connector][tidb] Add metrics for flink-connector-tidb-cdc
1 parent b6cfbcc commit fa6e7ea

File tree

3 files changed

+178
-0
lines changed

3 files changed

+178
-0
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import org.apache.flink.api.common.typeinfo.TypeInformation;
2424
import org.apache.flink.api.common.typeutils.base.LongSerializer;
2525
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
26+
import org.apache.flink.cdc.connectors.tidb.metrics.TiDBSourceMetrics;
2627
import org.apache.flink.cdc.connectors.tidb.table.StartupMode;
2728
import org.apache.flink.cdc.connectors.tidb.table.utils.TableKeyRangeUtils;
2829
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.metrics.MetricGroup;
2931
import org.apache.flink.runtime.state.FunctionInitializationContext;
3032
import org.apache.flink.runtime.state.FunctionSnapshotContext;
3133
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
@@ -42,6 +44,7 @@
4244
import org.tikv.common.TiSession;
4345
import org.tikv.common.key.RowKey;
4446
import org.tikv.common.meta.TiTableInfo;
47+
import org.tikv.common.meta.TiTimestamp;
4548
import org.tikv.kvproto.Cdcpb;
4649
import org.tikv.kvproto.Coprocessor;
4750
import org.tikv.kvproto.Kvrpcpb;
@@ -91,6 +94,7 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio
9194

9295
private transient boolean running = true;
9396
private transient ExecutorService executorService;
97+
private transient TiDBSourceMetrics sourceMetrics;
9498

9599
/** offset state. */
96100
private transient ListState<Long> offsetState;
@@ -146,6 +150,9 @@ public void open(final Configuration config) throws Exception {
146150
+ getRuntimeContext().getIndexOfThisSubtask())
147151
.build();
148152
executorService = Executors.newSingleThreadExecutor(threadFactory);
153+
final MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
154+
sourceMetrics = new TiDBSourceMetrics(metricGroup);
155+
sourceMetrics.registerMetrics();
149156
}
150157

151158
@Override
@@ -210,6 +217,7 @@ protected void readSnapshotEvents() throws Exception {
210217
for (final Kvrpcpb.KvPair pair : segment) {
211218
if (TableKeyRangeUtils.isRecordKey(pair.getKey().toByteArray())) {
212219
snapshotEventDeserializationSchema.deserialize(pair, outputCollector);
220+
reportMetrics(0L, startTs);
213221
}
214222
}
215223

@@ -231,6 +239,8 @@ protected void readChangeEvents() throws Exception {
231239
Cdcpb.Event.Row committedRow = committedEvents.take();
232240
changeEventDeserializationSchema.deserialize(
233241
committedRow, outputCollector);
242+
// use startTs of row as messageTs, use commitTs of row as fetchTs
243+
reportMetrics(committedRow.getStartTs(), committedRow.getCommitTs());
234244
} catch (Exception e) {
235245
e.printStackTrace();
236246
}
@@ -390,4 +400,20 @@ public void close() {
390400
// do nothing
391401
}
392402
}
403+
404+
private void reportMetrics(long messageTs, long fetchTs) {
405+
long now = System.currentTimeMillis();
406+
// record the latest process time
407+
sourceMetrics.recordProcessTime(now);
408+
long messageTimestamp = TiTimestamp.extractPhysical(messageTs);
409+
long fetchTimestamp = TiTimestamp.extractPhysical(fetchTs);
410+
if (messageTimestamp > 0L) {
411+
// report fetch delay
412+
if (fetchTimestamp >= messageTimestamp) {
413+
sourceMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
414+
}
415+
// report emit delay
416+
sourceMetrics.recordEmitDelay(now - messageTimestamp);
417+
}
418+
}
393419
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.tidb.metrics;
19+
20+
import org.apache.flink.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
21+
import org.apache.flink.metrics.Gauge;
22+
import org.apache.flink.metrics.MetricGroup;
23+
24+
import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_EMIT_EVENT_TIME_LAG;
25+
import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_FETCH_EVENT_TIME_LAG;
26+
import static org.apache.flink.runtime.metrics.MetricNames.SOURCE_IDLE_TIME;
27+
28+
/** A collection class for handling metrics in {@link TiKVRichParallelSourceFunction}. */
29+
public class TiDBSourceMetrics {
30+
31+
private final MetricGroup metricGroup;
32+
33+
/**
34+
* The last record processing time, which is updated after {@link
35+
* TiKVRichParallelSourceFunction} fetches a batch of data. It's mainly used to report metrics
36+
* sourceIdleTime for sourceIdleTime = System.currentTimeMillis() - processTime.
37+
*/
38+
private long processTime = 0L;
39+
40+
/**
41+
* currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
42+
* record fetched into the source operator.
43+
*/
44+
private long fetchDelay = 0L;
45+
46+
/**
47+
* currentEmitEventTimeLag = EmitTime - messageTimestamp, where the EmitTime is the time the
48+
* record leaves the source operator.
49+
*/
50+
private long emitDelay = 0L;
51+
52+
public TiDBSourceMetrics(MetricGroup metricGroup) {
53+
this.metricGroup = metricGroup;
54+
}
55+
56+
public void registerMetrics() {
57+
58+
metricGroup.gauge(CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay);
59+
metricGroup.gauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge<Long>) this::getEmitDelay);
60+
metricGroup.gauge(SOURCE_IDLE_TIME, (Gauge<Long>) this::getIdleTime);
61+
}
62+
63+
public long getFetchDelay() {
64+
return fetchDelay;
65+
}
66+
67+
public long getEmitDelay() {
68+
return emitDelay;
69+
}
70+
71+
public long getIdleTime() {
72+
// no previous process time at the beginning, return 0 as idle time
73+
if (processTime == 0) {
74+
return 0;
75+
}
76+
return System.currentTimeMillis() - processTime;
77+
}
78+
79+
public void recordProcessTime(long processTime) {
80+
this.processTime = processTime;
81+
}
82+
83+
public void recordFetchDelay(long fetchDelay) {
84+
this.fetchDelay = fetchDelay;
85+
}
86+
87+
public void recordEmitDelay(long emitDelay) {
88+
this.emitDelay = emitDelay;
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.tidb.metrics;
19+
20+
import org.apache.flink.metrics.Gauge;
21+
import org.apache.flink.metrics.testutils.MetricListener;
22+
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
26+
import java.util.Optional;
27+
28+
import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_EMIT_EVENT_TIME_LAG;
29+
import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_FETCH_EVENT_TIME_LAG;
30+
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.assertTrue;
32+
33+
/** Unit test for {@link TiDBSourceMetrics}. */
34+
public class TiDBSourceMetricsTest {
35+
private MetricListener metricListener;
36+
private TiDBSourceMetrics sourceMetrics;
37+
38+
@Before
39+
public void setUp() {
40+
metricListener = new MetricListener();
41+
sourceMetrics = new TiDBSourceMetrics(metricListener.getMetricGroup());
42+
sourceMetrics.registerMetrics();
43+
}
44+
45+
@Test
46+
public void testFetchEventTimeLagTracking() {
47+
sourceMetrics.recordFetchDelay(5L);
48+
assertGauge(metricListener, CURRENT_FETCH_EVENT_TIME_LAG, 5L);
49+
}
50+
51+
@Test
52+
public void testEmitEventTimeLagTracking() {
53+
sourceMetrics.recordEmitDelay(3L);
54+
assertGauge(metricListener, CURRENT_EMIT_EVENT_TIME_LAG, 3L);
55+
}
56+
57+
private void assertGauge(MetricListener metricListener, String identifier, long expected) {
58+
Optional<Gauge<Object>> gauge = metricListener.getGauge(identifier);
59+
assertTrue(gauge.isPresent());
60+
assertEquals(expected, (long) gauge.get().getValue());
61+
}
62+
}

0 commit comments

Comments
 (0)