Skip to content

Commit 608677a

Browse files
committed
WIP metrics registry
This commit specifically doesn't fix most tests as this takes a lot of time and also doesn't do a lot of other stuff: 1. breaking metrics in other ways as that might be useful to be kept for a little bit at least 2. also doesn't try to do anything that isn't absolutely necessary to make the registry work 3. doesnt move the metrics stuff around, although I would expect the Registry and the Metric type will end up in a `metrics` package later on ...
1 parent 2039c56 commit 608677a

21 files changed

+442
-192
lines changed

cmd/run.go

+65-6
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ import (
4949
"go.k6.io/k6/js"
5050
"go.k6.io/k6/lib"
5151
"go.k6.io/k6/lib/consts"
52+
"go.k6.io/k6/lib/metrics"
5253
"go.k6.io/k6/loader"
54+
"go.k6.io/k6/output"
55+
"go.k6.io/k6/stats"
5356
"go.k6.io/k6/ui/pb"
5457
)
5558

@@ -153,10 +156,6 @@ a commandline interface for interacting with it.`,
153156
// can start winding down its metrics processing.
154157
globalCtx, globalCancel := context.WithCancel(ctx)
155158
defer globalCancel()
156-
lingerCtx, lingerCancel := context.WithCancel(globalCtx)
157-
defer lingerCancel()
158-
runCtx, runCancel := context.WithCancel(lingerCtx)
159-
defer runCancel()
160159

161160
// Create a local execution scheduler wrapping the runner.
162161
logger.Debug("Initializing the execution scheduler...")
@@ -197,6 +196,16 @@ a commandline interface for interacting with it.`,
197196
return err
198197
}
199198

