Skip to content

Commit dcf9f6d

Browse files
authored
refactor(metrics): route all controllers through core/metrics helpers (#162)
## Summary

Route every controller and the consumer framework through the `core/metrics` helpers so metric emission is consistent across the codebase.

## What changed

**Migrated to `metrics.Begin` / `op.Complete` + `metrics.NamedCounter`:**

- RPC controllers: `gateway/controller/land.go`, `orchestrator/controller/ping.go`, `stovepipe/controller/ping.go`
- Queue controllers: `orchestrator/controller/{batch,build,start,score,speculate,buildsignal,log}/*.go`
- Extension: `extension/mergechecker/github/checker.go`

Each migrated method now wraps its work with `op := metrics.Begin(scope, opName); defer func() { op.Complete(retErr) }()`, which emits `{scope}.{op}.called`, `{op}.succeeded` / `{op}.failed`, `{op}.latency`, and `{op}.latency_histogram` with automatic `error_origin`, `retryable`, and `dependency` classification tags via `core/errs`. Sub-event counters (`deserialize_errors`, `storage_errors`, `publish_errors`, …) go through `metrics.NamedCounter` so they share the same sub-scope prefix.

**Consumer framework — `core/consumer/consumer.go`:**

Migrated to `metrics.NamedCounter` / `metrics.NamedTimer` rather than `Begin`/`Complete` because the framework needs custom `success=true|false|cancel` and `operation=ack|nack` tags that the standard `Op` lifecycle can't express.

**Op-name de-duplication:**

Each migrated method declares `const opName = "..."` once and references it from every emit (file-level in `speculate.go` where emits span six helper methods).
## Wire-format change

Metric names move from flat keys to `{controller_subscope}.{op}.{event}`:

| Before | After |
| --- | --- |
| `land_request_count` | `land_controller.land.called` |
| `land_request_latency` | `land_controller.land.latency` |
| `received` (per controller) | `{controller}_controller.process.called` |
| `processed` | `{controller}_controller.process.succeeded` |
| `deserialize_errors` | `{controller}_controller.process.deserialize_errors` |
| `messages_received` (consumer) | `consumer.process.messages_received` |
| `controller_latency` (consumer) | `consumer.process.controller_latency` |
| `ack_count`, `nack_count`, … | `consumer.process.ack_count`, `consumer.process.nack_count`, … |

Dashboards keyed on the old flat names will need updating; the `success=true|false|cancel`, `operation=ack|nack`, and per-controller tags on consumer metrics are preserved.

## Test Plan

- `make build` — all 135 targets build clean.
- `make test` — all 34 test targets pass, including:
  - `//core/consumer:consumer_test` — verifies the migrated `processDelivery` still emits `controller_latency` (with `success=true|false`) and `ack_count` (substring matches survive the `process.` prefix).
  - `//extension/mergechecker/github:github_test`
  - `//gateway/controller:controller_test`, `//orchestrator/controller:controller_test`, `//stovepipe/controller:controller_test`
  - Per-stage controllers: `batch`, `build`, `buildsignal`, `log`, `score`, `speculate`, `start`
- `make gazelle`, `make fmt`, `make mocks`, `make tidy` — re-running on top of the diff produces zero further changes.
- `make lint-license` — clean.
- No test assertions on the renamed metrics needed updating (`consumer_test.go` uses `strings.Contains`, which still matches the new fully-qualified names).

## Issues

None.
1 parent 5f690a7 commit dcf9f6d

24 files changed

Lines changed: 155 additions & 131 deletions

File tree

core/consumer/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
visibility = ["//visibility:public"],
1212
deps = [
1313
"//core/errs",
14+
"//core/metrics",
1415
"//entity/queue",
1516
"//extension/queue",
1617
"@com_github_uber_go_tally_v4//:tally",

core/consumer/consumer.go

Lines changed: 25 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/uber-go/tally/v4"
2525
"github.com/uber/submitqueue/core/errs"
26+
"github.com/uber/submitqueue/core/metrics"
2627
"github.com/uber/submitqueue/extension/queue"
2728
"go.uber.org/zap"
2829
)
@@ -329,8 +330,10 @@ func (m *consumer) processPartition(ctx context.Context, controller Controller,
329330

330331
// processDelivery calls the controller and performs ack/nack based on the result.
331332
func (m *consumer) processDelivery(ctx context.Context, controller Controller, delivery queue.Delivery, controllerScope tally.Scope) {
333+
const opName = "process"
334+
332335
start := time.Now()
333-
controllerScope.Counter("messages_received").Inc(1)
336+
metrics.NamedCounter(controllerScope, opName, "messages_received", 1)
334337

335338
msg := delivery.Message()
336339
topicKey := controller.TopicKey()
@@ -364,10 +367,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
364367
}
365368
}
366369

367-
latencyScope := controllerScope.Tagged(map[string]string{
368-
"success": successTag,
369-
})
370-
latencyScope.Timer("controller_latency").Record(elapsed)
370+
metrics.NamedTimer(controllerScope, opName, "controller_latency", elapsed, metrics.NewTag("success", successTag))
371371

372372
if err != nil {
373373
// Check if the error is non-retryable (poison pill message)
@@ -382,7 +382,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
382382
"elapsed_ms", elapsed.Milliseconds(),
383383
)
384384

385-
controllerScope.Counter("non_retryable_errors").Inc(1)
385+
metrics.NamedCounter(controllerScope, opName, "non_retryable_errors", 1)
386386

387387
// Reject moves to DLQ (or acks if DLQ disabled)
388388
if rejectErr := delivery.Reject(ctx, err.Error()); rejectErr != nil {
@@ -392,7 +392,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
392392
"message_id", msg.ID,
393393
"error", rejectErr,
394394
)
395-
controllerScope.Counter("reject_errors").Inc(1)
395+
metrics.NamedCounter(controllerScope, opName, "reject_errors", 1)
396396
}
397397
return
398398
}
@@ -414,7 +414,7 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
414414
"elapsed_ms", elapsed.Milliseconds(),
415415
)
416416

