Skip to content

Commit c7bf652

Browse files
Introduce log retention gauge (#3326)
This PR introduces a float64 gauge that reports log retention, in hours, for MySQL and Mongo sources. The value is collected in the recordSlotSize workflow, which now calls a new activity named `emitLogRetentionHours`. The supporting utility functions live in retention.go and mongo.go. Functionally tested locally with Prometheus and Grafana: <img width="1519" height="536" alt="Screenshot 2025-08-11 at 9 53 49 PM" src="https://github.com/user-attachments/assets/edb3b5cd-82b5-4cfa-8101-5b777ff71bd1" />
1 parent d62bd56 commit c7bf652

File tree

7 files changed

+206
-68
lines changed

7 files changed

+206
-68
lines changed

flow/activities/flowable.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
899899
if err != nil {
900900
return err
901901
}
902-
logger.Info("Fetching and recording Slot Information", slog.Int("flows", len(infos)))
902+
logger.Info("Recording slot size and emitting log retention where applicable", slog.Int("flows", len(infos)))
903903
var wg sync.WaitGroup
904904
maxParallel := 5
905905
semaphore := make(chan struct{}, maxParallel)
@@ -913,6 +913,7 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
913913
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
914914
defer cancel()
915915
a.recordSlotInformation(timeoutCtx, info, slotMetricGauges)
916+
a.emitLogRetentionHours(timeoutCtx, info, a.OtelManager.Metrics.LogRetentionGauge)
916917
}(ctx, info)
917918
}
918919
logger.Info("Waiting for Slot Information to be recorded", slog.Int("flows", len(infos)))
@@ -991,6 +992,50 @@ func (a *FlowableActivity) recordSlotInformation(
991992
}
992993
}
993994

