49 changes: 49 additions & 0 deletions PROMETHEUS.md
@@ -74,6 +74,17 @@ The endpoint is publicly accessible (no authentication required) and returns met
| `dblab_datasets_total` | Gauge | `pool` | Total number of datasets (slots) in the pool |
| `dblab_datasets_available` | Gauge | `pool` | Number of available (non-busy) dataset slots for reuse |

### Sync Instance Metrics (Physical Mode)

These metrics are only available when DBLab is running in physical mode with a sync instance enabled. They track the WAL replay status of the sync instance.

| Metric Name | Type | Labels | Description |
|-------------|------|--------|-------------|
| `dblab_sync_status` | Gauge | `status` | Status of the sync instance; the series labeled with the current status code is set to 1 |
| `dblab_sync_wal_lag_seconds` | Gauge | - | WAL replay lag in seconds for the sync instance |
| `dblab_sync_uptime_seconds` | Gauge | - | Uptime of the sync instance in seconds |
| `dblab_sync_last_replayed_timestamp` | Gauge | - | Unix timestamp of the last replayed transaction |
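
For reference, the sync metrics look roughly like this on the `/metrics` endpoint (values are illustrative; exactly one `dblab_sync_status` series is set to 1, labeled with the current status code):

```
dblab_sync_status{status="OK"} 1
dblab_sync_wal_lag_seconds 42
dblab_sync_uptime_seconds 86400
dblab_sync_last_replayed_timestamp 1736937000
```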

### Observability Metrics

These metrics help monitor the health of the metrics collection system itself.
@@ -146,6 +157,18 @@ dblab_clones_by_status
time() - dblab_scrape_success_timestamp
```

### WAL Replay Lag (Physical Mode)

```promql
dblab_sync_wal_lag_seconds
```

### Time Since Last Replayed Transaction

```promql
time() - dblab_sync_last_replayed_timestamp
```

## Alerting Examples

### Low Disk Space Alert
@@ -200,6 +223,32 @@ time() - dblab_scrape_success_timestamp
description: "DBLab metrics have not been updated for more than 5 minutes"
```

### High WAL Replay Lag Alert (Physical Mode)

```yaml
- alert: DBLabHighWALLag
expr: dblab_sync_wal_lag_seconds > 3600
for: 10m
labels:
severity: warning
annotations:
summary: "DBLab sync instance has high WAL lag"
description: "DBLab sync instance WAL replay is {{ $value | humanizeDuration }} behind"
```

### Sync Instance Down Alert (Physical Mode)

```yaml
- alert: DBLabSyncDown
expr: dblab_sync_status{status="down"} == 1 or dblab_sync_status{status="error"} == 1
for: 5m
labels:
severity: critical
annotations:
summary: "DBLab sync instance is down"
description: "DBLab sync instance is not healthy"
```

## OpenTelemetry Integration

DBLab metrics can be exported to OpenTelemetry-compatible backends using the OpenTelemetry Collector. This allows you to send metrics to Grafana Cloud, Datadog, New Relic, and other observability platforms.
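
For example, a minimal Collector configuration could look like the sketch below; the scrape target and OTLP endpoint are placeholders that must be adapted to your deployment:

```yaml
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: dblab
          scrape_interval: 30s
          static_configs:
            - targets: ["dblab-server:2345"] # placeholder: host and port serving the DBLab metrics endpoint

exporters:
  otlphttp:
    endpoint: "https://otlp.example.com" # placeholder: OTLP endpoint of your observability backend

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      exporters: [otlphttp]
```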
59 changes: 59 additions & 0 deletions engine/internal/srv/metrics/collector.go
@@ -27,6 +27,9 @@ const (
dockerStatsTimeout = 5 * time.Second
dockerStatsWorkers = 10
cpuNoData = -1.0

// postgresTimestampFormat is the default text representation of PostgreSQL's timestamp with time zone.
postgresTimestampFormat = "2006-01-02 15:04:05.999999-07"
)

// CloningService defines the interface for clone and snapshot operations needed by metrics.
@@ -39,6 +42,7 @@ type CloningService interface {
type RetrievalService interface {
GetRetrievalMode() models.RetrievalMode
GetRetrievalStatus() models.RetrievalStatus
ReportSyncStatus(ctx context.Context) (*models.Sync, error)
}

// PoolService defines the interface for pool operations needed by metrics.
@@ -152,6 +156,7 @@ func (c *Collector) collectAll(ctx context.Context) {
c.collectCloneMetrics(ctx)
c.collectSnapshotMetrics()
c.collectBranchMetrics()
c.collectSyncMetrics(ctx)

c.metrics.ScrapeDurationSeconds.Set(time.Since(start).Seconds())
c.metrics.ScrapeSuccessTimestamp.Set(float64(time.Now().Unix()))
@@ -564,3 +569,57 @@ func (c *Collector) collectBranchMetrics() {

c.metrics.BranchesTotal.Set(float64(totalBranches))
}

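// collectSyncMetrics collects WAL replay metrics for the sync instance.
// It is a no-op outside physical mode; when the sync state cannot be fetched,
// the status is reported as not available.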
func (c *Collector) collectSyncMetrics(ctx context.Context) {
if c.retrieval.GetRetrievalMode() != models.Physical {
return
}

syncState, err := c.retrieval.ReportSyncStatus(ctx)
if err != nil {
log.Dbg("failed to get sync status for metrics:", err)
c.metrics.ScrapeErrorsTotal.Inc()
c.setSyncStatusNotAvailable()

return
}

if syncState == nil {
c.setSyncStatusNotAvailable()

return
}

c.metrics.SyncStatus.Reset()
c.metrics.SyncStatus.WithLabelValues(string(syncState.Status.Code)).Set(1)
c.metrics.SyncWALLagSeconds.Set(float64(syncState.ReplicationLag))
c.metrics.SyncUptimeSeconds.Set(float64(syncState.ReplicationUptime))

if ts := parseTimestamp(syncState.LastReplayedLsnAt); ts != nil {
c.metrics.SyncLastReplayedAt.Set(float64(ts.Unix()))
}
}

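// setSyncStatusNotAvailable reports the sync status as not available and resets the uptime gauge.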
func (c *Collector) setSyncStatusNotAvailable() {
c.metrics.SyncStatus.Reset()
c.metrics.SyncStatus.WithLabelValues(string(models.SyncStatusNotAvailable)).Set(1)
c.metrics.SyncUptimeSeconds.Set(0)
}

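// parseTimestamp parses a timestamp in RFC 3339 or PostgreSQL text format.
// It returns nil if the value is empty or cannot be parsed.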
func parseTimestamp(value string) *time.Time {
if value == "" {
return nil
}

formats := []string{time.RFC3339Nano, postgresTimestampFormat}

for _, format := range formats {
if ts, err := time.Parse(format, value); err == nil {
return &ts
}
}

log.Dbg("failed to parse timestamp:", value)

return nil
}
177 changes: 177 additions & 0 deletions engine/internal/srv/metrics/collector_test.go
@@ -16,6 +16,7 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -36,6 +37,10 @@ func (m *mockRetrievalService) GetRetrievalMode() models.RetrievalMode { return

func (m *mockRetrievalService) GetRetrievalStatus() models.RetrievalStatus { return models.Inactive }

func (m *mockRetrievalService) ReportSyncStatus(_ context.Context) (*models.Sync, error) {
return nil, nil
}

type mockPoolService struct{}

func (m *mockPoolService) GetFSManagerList() []pool.FSManager { return nil }
@@ -446,3 +451,175 @@ func TestCalculateCPUPercent_Concurrent(t *testing.T) {

assert.Len(t, c.prevCPUStats, cloneCount)
}

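// configMockRetrievalService is a configurable RetrievalService mock used by the sync metrics tests.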
type configMockRetrievalService struct {
mode models.RetrievalMode
syncStatus *models.Sync
syncErr error
}

func (m *configMockRetrievalService) GetRetrievalMode() models.RetrievalMode { return m.mode }

func (m *configMockRetrievalService) GetRetrievalStatus() models.RetrievalStatus {
return models.Inactive
}

func (m *configMockRetrievalService) ReportSyncStatus(_ context.Context) (*models.Sync, error) {
return m.syncStatus, m.syncErr
}

func TestCollectSyncMetrics(t *testing.T) {
t.Run("non-physical mode skips collection", func(t *testing.T) {
m := NewMetrics()
retrieval := &configMockRetrievalService{mode: models.Logical}
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}

c.collectSyncMetrics(context.Background())

assert.Equal(t, float64(0), getGaugeValue(m.SyncWALLagSeconds))
assert.Equal(t, float64(0), getGaugeValue(m.SyncUptimeSeconds))
})

t.Run("error from ReportSyncStatus sets not available status", func(t *testing.T) {
m := NewMetrics()
retrieval := &configMockRetrievalService{mode: models.Physical, syncErr: fmt.Errorf("connection failed")}
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}

initialErrors := getCounterValue(m.ScrapeErrorsTotal)
c.collectSyncMetrics(context.Background())

assert.Equal(t, initialErrors+1, getCounterValue(m.ScrapeErrorsTotal))
assert.Equal(t, float64(0), getGaugeValue(m.SyncUptimeSeconds))
assert.Equal(t, float64(1), getGaugeVecValue(m.SyncStatus, string(models.SyncStatusNotAvailable)))
})

t.Run("nil sync status sets not available status and resets uptime", func(t *testing.T) {
m := NewMetrics()
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: nil}
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}

c.collectSyncMetrics(context.Background())

assert.Equal(t, float64(0), getGaugeValue(m.SyncWALLagSeconds))
assert.Equal(t, float64(0), getGaugeValue(m.SyncUptimeSeconds))
assert.Equal(t, float64(1), getGaugeVecValue(m.SyncStatus, string(models.SyncStatusNotAvailable)))
})

t.Run("successful collection sets all metrics", func(t *testing.T) {
m := NewMetrics()
syncStatus := &models.Sync{
Status: models.Status{Code: models.StatusOK},
ReplicationLag: 120,
ReplicationUptime: 3600,
LastReplayedLsnAt: "2025-01-15T10:30:00.123456789Z",
}
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}

c.collectSyncMetrics(context.Background())

assert.Equal(t, float64(120), getGaugeValue(m.SyncWALLagSeconds))
assert.Equal(t, float64(3600), getGaugeValue(m.SyncUptimeSeconds))
assert.Equal(t, float64(1), getGaugeVecValue(m.SyncStatus, "OK"))
assert.Greater(t, getGaugeValue(m.SyncLastReplayedAt), float64(0))
})

t.Run("postgres timestamp format is parsed correctly", func(t *testing.T) {
m := NewMetrics()
syncStatus := &models.Sync{
Status: models.Status{Code: models.StatusOK},
ReplicationLag: 60,
ReplicationUptime: 1800,
LastReplayedLsnAt: "2025-01-15 10:30:00.123456+00",
}
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}

c.collectSyncMetrics(context.Background())

assert.Equal(t, float64(60), getGaugeValue(m.SyncWALLagSeconds))
assert.Equal(t, float64(1800), getGaugeValue(m.SyncUptimeSeconds))
assert.Greater(t, getGaugeValue(m.SyncLastReplayedAt), float64(0))
})

t.Run("empty timestamp leaves metric unchanged", func(t *testing.T) {
m := NewMetrics()
syncStatus := &models.Sync{
Status: models.Status{Code: models.StatusOK},
ReplicationLag: 30,
ReplicationUptime: 900,
LastReplayedLsnAt: "",
}
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}

c.collectSyncMetrics(context.Background())

assert.Equal(t, float64(30), getGaugeValue(m.SyncWALLagSeconds))
assert.Equal(t, float64(900), getGaugeValue(m.SyncUptimeSeconds))
assert.Equal(t, float64(0), getGaugeValue(m.SyncLastReplayedAt))
})

t.Run("unparseable timestamp leaves metric unchanged", func(t *testing.T) {
m := NewMetrics()
syncStatus := &models.Sync{
Status: models.Status{Code: models.StatusOK},
ReplicationLag: 45,
ReplicationUptime: 1200,
LastReplayedLsnAt: "invalid-timestamp",
}
retrieval := &configMockRetrievalService{mode: models.Physical, syncStatus: syncStatus}
c := &Collector{metrics: m, retrieval: retrieval, prevCPUStats: make(map[string]containerCPUState)}

c.collectSyncMetrics(context.Background())

assert.Equal(t, float64(45), getGaugeValue(m.SyncWALLagSeconds))
assert.Equal(t, float64(1200), getGaugeValue(m.SyncUptimeSeconds))
assert.Equal(t, float64(0), getGaugeValue(m.SyncLastReplayedAt))
})
}

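// getGaugeValue returns the current value of a gauge by collecting it into a dto.Metric.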
func getGaugeValue(g prometheus.Gauge) float64 {
ch := make(chan prometheus.Metric, 1)
g.Collect(ch)
select {
case m := <-ch:
var metric dto.Metric
_ = m.Write(&metric)
if metric.Gauge != nil {
return *metric.Gauge.Value
}
default:
}
return 0
}

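// getGaugeVecValue returns the value of the gauge in a GaugeVec whose labels include the given value.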
func getGaugeVecValue(g *prometheus.GaugeVec, label string) float64 {
ch := make(chan prometheus.Metric, 10)
g.Collect(ch)
close(ch)
for m := range ch {
var metric dto.Metric
_ = m.Write(&metric)
for _, lp := range metric.Label {
if lp.GetValue() == label && metric.Gauge != nil {
return *metric.Gauge.Value
}
}
}
return 0
}

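// getCounterValue returns the current value of a counter by collecting it into a dto.Metric.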
func getCounterValue(c prometheus.Counter) float64 {
ch := make(chan prometheus.Metric, 1)
c.Collect(ch)
select {
case m := <-ch:
var metric dto.Metric
_ = m.Write(&metric)
if metric.Counter != nil {
return *metric.Counter.Value
}
default:
}
return 0
}