417-
controllerScope.Counter("controller_errors").Inc(1)
417+
metrics.NamedCounter(controllerScope, opName, "controller_errors", 1)
418418

419419
// Nack with no delay - let visibility timeout handle retry delay
420420
nackStart := time.Now()
@@ -425,14 +425,13 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
425425
"message_id", msg.ID,
426426
"error", nackErr,
427427
)
428-
controllerScope.Counter("nack_errors").Inc(1)
428+
metrics.NamedCounter(controllerScope, opName, "nack_errors", 1)
429429
} else {
430-
controllerScope.Counter("nack_count").Inc(1)
431-
nackScope := controllerScope.Tagged(map[string]string{
432-
"operation": "nack",
433-
"success": "true",
434-
})
435-
nackScope.Timer("ack_nack_latency").Record(time.Since(nackStart))
430+
metrics.NamedCounter(controllerScope, opName, "nack_count", 1)
431+
metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(nackStart),
432+
metrics.NewTag("operation", "nack"),
433+
metrics.NewTag("success", "true"),
434+
)
436435
}
437436
return
438437
}
@@ -446,23 +445,20 @@ func (m *consumer) processDelivery(ctx context.Context, controller Controller, d
446445
"message_id", msg.ID,
447446
"error", ackErr,
448447
)
449-
controllerScope.Counter("ack_errors").Inc(1)
450-
ackScope := controllerScope.Tagged(map[string]string{
451-
"operation": "ack",
452-
"success": "false",
453-
})
454-
ackScope.Timer("ack_nack_latency").Record(time.Since(ackStart))
448+
metrics.NamedCounter(controllerScope, opName, "ack_errors", 1)
449+
metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(ackStart),
450+
metrics.NewTag("operation", "ack"),
451+
metrics.NewTag("success", "false"),
452+
)
455453
return
456454
}
457455