995+
func (a *FlowableActivity) emitLogRetentionHours(
996+
ctx context.Context,
997+
info *flowInformation,
998+
logRetentionGauge metric.Float64Gauge,
999+
) {
1000+
logger := internal.LoggerFromCtx(ctx)
1001+
flowMetadata, err := a.GetFlowMetadata(ctx, &protos.FlowContextMetadataInput{
1002+
FlowName: info.config.FlowJobName,
1003+
SourceName: info.config.SourceName,
1004+
DestinationName: info.config.DestinationName,
1005+
})
1006+
if err != nil {
1007+
logger.Error("Failed to get flow metadata", slog.Any("error", err))
1008+
}
1009+
ctx = context.WithValue(ctx, internal.FlowMetadataKey, flowMetadata)
1010+
srcConn, err := connectors.GetByNameAs[connectors.GetLogRetentionConnector](ctx, nil, a.CatalogPool, info.config.SourceName)
1011+
if err != nil {
1012+
if !errors.Is(err, errors.ErrUnsupported) {
1013+
logger.Error("Failed to create connector to emit log retention", slog.Any("error", err))
1014+
}
1015+
return
1016+
}
1017+
defer connectors.CloseConnector(ctx, srcConn)
1018+
1019+
peerName := info.config.SourceName
1020+
activity.RecordHeartbeat(ctx, "checking log retention on "+peerName)
1021+
if ctx.Err() != nil {
1022+
return
1023+
}
1024+
logRetentionHours, err := srcConn.GetLogRetentionHours(ctx)
1025+
if err != nil {
1026+
logger.Error("Failed to get log retention hours", slog.Any("error", err))
1027+
}
1028+
1029+
if logRetentionHours > 0 {
1030+
logRetentionGauge.Record(ctx, logRetentionHours)
1031+
logger.Info("Emitted log retention hours", slog.String("peerName", peerName), slog.Float64("logRetentionHours", logRetentionHours))
1032+
return
1033+
}
1034+
1035+
logger.Warn("Log retention hours is not set or is zero, skipping emission",
1036+
slog.String("peerName", peerName), slog.Float64("logRetentionHours", logRetentionHours))
1037+
}
1038+
9941039
var activeFlowStatuses = map[protos.FlowStatus]struct{}{
9951040
protos.FlowStatus_STATUS_RUNNING: {},
9961041
protos.FlowStatus_STATUS_PAUSING: {},

flow/connectors/core.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,12 @@ type GetVersionConnector interface {
304304
GetVersion(context.Context) (string, error)
305305
}
306306

307+
// GetLogRetentionConnector is a Connector that can additionally report how
// long its data store retains logs.
type GetLogRetentionConnector interface {
308+
Connector
309+
310+
// GetLogRetentionHours returns the log retention expressed in hours.
GetLogRetentionHours(ctx context.Context) (float64, error)
311+
}
312+
307313
func LoadPeerType(ctx context.Context, catalogPool shared.CatalogPool, peerName string) (protos.DBType, error) {
308314
row := catalogPool.QueryRow(ctx, "SELECT type FROM peers WHERE name = $1", peerName)
309315
var dbtype protos.DBType
@@ -581,4 +587,7 @@ var (
581587
_ GetVersionConnector = &connpostgres.PostgresConnector{}
582588
_ GetVersionConnector = &connmysql.MySqlConnector{}
583589
_ GetVersionConnector = &connmongo.MongoConnector{}
590+
591+
_ GetLogRetentionConnector = &connmysql.MySqlConnector{}
592+
_ GetLogRetentionConnector = &connmongo.MongoConnector{}
584593
)

flow/connectors/mongo/cdc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func (c *MongoConnector) PullRecords(
324324
}
325325

326326
clusterTimeNanos := time.Unix(int64(changeEvent.ClusterTime.T), 0).UnixNano()
327-
otelManager.Metrics.LatestConsumedChangeStreamEventGauge.Record(
327+
otelManager.Metrics.LatestConsumedLogEventGauge.Record(
328328
ctx,
329329
time.Unix(int64(changeEvent.ClusterTime.T), 0).Unix(),
330330
)

flow/connectors/mongo/mongo.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,3 +166,12 @@ func parseAsClientOptions(config *protos.MongoConfig, meteredDialer utils.Metere
166166
}
167167
return clientOptions, nil
168168
}
169+
170+
func (c *MongoConnector) GetLogRetentionHours(ctx context.Context) (float64, error) {
171+
serverStatus, err := peerdb_mongo.GetServerStatus(ctx, c.client)
172+
if err != nil {
173+
return 0, fmt.Errorf("failed to get server status: %w", err)
174+
}
175+
176+
return float64(serverStatus.OplogTruncation.OplogMinRetentionHours), nil
177+
}

flow/connectors/mysql/cdc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ func (c *MySqlConnector) PullRecords(
616616
}
617617
}
618618
if event.Header.Timestamp > 0 {
619-
otelManager.Metrics.LatestConsumedBinlogEventGauge.Record(
619+
otelManager.Metrics.LatestConsumedLogEventGauge.Record(
620620
ctx,
621621
int64(event.Header.Timestamp),
622622
)

flow/connectors/mysql/retention.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package connmysql
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
9+
)
10+
11+
func (c *MySqlConnector) GetLogRetentionHours(ctx context.Context) (float64, error) {
12+
switch c.config.Flavor {
13+
case protos.MySqlFlavor_MYSQL_MARIA:
14+
return c.getLogRetentionHoursForMariaDB(ctx)
15+
case protos.MySqlFlavor_MYSQL_MYSQL:
16+
cmp, err := c.CompareServerVersion(ctx, "8.0.1")
17+
if err != nil {
18+
return 0, fmt.Errorf("failed to get server version: %w", err)
19+
}
20+
if cmp < 0 {
21+
return c.getLogRetentionHoursForMySQL5(ctx)
22+
}
23+
return c.getLogRetentionHoursForMySQL8(ctx)
24+
default:
25+
return 0, fmt.Errorf("unsupported MySQL flavor: %s", c.config.Flavor)
26+
}
27+
}
28+
29+
func (c *MySqlConnector) getLogRetentionHoursForMySQL5(ctx context.Context) (float64, error) {
30+
// expire_logs_days
31+
rs, err := c.Execute(ctx, "SELECT @@expire_logs_days")
32+
if err != nil {
33+
return 0, fmt.Errorf("failed to get expire_logs_days: %w", err)
34+
}
35+
if len(rs.Values) == 0 || len(rs.Values[0]) == 0 {
36+
return 0, errors.New("no value returned for expire_logs_days")
37+
}
38+
expireLogsDays := rs.Values[0][0].AsUint64()
39+
return float64(expireLogsDays) * 24.0, nil // convert days to hours
40+
}
41+
42+
func (c *MySqlConnector) getLogRetentionHoursForMySQL8(ctx context.Context) (float64, error) {
43+
rs, err := c.Execute(ctx, "SELECT @@binlog_expire_logs_seconds")
44+
if err != nil {
45+
return 0, fmt.Errorf("failed to get binlog_expire_logs_seconds: %w", err)
46+
}
47+
if len(rs.Values) == 0 || len(rs.Values[0]) == 0 {
48+
return 0, errors.New("no value returned for binlog_expire_logs_seconds")
49+
}
50+
binlogExpireLogsSeconds := rs.Values[0][0].AsUint64()
51+
if binlogExpireLogsSeconds == 0 {
52+
return 0, nil // no expiration set
53+
}
54+
return float64(binlogExpireLogsSeconds) / 3600.0, nil
55+
}
56+
57+
func (c *MySqlConnector) getLogRetentionHoursForMariaDB(ctx context.Context) (float64, error) {
58+
cmp, err := c.CompareServerVersion(ctx, "10.6.1")
59+
if err != nil {
60+
return 0, fmt.Errorf("failed to get server version: %w", err)
61+
}
62+
if cmp < 0 {
63+
return 0, errors.New("mariadb version does not support binlog_expire_logs_seconds")
64+
}
65+
66+
rs, err := c.Execute(ctx, "SELECT @@binlog_expire_logs_seconds")
67+
if err != nil {
68+
return 0, fmt.Errorf("failed to get binlog_expire_logs_seconds: %w", err)
69+
}
70+
if len(rs.Values) == 0 || len(rs.Values[0]) == 0 {
71+
return 0, errors.New("no value returned for binlog_expire_logs_seconds")
72+
}
73+
binlogExpireLogsSeconds := rs.Values[0][0].AsUint64()
74+
return float64(binlogExpireLogsSeconds) / 3600.0, nil
75+
}

flow/otel_metrics/otel_manager.go

Lines changed: 65 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -25,69 +25,69 @@ const (
2525
)
2626

2727
const (
28-
SlotLagGaugeName = "cdc_slot_lag"
29-
CurrentBatchIdGaugeName = "current_batch_id"
30-
LastNormalizedBatchIdGaugeName = "last_normalized_batch_id"
31-
OpenConnectionsGaugeName = "open_connections"
32-
OpenReplicationConnectionsGaugeName = "open_replication_connections"
33-
CommittedLSNGaugeName = "committed_lsn"
34-
RestartLSNGaugeName = "restart_lsn"
35-
ConfirmedFlushLSNGaugeName = "confirmed_flush_lsn"
36-
IntervalSinceLastNormalizeGaugeName = "interval_since_last_normalize"
37-
AllFetchedBytesCounterName = "all_fetched_bytes"
38-
FetchedBytesCounterName = "fetched_bytes"
39-
CommitLagGaugeName = "commit_lag"
40-
ErrorEmittedGaugeName = "error_emitted"
41-
ErrorsEmittedCounterName = "errors_emitted"
42-
WarningEmittedGaugeName = "warning_emitted"
43-
WarningsEmittedCounterName = "warnings_emitted"
44-
RecordsSyncedGaugeName = "records_synced"
45-
RecordsSyncedCounterName = "records_synced_counter"
46-
SyncedTablesGaugeName = "synced_tables"
47-
InstanceStatusGaugeName = "instance_status"
48-
MaintenanceStatusGaugeName = "maintenance_status"
49-
FlowStatusGaugeName = "flow_status"
50-
ActiveFlowsGaugeName = "active_flows"
51-
CPULimitsPerActiveFlowGaugeName = "cpu_limits_per_active_flow_vcores"
52-
MemoryLimitsPerActiveFlowGaugeName = "memory_limits_per_active_flow"
53-
TotalCPULimitsGaugeName = "total_cpu_limits_vcores"
54-
TotalMemoryLimitsGaugeName = "total_memory_limits"
55-
WorkloadTotalReplicasGaugeName = "workload_total_replicas"
56-
LatestConsumedBinlogEventGaugeName = "latest_consumed_binlog"
57-
LatestConsumedChangeStreamEventGaugeName = "latest_consumed_change_stream_event"
28+
SlotLagGaugeName = "cdc_slot_lag"
29+
CurrentBatchIdGaugeName = "current_batch_id"
30+
LastNormalizedBatchIdGaugeName = "last_normalized_batch_id"
31+
OpenConnectionsGaugeName = "open_connections"
32+
OpenReplicationConnectionsGaugeName = "open_replication_connections"
33+
CommittedLSNGaugeName = "committed_lsn"
34+
RestartLSNGaugeName = "restart_lsn"
35+
ConfirmedFlushLSNGaugeName = "confirmed_flush_lsn"
36+
IntervalSinceLastNormalizeGaugeName = "interval_since_last_normalize"
37+
AllFetchedBytesCounterName = "all_fetched_bytes"
38+
FetchedBytesCounterName = "fetched_bytes"
39+
CommitLagGaugeName = "commit_lag"
40+
ErrorEmittedGaugeName = "error_emitted"
41+
ErrorsEmittedCounterName = "errors_emitted"
42+
WarningEmittedGaugeName = "warning_emitted"
43+
WarningsEmittedCounterName = "warnings_emitted"
44+
RecordsSyncedGaugeName = "records_synced"
45+
RecordsSyncedCounterName = "records_synced_counter"
46+
SyncedTablesGaugeName = "synced_tables"
47+
InstanceStatusGaugeName = "instance_status"
48+
MaintenanceStatusGaugeName = "maintenance_status"
49+
FlowStatusGaugeName = "flow_status"
50+
ActiveFlowsGaugeName = "active_flows"
51+
CPULimitsPerActiveFlowGaugeName = "cpu_limits_per_active_flow_vcores"
52+
MemoryLimitsPerActiveFlowGaugeName = "memory_limits_per_active_flow"
53+
TotalCPULimitsGaugeName = "total_cpu_limits_vcores"
54+
TotalMemoryLimitsGaugeName = "total_memory_limits"
55+
WorkloadTotalReplicasGaugeName = "workload_total_replicas"
56+
LogRetentionGaugeName = "log_retention"
57+
LatestConsumedLogEventGaugeName = "latest_consumed_log_event"
5858
)
5959

6060
type Metrics struct {
61-
SlotLagGauge metric.Float64Gauge
62-
CurrentBatchIdGauge metric.Int64Gauge
63-
LastNormalizedBatchIdGauge metric.Int64Gauge
64-
OpenConnectionsGauge metric.Int64Gauge
65-
OpenReplicationConnectionsGauge metric.Int64Gauge
66-
CommittedLSNGauge metric.Int64Gauge
67-
RestartLSNGauge metric.Int64Gauge
68-
ConfirmedFlushLSNGauge metric.Int64Gauge
69-
IntervalSinceLastNormalizeGauge metric.Float64Gauge
70-
AllFetchedBytesCounter metric.Int64Counter
71-
FetchedBytesCounter metric.Int64Counter
72-
CommitLagGauge metric.Int64Gauge
73-
ErrorEmittedGauge metric.Int64Gauge
74-
ErrorsEmittedCounter metric.Int64Counter
75-
WarningsEmittedGauge metric.Int64Gauge
76-
WarningEmittedCounter metric.Int64Counter
77-
RecordsSyncedGauge metric.Int64Gauge
78-
RecordsSyncedCounter metric.Int64Counter
79-
SyncedTablesGauge metric.Int64Gauge
80-
InstanceStatusGauge metric.Int64Gauge
81-
MaintenanceStatusGauge metric.Int64Gauge
82-
FlowStatusGauge metric.Int64Gauge
83-
ActiveFlowsGauge metric.Int64Gauge
84-
CPULimitsPerActiveFlowGauge metric.Float64Gauge
85-
MemoryLimitsPerActiveFlowGauge metric.Float64Gauge
86-
TotalCPULimitsGauge metric.Float64Gauge
87-
TotalMemoryLimitsGauge metric.Float64Gauge
88-
WorkloadTotalReplicasGauge metric.Int64Gauge
89-
LatestConsumedBinlogEventGauge metric.Int64Gauge
90-
LatestConsumedChangeStreamEventGauge metric.Int64Gauge
61+
SlotLagGauge metric.Float64Gauge
62+
CurrentBatchIdGauge metric.Int64Gauge
63+
LastNormalizedBatchIdGauge metric.Int64Gauge
64+
OpenConnectionsGauge metric.Int64Gauge
65+
OpenReplicationConnectionsGauge metric.Int64Gauge
66+
CommittedLSNGauge metric.Int64Gauge
67+
RestartLSNGauge metric.Int64Gauge
68+
ConfirmedFlushLSNGauge metric.Int64Gauge
69+
IntervalSinceLastNormalizeGauge metric.Float64Gauge
70+
AllFetchedBytesCounter metric.Int64Counter
71+
FetchedBytesCounter metric.Int64Counter
72+
CommitLagGauge metric.Int64Gauge
73+
ErrorEmittedGauge metric.Int64Gauge
74+
ErrorsEmittedCounter metric.Int64Counter
75+
WarningsEmittedGauge metric.Int64Gauge
76+
WarningEmittedCounter metric.Int64Counter
77+
RecordsSyncedGauge metric.Int64Gauge
78+
RecordsSyncedCounter metric.Int64Counter
79+
SyncedTablesGauge metric.Int64Gauge
80+
InstanceStatusGauge metric.Int64Gauge
81+
MaintenanceStatusGauge metric.Int64Gauge
82+
FlowStatusGauge metric.Int64Gauge
83+
ActiveFlowsGauge metric.Int64Gauge
84+
CPULimitsPerActiveFlowGauge metric.Float64Gauge
85+
MemoryLimitsPerActiveFlowGauge metric.Float64Gauge
86+
TotalCPULimitsGauge metric.Float64Gauge
87+
TotalMemoryLimitsGauge metric.Float64Gauge
88+
WorkloadTotalReplicasGauge metric.Int64Gauge
89+
LatestConsumedLogEventGauge metric.Int64Gauge
90+
LogRetentionGauge metric.Float64Gauge
9191
}
9292

9393
type SlotMetricGauges struct {
@@ -253,16 +253,16 @@ func (om *OtelManager) setupMetrics() error {
253253
return err
254254
}
255255

256-
if om.Metrics.LatestConsumedBinlogEventGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(LatestConsumedBinlogEventGaugeName),
256+
if om.Metrics.LatestConsumedLogEventGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(LatestConsumedLogEventGaugeName),
257257
metric.WithUnit("s"),
258-
metric.WithDescription("Timestamp of latest binlog event read in epoch seconds"),
258+
metric.WithDescription("Latest consumed replication log event timestamp in epoch seconds"),
259259
); err != nil {
260260
return err
261261
}
262262

263-
if om.Metrics.LatestConsumedChangeStreamEventGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(LatestConsumedChangeStreamEventGaugeName),
264-
metric.WithUnit("s"),
265-
metric.WithDescription("Timestamp of latest change stream event read in epoch seconds"),
263+
if om.Metrics.LogRetentionGauge, err = om.GetOrInitFloat64Gauge(BuildMetricName(LogRetentionGaugeName),
264+
metric.WithUnit("h"),
265+
metric.WithDescription("Log retention in hours for the source data store"),
266266
); err != nil {
267267
return err
268268
}

0 commit comments

Comments
 (0)