199+
registry := stats.NewRegistry(engine.Samples)
200+
builtInMetrics := metrics.RegisterBuiltinMetrics(registry)
201+
globalCtx = stats.WithRegistry(globalCtx, registry)
202+
globalCtx = metrics.WithBuiltinMetrics(globalCtx, builtInMetrics)
203+
204+
lingerCtx, lingerCancel := context.WithCancel(globalCtx)
205+
defer lingerCancel()
206+
runCtx, runCancel := context.WithCancel(lingerCtx)
207+
defer runCancel()
208+
200209
// Spin up the REST API server, if not disabled.
201210
if address != "" {
202211
initBar.Modify(pb.WithConstProgress(0, "Init API server"))
@@ -216,11 +225,11 @@ a commandline interface for interacting with it.`,
216225

217226
// We do this here so we can get any output URLs below.
218227
initBar.Modify(pb.WithConstProgress(0, "Starting outputs"))
219-
err = engine.StartOutputs()
228+
err = StartOutputs(logger, outputs, engine, builtInMetrics)
220229
if err != nil {
221230
return err
222231
}
223-
defer engine.StopOutputs()
232+
defer StopOutputs(logger, outputs)
224233

225234
printExecutionDescription(
226235
"local", filename, "", conf, execScheduler.GetState().ExecutionTuple,
@@ -442,3 +451,53 @@ func handleSummaryResult(fs afero.Fs, stdOut, stdErr io.Writer, result map[strin
442451

443452
return consolidateErrorMessage(errs, "Could not save some summary information:")
444453
}
454+
455+
// StartOutputs spins up all configured outputs, giving the thresholds to any
456+
// that can accept them. And if some output fails, stop the already started
457+
// ones. This may take some time, since some outputs make initial network
458+
// requests to set up whatever remote services are going to listen to them.
459+
func StartOutputs(
460+
logger logrus.FieldLogger,
461+
outputs []output.Output,
462+
engine *core.Engine,
463+
builtinMetrics *metrics.BuiltInMetrics,
464+
) error {
465+
logger.Debugf("Starting %d outputs...", len(outputs))
466+
for i, out := range outputs {
467+
if thresholdOut, ok := out.(output.WithThresholds); ok {
468+
thresholdOut.SetThresholds(engine.Options.Thresholds)
469+
}
470+
471+
if stopOut, ok := out.(output.WithTestRunStop); ok {
472+
stopOut.SetTestRunStopCallback(
473+
func(err error) {
474+
logger.WithError(err).Error("Received error to stop from output")
475+
engine.Stop()
476+
})
477+
}
478+
479+
if builtinMetricOut, ok := out.(output.WithBuiltinMetrics); ok {
480+
builtinMetricOut.SetBuiltinMetrics(builtinMetrics)
481+
}
482+
483+
if err := out.Start(); err != nil {
484+
stopOutputs(logger, outputs, i)
485+
return err
486+
}
487+
}
488+
return nil
489+
}
490+
491+
// StopOutputs stops all configured outputs.
492+
func StopOutputs(logger logrus.FieldLogger, outputs []output.Output) {
493+
stopOutputs(logger, outputs, len(outputs))
494+
}
495+
496+
func stopOutputs(logger logrus.FieldLogger, outputs []output.Output, upToID int) {
497+
logger.Debugf("Stopping %d outputs...", upToID)
498+
for i := 0; i < upToID; i++ {
499+
if err := outputs[i].Stop(); err != nil {
500+
logger.WithError(err).Errorf("Stopping output %d failed", i)
501+
}
502+
}
503+
}

core/engine.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ type Engine struct {
6767
Metrics map[string]*stats.Metric
6868
MetricsLock sync.Mutex
6969

70-
Samples chan stats.SampleContainer
70+
builtInMetrics *metrics.BuiltInMetrics
71+
Samples chan stats.SampleContainer
7172

7273
// Assigned to metrics upon first received sample.
7374
thresholds map[string]stats.Thresholds
@@ -147,6 +148,10 @@ func (e *Engine) StartOutputs() error {
147148
})
148149
}
149150

151+
if builtinMetricOut, ok := out.(output.WithBuiltinMetrics); ok {
152+
builtinMetricOut.SetBuiltinMetrics(e.builtInMetrics)
153+
}
154+
150155
if err := out.Start(); err != nil {
151156
e.stopOutputs(i)
152157
return err
@@ -184,6 +189,7 @@ func (e *Engine) stopOutputs(upToID int) {
184189
// returned by cancelling the globalCtx
185190
// - The second returned lambda can be used to wait for that process to finish.
186191
func (e *Engine) Init(globalCtx, runCtx context.Context) (run func() error, wait func(), err error) {
192+
e.builtInMetrics = metrics.GetBuiltInMetrics(globalCtx)
187193
e.logger.Debug("Initialization starting...")
188194
// TODO: if we ever need metrics processing in the init context, we can move
189195
// this below the other components... or even start them concurrently?
@@ -416,12 +422,12 @@ func (e *Engine) emitMetrics() {
416422
Samples: []stats.Sample{
417423
{
418424
Time: t,
419-
Metric: metrics.VUs,
425+
Metric: e.builtInMetrics.VUs,
420426
Value: float64(executionState.GetCurrentlyActiveVUsCount()),
421427
Tags: e.Options.RunTags,
422428
}, {
423429
Time: t,
424-
Metric: metrics.VUsMax,
430+
Metric: e.builtInMetrics.VUsMax,
425431
Value: float64(executionState.GetInitializedVUsCount()),
426432
Tags: e.Options.RunTags,
427433
},

js/modules/k6/grpc/client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ func (c *Client) HandleRPC(ctx context.Context, stat grpcstats.RPCStats) {
493493
stats.PushIfNotDone(ctx, state.Samples, stats.ConnectedSamples{
494494
Samples: []stats.Sample{
495495
{
496-
Metric: metrics.GRPCReqDuration,
496+
Metric: metrics.GetBuiltInMetrics(ctx).GRPCReqDuration,
497497
Tags: sampleTags,
498498
Value: stats.D(s.EndTime.Sub(s.BeginTime)),
499499
Time: s.EndTime,

js/modules/k6/k6.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,10 @@ func (*K6) Group(ctx context.Context, name string, fn goja.Callable) (goja.Value
101101
t := time.Now()
102102

103103
tags := state.CloneTags()
104+
104105
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{
105106
Time: t,
106-
Metric: metrics.GroupDuration,
107+
Metric: metrics.GetBuiltInMetrics(ctx).GroupDuration,
107108
Tags: stats.IntoSampleTags(&tags),
108109
Value: stats.D(t.Sub(startTime)),
109110
})
@@ -165,12 +166,13 @@ func (*K6) Check(ctx context.Context, arg0, checks goja.Value, extras ...goja.Va
165166
select {
166167
case <-ctx.Done():
167168
default:
169+
builtin := metrics.GetBuiltInMetrics(ctx)
168170
if val.ToBoolean() {
169171
atomic.AddInt64(&check.Passes, 1)
170-
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: metrics.Checks, Tags: sampleTags, Value: 1})
172+
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: builtin.Checks, Tags: sampleTags, Value: 1})
171173
} else {
172174
atomic.AddInt64(&check.Fails, 1)
173-
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: metrics.Checks, Tags: sampleTags, Value: 0})
175+
stats.PushIfNotDone(ctx, state.Samples, stats.Sample{Time: t, Metric: builtin.Checks, Tags: sampleTags, Value: 0})
174176
// A single failure makes the return value false.
175177
succ = false
176178
}

0 commit comments

Comments
 (0)