458-
controllerScope.Counter("messages_processed").Inc(1)
459-
controllerScope.Counter("ack_count").Inc(1)
460-
461-
ackScope := controllerScope.Tagged(map[string]string{
462-
"operation": "ack",
463-
"success": "true",
464-
})
465-
ackScope.Timer("ack_nack_latency").Record(time.Since(ackStart))
456+
metrics.NamedCounter(controllerScope, opName, "messages_processed", 1)
457+
metrics.NamedCounter(controllerScope, opName, "ack_count", 1)
458+
metrics.NamedTimer(controllerScope, opName, "ack_nack_latency", time.Since(ackStart),
459+
metrics.NewTag("operation", "ack"),
460+
metrics.NewTag("success", "true"),
461+
)
466462

467463
m.logger.Debugw("message processed successfully",
468464
"controller", controller.Name(),

extension/mergechecker/github/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ go_library(
1010
importpath = "github.com/uber/submitqueue/extension/mergechecker/github",
1111
visibility = ["//visibility:public"],
1212
deps = [
13+
"//core/metrics",
1314
"//entity",
1415
"//entity/github",
1516
"//extension/mergechecker",

extension/mergechecker/github/checker.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"net/http"
2424

2525
"github.com/uber-go/tally/v4"
26+
"github.com/uber/submitqueue/core/metrics"
2627
"github.com/uber/submitqueue/entity"
2728
entitygithub "github.com/uber/submitqueue/entity/github"
2829
"github.com/uber/submitqueue/extension/mergechecker"
@@ -60,17 +61,19 @@ func NewMergeChecker(params Params) mergechecker.MergeChecker {
6061
}
6162

6263
// Check assesses whether a change can merge cleanly using the GitHub GraphQL API.
63-
func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Change) (mergechecker.Result, error) {
64-
c.metricsScope.Counter("check_started").Inc(1)
64+
func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Change) (result mergechecker.Result, retErr error) {
65+
const opName = "check"
66+
67+
op := metrics.Begin(c.metricsScope, opName)
68+
defer func() { op.Complete(retErr) }()
6569

66-
result := mergechecker.Result{}
6770
// Parse all change IDs
6871
// TODO: classify parse errors as user errors (non-retryable) vs system errors.
6972
changeIDs := make([]entitygithub.ChangeID, 0, len(change.URIs))
7073
for _, rawID := range change.URIs {
7174
cid, err := entitygithub.ParseChangeID(rawID)
7275
if err != nil {
73-
c.metricsScope.Counter("parse_errors").Inc(1)
76+
metrics.NamedCounter(c.metricsScope, opName, "parse_errors", 1)
7477
return result, fmt.Errorf("failed to parse change ID %q: %w", rawID, err)
7578
}
7679
changeIDs = append(changeIDs, cid)
@@ -79,26 +82,26 @@ func (c *mergeChecker) Check(ctx context.Context, queue string, change entity.Ch
7982
// Fetch PR info from GitHub GraphQL API
8083
prInfoMap, err := c.fetchPRInfo(ctx, changeIDs)
8184
if err != nil {
82-
c.metricsScope.Counter("graphql_errors").Inc(1)
85+
metrics.NamedCounter(c.metricsScope, opName, "graphql_errors", 1)
8386
return result, fmt.Errorf("failed to fetch PR info: %w", err)
8487
}
8588

8689
// Validate PR mergeability
8790
mergeable, reason, err := validatePRs(changeIDs, prInfoMap)
8891
if err != nil {
89-
c.metricsScope.Counter("validation_errors").Inc(1)
92+
metrics.NamedCounter(c.metricsScope, opName, "validation_errors", 1)
9093
return result, err
9194
}
9295

9396
if !mergeable {
94-
c.metricsScope.Counter("not_mergeable").Inc(1)
97+
metrics.NamedCounter(c.metricsScope, opName, "not_mergeable", 1)
9598
c.logger.Infow("change not mergeable",
9699
"queue", queue,
97100
"reason", reason,
98101
"change_uris", change.URIs,
99102
)
100103
} else {
101-
c.metricsScope.Counter("mergeable").Inc(1)
104+
metrics.NamedCounter(c.metricsScope, opName, "mergeable", 1)
102105
}
103106

104107
result.Mergeable = mergeable

gateway/controller/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
deps = [
1212
"//core/consumer",
1313
"//core/errs",
14+
"//core/metrics",
1415
"//entity",
1516
"//entity/queue",
1617
"//extension/counter",

gateway/controller/land.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ import (
1818
"context"
1919
"errors"
2020
"fmt"
21-
"time"
2221

2322
"github.com/uber-go/tally/v4"
2423
"github.com/uber/submitqueue/core/consumer"
2524
"github.com/uber/submitqueue/core/errs"
25+
"github.com/uber/submitqueue/core/metrics"
2626
"github.com/uber/submitqueue/entity"
2727
"github.com/uber/submitqueue/entity/queue"
2828
"github.com/uber/submitqueue/extension/counter"
@@ -75,7 +75,7 @@ type LandController struct {
7575
func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, requestLogStore storage.RequestLogStore, queueConfigs queueconfig.Store, registry consumer.TopicRegistry) *LandController {
7676
return &LandController{
7777
logger: logger,
78-
metricsScope: scope,
78+
metricsScope: scope.SubScope("land_controller"),
7979
counter: counter,
8080
requestLogStore: requestLogStore,
8181
queueConfigs: queueConfigs,
@@ -84,13 +84,11 @@ func NewLandController(logger *zap.SugaredLogger, scope tally.Scope, counter cou
8484
}
8585

8686
// Land handles the land request and returns a response
87-
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.LandResponse, error) {
88-
start := time.Now()
89-
defer func() {
90-
c.metricsScope.Timer("land_request_latency").Record(time.Since(start))
91-
}()
87+
func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (resp *pb.LandResponse, retErr error) {
88+
const opName = "land"
9289

93-
c.metricsScope.Counter("land_request_count").Inc(1)
90+
op := metrics.Begin(c.metricsScope, opName)
91+
defer func() { op.Complete(retErr) }()
9492

9593
// Validate required fields.
9694
if req.Queue == "" {
@@ -157,7 +155,7 @@ func (c *LandController) Land(ctx context.Context, req *pb.LandRequest) (*pb.Lan
157155
"sqid", landRequest.ID,
158156
"topic_key", consumer.TopicKeyStart,
159157
)
160-
c.metricsScope.Counter("publish_success").Inc(1)
158+
metrics.NamedCounter(c.metricsScope, opName, "publish_success", 1)
161159

162160
return &pb.LandResponse{
163161
Sqid: landRequest.ID,

orchestrator/controller/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
importpath = "github.com/uber/submitqueue/orchestrator/controller",
77
visibility = ["//visibility:public"],
88
deps = [
9+
"//core/metrics",
910
"//orchestrator/protopb",
1011
"@com_github_uber_go_tally_v4//:tally",
1112
"@org_uber_go_zap//:zap",

orchestrator/controller/batch/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//core/consumer",
10+
"//core/metrics",
1011
"//entity",
1112
"//entity/queue",
1213
"//extension/conflict",

orchestrator/controller/batch/batch.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/uber-go/tally/v4"
2222
"github.com/uber/submitqueue/core/consumer"
23+
"github.com/uber/submitqueue/core/metrics"
2324
"github.com/uber/submitqueue/entity"
2425
entityqueue "github.com/uber/submitqueue/entity/queue"
2526
"github.com/uber/submitqueue/extension/conflict"
@@ -71,22 +72,25 @@ func NewController(
7172
// Process processes a batch delivery from the queue.
7273
// Deserializes the request, groups into batch, and publishes to the score topic.
7374
// Returns nil to ack (success), or error to nack (retry).
74-
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
75-
c.metricsScope.Counter("received").Inc(1)
75+
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
76+
const opName = "process"
77+
78+
op := metrics.Begin(c.metricsScope, opName)
79+
defer func() { op.Complete(retErr) }()
7680

7781
msg := delivery.Message()
7882

7983
// Deserialize request ID from payload
8084
rid, err := entity.RequestIDFromBytes(msg.Payload)
8185
if err != nil {
82-
c.metricsScope.Counter("deserialize_errors").Inc(1)
86+
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
8387
return fmt.Errorf("failed to deserialize request ID: %w", err)
8488
}
8589

8690
// Fetch request from storage
8791
request, err := c.store.GetRequestStore().Get(ctx, rid.ID)
8892
if err != nil {
89-
c.metricsScope.Counter("storage_errors").Inc(1)
93+
metrics.NamedCounter(c.metricsScope, opName, "storage_errors", 1)
9094
return fmt.Errorf("failed to get request %s: %w", rid.ID, err)
9195
}
9296

@@ -104,7 +108,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
104108
// Generate a globally unique batch ID.
105109
seq, err := c.counter.Next(ctx, "batch/"+request.Queue)
106110
if err != nil {
107-
c.metricsScope.Counter("counter_errors").Inc(1)
111+
metrics.NamedCounter(c.metricsScope, opName, "counter_errors", 1)
108112
return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)
109113
}
110114

@@ -125,7 +129,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
125129
entity.BatchStateMerging,
126130
})
127131
if err != nil {
128-
c.metricsScope.Counter("batch_store_errors").Inc(1)
132+
metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1)
129133
return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)
130134
}
131135

@@ -134,7 +138,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
134138
// apply; the dependency graph only tracks the relation.
135139
conflicts, err := c.analyzer.Analyze(ctx, batch, activeBatches)
136140
if err != nil {
137-
c.metricsScope.Counter("conflict_analyzer_errors").Inc(1)
141+
metrics.NamedCounter(c.metricsScope, opName, "conflict_analyzer_errors", 1)
138142
return fmt.Errorf("failed to analyze conflicts for batchID=%s: %w", batch.ID, err)
139143
}
140144

@@ -155,15 +159,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
155159
for _, depID := range conflictingIDs {
156160
existing, err := c.store.GetBatchDependentStore().Get(ctx, depID)
157161
if err != nil {
158-
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
162+
metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1)
159163
return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", depID, err)
160164
}
161165

162166
dependents := append(existing.Dependents, batch.ID)
163167

164168
newVersion := existing.Version + 1
165169
if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, depID, existing.Version, newVersion, dependents); err != nil {
166-
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
170+
metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1)
167171
return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", depID, batch.ID, err)
168172
}
169173
}
@@ -176,15 +180,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
176180
}
177181

178182
if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil {
179-
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
183+
metrics.NamedCounter(c.metricsScope, opName, "batch_dependent_store_errors", 1)
180184
return fmt.Errorf("failed to create batch dependent index for new batchID=%s: %w", batch.ID, err)
181185
}
182186

183187
// Persist batch to storage.
184188
// This is the final operation that concludes the batch creation process. If it fails, BatchDependents will be pointing to a batch id that does not exist.
185189
// We do not reuse batch ids, a retry of this operation will create a new batch with a new ID. The downstream logic that operates on BatchDependent should be able to handle stale entries.
186190
if err := c.store.GetBatchStore().Create(ctx, batch); err != nil {
187-
c.metricsScope.Counter("batch_store_errors").Inc(1)
191+
metrics.NamedCounter(c.metricsScope, opName, "batch_store_errors", 1)
188192
return fmt.Errorf("failed to create batch in batch store: %w", err)
189193
}
190194

@@ -199,7 +203,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
199203
// If it fails and the controller retries, a new batch will be created with the new batch ID but the same request ID.
200204
// The downstream logic should be able to handle stale entries by looking at the state of the batch.
201205
if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil {
202-
c.metricsScope.Counter("publish_errors").Inc(1)
206+
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
203207
return fmt.Errorf("failed to publish batch ID to score topic: %w", err)
204208
}
205209

@@ -208,8 +212,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
208212
"topic_key", consumer.TopicKeyScore,
209213
)
210214

211-
c.metricsScope.Counter("processed").Inc(1)
212-
213215
return nil // Success - message will be acked
214216
}
215217

orchestrator/controller/build/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ go_library(
77
visibility = ["//visibility:public"],
88
deps = [
99
"//core/consumer",
10+
"//core/metrics",
1011
"//entity",
1112
"//entity/queue",
1213
"//extension/storage",

0 commit comments

Comments
 (0)