Skip to content

feat(metrics): duration since last flow update #3374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,8 @@ func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error {
}
var allFlows atomic.Pointer[[]flowInformation]
defer shared.Interval(ctx, 59*time.Second, func() {
rows, err := a.CatalogPool.Query(ctx, "SELECT DISTINCT ON (name) name, config_proto, workflow_id FROM flows WHERE query_string IS NULL")
rows, err := a.CatalogPool.Query(ctx,
"SELECT DISTINCT ON (name) name, config_proto, workflow_id, updated_at FROM flows WHERE query_string IS NULL")
if err != nil {
logger.Error("failed to query all flows", slog.Any("error", err))
return
Expand All @@ -754,7 +755,8 @@ func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error {
var flowName string
var configProto []byte
var workflowID string
if err := rows.Scan(&flowName, &configProto, &workflowID); err != nil {
var updatedAt time.Time
if err := rows.Scan(&flowName, &configProto, &workflowID, &updatedAt); err != nil {
return flowInformation{}, err
}

Expand All @@ -766,6 +768,7 @@ func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error {
return flowInformation{
config: &config,
workflowID: workflowID,
updatedAt: updatedAt,
}, nil
})
if err != nil {
Expand Down Expand Up @@ -797,6 +800,7 @@ func (a *FlowableActivity) ScheduledTasks(ctx context.Context) error {

// flowInformation carries the per-flow values that ScheduledTasks reads from
// the catalog `flows` table and that RecordMetrics consumes when emitting
// per-flow metrics.
type flowInformation struct {
// config is the flow's connection configuration. Presumably decoded from the
// config_proto column; the decoding step is not visible here — TODO confirm.
config *protos.FlowConnectionConfigs
// updatedAt is the value scanned from the updated_at column. RecordMetrics
// subtracts it from the current time to report the whole seconds elapsed
// since the flow was last updated.
updatedAt time.Time
// workflowID is the value scanned from the workflow_id column.
workflowID string
}

Expand All @@ -820,6 +824,7 @@ func (a *FlowableActivity) RecordMetrics(ctx context.Context, infos []flowInform
attribute.String(otel_metrics.DeploymentVersionKey, internal.PeerDBDeploymentVersion()),
)))
logger.Info("Emitting Instance and Flow Status", slog.Int("flows", len(infos)))
currentTime := time.Now()
activeFlows := make([]flowInformation, 0, len(infos))
for _, info := range infos {
flowMetadata, err := a.GetFlowMetadata(ctx, &protos.FlowContextMetadataInput{
Expand All @@ -845,6 +850,10 @@ func (a *FlowableActivity) RecordMetrics(ctx context.Context, infos []flowInform
attribute.String(otel_metrics.FlowStatusKey, status.String()),
attribute.Bool(otel_metrics.IsFlowActiveKey, isActive),
)))
a.OtelManager.Metrics.DurationSinceLastFlowUpdateGauge.Record(ctx, int64(currentTime.Sub(info.updatedAt).Seconds()),
metric.WithAttributeSet(attribute.NewSet(
attribute.String(otel_metrics.FlowStatusKey, status.String()),
)))
}
logger.Info("Finished emitting Instance and Flow Status", slog.Int("flows", len(infos)))
var totalCpuLimit float64
Expand Down
129 changes: 69 additions & 60 deletions flow/otel_metrics/otel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,69 +25,71 @@ const (
)

const (
SlotLagGaugeName = "cdc_slot_lag"
CurrentBatchIdGaugeName = "current_batch_id"
LastNormalizedBatchIdGaugeName = "last_normalized_batch_id"
OpenConnectionsGaugeName = "open_connections"
OpenReplicationConnectionsGaugeName = "open_replication_connections"
CommittedLSNGaugeName = "committed_lsn"
RestartLSNGaugeName = "restart_lsn"
ConfirmedFlushLSNGaugeName = "confirmed_flush_lsn"
IntervalSinceLastNormalizeGaugeName = "interval_since_last_normalize"
AllFetchedBytesCounterName = "all_fetched_bytes"
FetchedBytesCounterName = "fetched_bytes"
CommitLagGaugeName = "commit_lag"
ErrorEmittedGaugeName = "error_emitted"
ErrorsEmittedCounterName = "errors_emitted"
WarningEmittedGaugeName = "warning_emitted"
WarningsEmittedCounterName = "warnings_emitted"
RecordsSyncedGaugeName = "records_synced"
RecordsSyncedCounterName = "records_synced_counter"
SyncedTablesGaugeName = "synced_tables"
InstanceStatusGaugeName = "instance_status"
MaintenanceStatusGaugeName = "maintenance_status"
FlowStatusGaugeName = "flow_status"
ActiveFlowsGaugeName = "active_flows"
CPULimitsPerActiveFlowGaugeName = "cpu_limits_per_active_flow_vcores"
MemoryLimitsPerActiveFlowGaugeName = "memory_limits_per_active_flow"
TotalCPULimitsGaugeName = "total_cpu_limits_vcores"
TotalMemoryLimitsGaugeName = "total_memory_limits"
WorkloadTotalReplicasGaugeName = "workload_total_replicas"
LogRetentionGaugeName = "log_retention"
LatestConsumedLogEventGaugeName = "latest_consumed_log_event"
SlotLagGaugeName = "cdc_slot_lag"
CurrentBatchIdGaugeName = "current_batch_id"
LastNormalizedBatchIdGaugeName = "last_normalized_batch_id"
OpenConnectionsGaugeName = "open_connections"
OpenReplicationConnectionsGaugeName = "open_replication_connections"
CommittedLSNGaugeName = "committed_lsn"
RestartLSNGaugeName = "restart_lsn"
ConfirmedFlushLSNGaugeName = "confirmed_flush_lsn"
IntervalSinceLastNormalizeGaugeName = "interval_since_last_normalize"
AllFetchedBytesCounterName = "all_fetched_bytes"
FetchedBytesCounterName = "fetched_bytes"
CommitLagGaugeName = "commit_lag"
ErrorEmittedGaugeName = "error_emitted"
ErrorsEmittedCounterName = "errors_emitted"
WarningEmittedGaugeName = "warning_emitted"
WarningsEmittedCounterName = "warnings_emitted"
RecordsSyncedGaugeName = "records_synced"
RecordsSyncedCounterName = "records_synced_counter"
SyncedTablesGaugeName = "synced_tables"
InstanceStatusGaugeName = "instance_status"
MaintenanceStatusGaugeName = "maintenance_status"
FlowStatusGaugeName = "flow_status"
DurationSinceLastFlowUpdateGaugeName = "duration_since_last_flow_update"
ActiveFlowsGaugeName = "active_flows"
CPULimitsPerActiveFlowGaugeName = "cpu_limits_per_active_flow_vcores"
MemoryLimitsPerActiveFlowGaugeName = "memory_limits_per_active_flow"
TotalCPULimitsGaugeName = "total_cpu_limits_vcores"
TotalMemoryLimitsGaugeName = "total_memory_limits"
WorkloadTotalReplicasGaugeName = "workload_total_replicas"
LogRetentionGaugeName = "log_retention"
LatestConsumedLogEventGaugeName = "latest_consumed_log_event"
)

type Metrics struct {
SlotLagGauge metric.Float64Gauge
CurrentBatchIdGauge metric.Int64Gauge
LastNormalizedBatchIdGauge metric.Int64Gauge
OpenConnectionsGauge metric.Int64Gauge
OpenReplicationConnectionsGauge metric.Int64Gauge
CommittedLSNGauge metric.Int64Gauge
RestartLSNGauge metric.Int64Gauge
ConfirmedFlushLSNGauge metric.Int64Gauge
IntervalSinceLastNormalizeGauge metric.Float64Gauge
AllFetchedBytesCounter metric.Int64Counter
FetchedBytesCounter metric.Int64Counter
CommitLagGauge metric.Int64Gauge
ErrorEmittedGauge metric.Int64Gauge
ErrorsEmittedCounter metric.Int64Counter
WarningsEmittedGauge metric.Int64Gauge
WarningEmittedCounter metric.Int64Counter
RecordsSyncedGauge metric.Int64Gauge
RecordsSyncedCounter metric.Int64Counter
SyncedTablesGauge metric.Int64Gauge
InstanceStatusGauge metric.Int64Gauge
MaintenanceStatusGauge metric.Int64Gauge
FlowStatusGauge metric.Int64Gauge
ActiveFlowsGauge metric.Int64Gauge
CPULimitsPerActiveFlowGauge metric.Float64Gauge
MemoryLimitsPerActiveFlowGauge metric.Float64Gauge
TotalCPULimitsGauge metric.Float64Gauge
TotalMemoryLimitsGauge metric.Float64Gauge
WorkloadTotalReplicasGauge metric.Int64Gauge
LatestConsumedLogEventGauge metric.Int64Gauge
LogRetentionGauge metric.Float64Gauge
SlotLagGauge metric.Float64Gauge
CurrentBatchIdGauge metric.Int64Gauge
LastNormalizedBatchIdGauge metric.Int64Gauge
OpenConnectionsGauge metric.Int64Gauge
OpenReplicationConnectionsGauge metric.Int64Gauge
CommittedLSNGauge metric.Int64Gauge
RestartLSNGauge metric.Int64Gauge
ConfirmedFlushLSNGauge metric.Int64Gauge
IntervalSinceLastNormalizeGauge metric.Float64Gauge
AllFetchedBytesCounter metric.Int64Counter
FetchedBytesCounter metric.Int64Counter
CommitLagGauge metric.Int64Gauge
ErrorEmittedGauge metric.Int64Gauge
ErrorsEmittedCounter metric.Int64Counter
WarningsEmittedGauge metric.Int64Gauge
WarningEmittedCounter metric.Int64Counter
RecordsSyncedGauge metric.Int64Gauge
RecordsSyncedCounter metric.Int64Counter
SyncedTablesGauge metric.Int64Gauge
InstanceStatusGauge metric.Int64Gauge
MaintenanceStatusGauge metric.Int64Gauge
FlowStatusGauge metric.Int64Gauge
DurationSinceLastFlowUpdateGauge metric.Int64Gauge
ActiveFlowsGauge metric.Int64Gauge
CPULimitsPerActiveFlowGauge metric.Float64Gauge
MemoryLimitsPerActiveFlowGauge metric.Float64Gauge
TotalCPULimitsGauge metric.Float64Gauge
TotalMemoryLimitsGauge metric.Float64Gauge
WorkloadTotalReplicasGauge metric.Int64Gauge
LatestConsumedLogEventGauge metric.Int64Gauge
LogRetentionGauge metric.Float64Gauge
}

type SlotMetricGauges struct {
Expand Down Expand Up @@ -332,6 +334,13 @@ func (om *OtelManager) setupMetrics() error {
return err
}

if om.Metrics.DurationSinceLastFlowUpdateGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(DurationSinceLastFlowUpdateGaugeName),
metric.WithUnit("s"),
metric.WithDescription("Duration since last flow update in seconds"),
); err != nil {
return err
}

if om.Metrics.ActiveFlowsGauge, err = om.GetOrInitInt64Gauge(BuildMetricName(ActiveFlowsGaugeName),
metric.WithDescription("Number of active flows"),
); err != nil {
Expand Down
Loading