Skip to content

Commit 9c5dbc9

Browse files
committed
Add custom TRACE level encoders and migrate verbose Debug logs
- Add custom level encoders (uppercase, lowercase, color) to display "TRACE" instead of "LEVEL(-8)" - Implement test logger with proper TRACE level display for zaptest compatibility - Add SugaredLogger chaining support (With/Named) to maintain custom type - Migrate 25 verbose Debug logs to Trace level across runner, server, service, webhook - Keep important Debug logs for actual debugging (errors, state changes, subprocess management) - Verbose implementation details now at Trace: file operations, response watchers, shutdown flow
1 parent f621f45 commit 9c5dbc9

File tree

8 files changed

+127
-45
lines changed

8 files changed

+127
-45
lines changed

internal/logging/logger.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,26 @@ const (
1414
TraceLevel = zapcore.Level(-8) // Below Debug (-4)
1515
)
1616

17+
// customLowercaseLevelEncoder handles our custom Trace level display (lowercase)
18+
func customLowercaseLevelEncoder(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
19+
switch level {
20+
case TraceLevel:
21+
enc.AppendString("trace")
22+
default:
23+
zapcore.LowercaseLevelEncoder(level, enc)
24+
}
25+
}
26+
27+
// customColorLevelEncoder handles our custom Trace level display (with colors)
28+
func customColorLevelEncoder(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
29+
switch level {
30+
case TraceLevel:
31+
enc.AppendString("\x1b[90mTRACE\x1b[0m") // Gray color for trace
32+
default:
33+
zapcore.CapitalColorLevelEncoder(level, enc)
34+
}
35+
}
36+
1737
// Logger embeds zap.Logger and adds Trace level support
1838
type Logger struct {
1939
*zap.Logger
@@ -34,11 +54,11 @@ func New(name string) *Logger {
3454
if isDevelopment {
3555
cfg = zap.NewDevelopmentConfig()
3656
cfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel)
37-
cfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
57+
cfg.EncoderConfig.EncodeLevel = customColorLevelEncoder
3858
} else {
3959
cfg = zap.NewProductionConfig()
4060
cfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
41-
cfg.EncoderConfig.EncodeLevel = zapcore.LowercaseLevelEncoder
61+
cfg.EncoderConfig.EncodeLevel = customLowercaseLevelEncoder
4262
}
4363

4464
// Set log level from environment (COG_LOG_LEVEL takes precedence, fallback to LOG_LEVEL)
@@ -139,3 +159,13 @@ func (s *SugaredLogger) Trace(args ...any) {
139159
// Tracew logs a message at the custom Trace level with loosely typed
// key-value pairs, mirroring zap's SugaredLogger *w methods (e.g. Debugw).
// It delegates to Logw with TraceLevel.
func (s *SugaredLogger) Tracew(msg string, keysAndValues ...any) {
	s.Logw(TraceLevel, msg, keysAndValues...)
}
162+
163+
// Override With to return our custom SugaredLogger
164+
func (s *SugaredLogger) With(args ...any) *SugaredLogger {
165+
return &SugaredLogger{SugaredLogger: s.SugaredLogger.With(args...)}
166+
}
167+
168+
// Override Named to return our custom SugaredLogger
169+
func (s *SugaredLogger) Named(name string) *SugaredLogger {
170+
return &SugaredLogger{SugaredLogger: s.SugaredLogger.Named(name)}
171+
}

internal/loggingtest/test_helper.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,48 @@ package loggingtest
33
import (
44
"testing"
55

6+
"go.uber.org/zap"
7+
"go.uber.org/zap/zapcore"
68
"go.uber.org/zap/zaptest"
79

810
"github.com/replicate/cog-runtime/internal/logging"
911
)
1012

13+
// customTestLevelEncoder handles our custom Trace level display for tests
14+
func customTestLevelEncoder(level zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
15+
switch level {
16+
case logging.TraceLevel:
17+
enc.AppendString("TRACE")
18+
default:
19+
zapcore.CapitalLevelEncoder(level, enc)
20+
}
21+
}
22+
1123
// NewTestLogger creates a logger for tests that outputs to t.Logf.
// Behaves exactly like zaptest.NewLogger but with trace support added:
// the logger is enabled down to TraceLevel (-8) and its console encoder
// prints "TRACE" instead of zap's default "LEVEL(-8)".
func NewTestLogger(t *testing.T) *logging.Logger {
	t.Helper()

	// Create test logger with custom level encoder.
	zapLogger := zaptest.NewLogger(t,
		// Enable everything down to our custom trace level.
		zaptest.Level(logging.TraceLevel),
		zaptest.WrapOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
			// Replace the encoder to handle our custom trace level.
			// The field keys and time/duration/caller encoders below mirror
			// zap's development console defaults; only EncodeLevel differs.
			enc := zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
				TimeKey:        "T",
				LevelKey:       "L",
				NameKey:        "N",
				CallerKey:      "C",
				MessageKey:     "M",
				StacktraceKey:  "S",
				LineEnding:     zapcore.DefaultLineEnding,
				EncodeLevel:    customTestLevelEncoder,
				EncodeTime:     zapcore.ISO8601TimeEncoder,
				EncodeDuration: zapcore.StringDurationEncoder,
				EncodeCaller:   zapcore.ShortCallerEncoder,
			})
			// NOTE(review): the wrapped core ignores the original `core`
			// argument entirely and writes straight to a fresh testing
			// writer — confirm that dropping any configuration carried by
			// the original core is intended.
			return zapcore.NewCore(enc, zapcore.AddSync(zaptest.NewTestingWriter(t)), logging.TraceLevel)
		})),
	)
	return &logging.Logger{Logger: zapLogger}
}

internal/loggingtest/test_helper_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,26 @@ func TestLoggerChaining(t *testing.T) {
6565
optionsLogger.Trace("options trace")
6666
}
6767

68+
func TestSugaredLoggerChaining(t *testing.T) {
69+
logger := NewTestLogger(t)
70+
sugar := logger.Sugar()
71+
72+
// Test With returns our custom SugaredLogger with Trace support
73+
withSugar := sugar.With("component", "test")
74+
withSugar.Trace("trace with sugar chaining")
75+
withSugar.Tracew("tracew with sugar chaining", "key", "value")
76+
77+
// Test Named returns our custom SugaredLogger with Trace support
78+
namedSugar := sugar.Named("child")
79+
namedSugar.Trace("trace with named sugar")
80+
namedSugar.Tracew("tracew with named sugar", "key", "value")
81+
82+
// Test chaining both With and Named
83+
chainedSugar := sugar.With("component", "test").Named("child")
84+
chainedSugar.Trace("trace with full chaining")
85+
chainedSugar.Tracew("tracew with full chaining", "key", "value")
86+
}
87+
6888
func TestTraceLevel(t *testing.T) {
6989
// Verify TraceLevel is below DebugLevel
7090
if logging.TraceLevel >= zapcore.DebugLevel {

internal/runner/manager.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error
176176

177177
runner, err := m.assignReqToRunner(deadlineCtx, req)
178178
if err != nil {
179-
log.Debugw("failed to get runner for async request", "error", err)
179+
log.Tracew("failed to get runner for async request", "error", err)
180180
m.releaseSlot()
181181
return err
182182
}
@@ -197,7 +197,7 @@ func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error
197197

198198
respChan, err := runner.predict(req)
199199
if err != nil {
200-
log.Debugw("failed to predict", "error", err)
200+
log.Tracew("failed to predict", "error", err)
201201
m.releaseSlot()
202202
return err
203203
}
@@ -206,7 +206,7 @@ func (m *Manager) PredictAsync(ctx context.Context, req PredictionRequest) error
206206
go func() {
207207
defer m.releaseSlot() // Release slot after prediction completes
208208
<-respChan // Wait for prediction to complete
209-
log.Debugw("async prediction completed", "prediction_id", req.ID)
209+
log.Tracew("async prediction completed", "prediction_id", req.ID)
210210
}()
211211

212212
return nil
@@ -485,7 +485,7 @@ func (m *Manager) assignReqToRunner(ctx context.Context, req PredictionRequest)
485485
// First, try to find existing runner with capacity and atomically reserve slot
486486
procRunner := m.findRunnerWithCapacity(ctx, req)
487487
if procRunner != nil {
488-
log.Debugw("allocated request to existing runner", "runner", procRunner.runnerCtx.id)
488+
log.Tracew("allocated request to existing runner", "runner", procRunner.runnerCtx.id)
489489
return procRunner, nil
490490
}
491491

@@ -824,7 +824,7 @@ func (m *Manager) Stop() error {
824824
// Wait for this runner to become idle OR timeout
825825
select {
826826
case <-runner.readyForShutdown:
827-
log.Infow("runner became idle naturally", "name", runner.runnerCtx.id)
827+
log.Debugw("runner became idle naturally", "name", runner.runnerCtx.id)
828828
case <-graceCtx.Done():
829829
log.Warnw("grace period expired for runner", "name", runner.runnerCtx.id, "context_err", graceCtx.Err())
830830
}
@@ -877,7 +877,7 @@ func (m *Manager) Status() string {
877877
runner.mu.Unlock()
878878
return status
879879
}
880-
log.Debug("default runner not found, returning STARTING")
880+
log.Trace("default runner not found, returning STARTING")
881881
return "STARTING"
882882
}
883883

@@ -1071,9 +1071,9 @@ func (m *Manager) monitorRunnerSubprocess(ctx context.Context, runnerName string
10711071
}
10721072

10731073
// Capture crash logs from runner and fail predictions one by one
1074-
log.Debugw("checking runner logs for crash", "runner_logs_count", len(runner.logs), "runner_logs", runner.logs)
1074+
log.Tracew("checking runner logs for crash", "runner_logs_count", len(runner.logs), "runner_logs", runner.logs)
10751075
crashLogs := runner.logs
1076-
log.Debugw("captured crash logs", "crash_logs_count", len(crashLogs), "crash_logs", crashLogs)
1076+
log.Tracew("captured crash logs", "crash_logs_count", len(crashLogs), "crash_logs", crashLogs)
10771077

10781078
for id, pending := range runner.pending {
10791079
log.Debugw("failing prediction due to setup failure", "prediction_id", id)

internal/runner/runner.go

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (r *Runner) watchPredictionResponses(ctx context.Context, predictionID stri
5454
for {
5555
select {
5656
case <-ctx.Done():
57-
log.Debugw("response watcher canceled", "prediction_id", predictionID)
57+
log.Tracew("response watcher canceled", "prediction_id", predictionID)
5858
return
5959

6060
// TODO: Add inotify case when implemented
@@ -68,7 +68,7 @@ func (r *Runner) watchPredictionResponses(ctx context.Context, predictionID stri
6868
// Drain IPC OUTPUT notifications - when inotify available, we blackhole these
6969
// When inotify unavailable, this triggers immediate processing
7070
// TODO: Only process if inotify unavailable
71-
log.Debugw("received OUTPUT IPC notification", "prediction_id", predictionID)
71+
log.Tracew("received OUTPUT IPC notification", "prediction_id", predictionID)
7272
pollTimer.Reset(100 * time.Millisecond) // Reset polling timer since we got an event
7373
if err := r.processResponseFiles(predictionID, pending, responsePattern, log); err != nil {
7474
log.Errorw("failed to process response files from IPC", "prediction_id", predictionID, "error", err)
@@ -86,7 +86,7 @@ func (r *Runner) watchPredictionResponses(ctx context.Context, predictionID stri
8686
completed := pending.response.Status.IsCompleted()
8787
pending.mu.Unlock()
8888
if completed {
89-
log.Debugw("prediction completed, watcher exiting", "prediction_id", predictionID)
89+
log.Tracew("prediction completed, watcher exiting", "prediction_id", predictionID)
9090
return
9191
}
9292
}
@@ -274,7 +274,7 @@ func (r *Runner) handleResponseWebhooksAndCompletion(response *PredictionRespons
274274
}
275275

276276
// Watcher exits - manager defer will handle webhook and cleanup
277-
log.Debugw("prediction completed, watcher exiting", "prediction_id", predictionID)
277+
log.Tracew("prediction completed, watcher exiting", "prediction_id", predictionID)
278278
return
279279
}
280280
}
@@ -356,25 +356,25 @@ func (r *Runner) WaitForStop() {
356356
func (r *Runner) GracefulShutdown() {
357357
log := r.logger.Sugar()
358358
if !r.shutdownWhenIdle.CompareAndSwap(false, true) {
359-
log.Debugw("graceful shutdown already initiated", "runner_id", r.runnerCtx.id)
359+
log.Tracew("graceful shutdown already initiated", "runner_id", r.runnerCtx.id)
360360
return
361361
}
362362

363363
r.mu.RLock()
364364
shouldSignal := (r.status == StatusReady && len(r.pending) == 0)
365365
r.mu.RUnlock()
366366

367-
log.Debugw("graceful shutdown initiated", "runner_id", r.runnerCtx.id, "status", r.status, "pending_count", len(r.pending), "should_signal", shouldSignal)
367+
log.Tracew("graceful shutdown initiated", "runner_id", r.runnerCtx.id, "status", r.status, "pending_count", len(r.pending), "should_signal", shouldSignal)
368368

369369
if shouldSignal {
370370
if r.readyForShutdown == nil {
371371
log.Warnw("readyForShutdown channel is nil, cannot signal shutdown readiness", "runner_id", r.runnerCtx.id)
372372
} else {
373373
select {
374374
case <-r.readyForShutdown:
375-
log.Debugw("readyForShutdown already closed", "runner_id", r.runnerCtx.id)
375+
log.Tracew("readyForShutdown already closed", "runner_id", r.runnerCtx.id)
376376
default:
377-
log.Debugw("closing readyForShutdown channel", "runner_id", r.runnerCtx.id)
377+
log.Tracew("closing readyForShutdown channel", "runner_id", r.runnerCtx.id)
378378
close(r.readyForShutdown)
379379
}
380380
}
@@ -407,7 +407,7 @@ func (r *Runner) Start(ctx context.Context) error {
407407
return fmt.Errorf("failed to start subprocess: %w", err)
408408
}
409409

410-
log.Debugw("runner process started successfully", "pid", cmd.Process.Pid)
410+
log.Tracew("runner process started successfully", "pid", cmd.Process.Pid)
411411

412412
return nil
413413
}
@@ -437,7 +437,7 @@ func (r *Runner) setupLogCapture() error {
437437
line := scanner.Text()
438438
r.logStdout(line)
439439
}
440-
r.logger.Debug("finished stdout log capture")
440+
r.logger.Trace("finished stdout log capture")
441441
})
442442

443443
wg.Go(func() {
@@ -446,7 +446,7 @@ func (r *Runner) setupLogCapture() error {
446446
line := scanner.Text()
447447
r.logStderr(line)
448448
}
449-
r.logger.Debug("finished stderr log capture")
449+
r.logger.Trace("finished stderr log capture")
450450
})
451451

452452
// Signal when both pipes are closed (with double-close protection)
@@ -792,15 +792,15 @@ func (r *Runner) predict(req PredictionRequest) (chan PredictionResponse, error)
792792
r.mu.Lock()
793793
defer r.mu.Unlock()
794794

795-
log.Debugw("runner.predict called", "prediction_id", req.ID, "status", r.status)
795+
log.Tracew("runner.predict called", "prediction_id", req.ID, "status", r.status)
796796

797797
// Prediction must be pre-allocated by manager
798798
pending, exists := r.pending[req.ID]
799799
if !exists {
800800
return nil, fmt.Errorf("prediction %s not allocated", req.ID)
801801
}
802802

803-
log.Debugw("prediction found in pending", "prediction_id", req.ID)
803+
log.Tracew("prediction found in pending", "prediction_id", req.ID)
804804

805805
// Process input paths (base64 and URL inputs)
806806
inputPaths := make([]string, 0)
@@ -829,13 +829,13 @@ func (r *Runner) predict(req PredictionRequest) (chan PredictionResponse, error)
829829
return nil, fmt.Errorf("failed to write request file: %w", err)
830830
}
831831

832-
log.Debugw("wrote prediction request file", "prediction_id", req.ID, "path", requestPath, "working_dir", r.runnerCtx.workingdir, "request_data", string(requestData))
832+
log.Tracew("wrote prediction request file", "prediction_id", req.ID, "path", requestPath, "working_dir", r.runnerCtx.workingdir, "request_data", string(requestData))
833833

834834
// Debug: Check if file actually exists and list directory contents
835835
if _, err := os.Stat(requestPath); err != nil {
836-
log.Debugw("ERROR: written request file does not exist", "prediction_id", req.ID, "path", requestPath, "error", err)
836+
log.Tracew("ERROR: written request file does not exist", "prediction_id", req.ID, "path", requestPath, "error", err)
837837
} else {
838-
log.Debugw("confirmed request file exists", "prediction_id", req.ID, "path", requestPath)
838+
log.Tracew("confirmed request file exists", "prediction_id", req.ID, "path", requestPath)
839839
}
840840

841841
// Debug: List all files in working directory
@@ -844,13 +844,13 @@ func (r *Runner) predict(req PredictionRequest) (chan PredictionResponse, error)
844844
for i, entry := range entries {
845845
fileNames[i] = entry.Name()
846846
}
847-
log.Debugw("working directory contents after write", "prediction_id", req.ID, "working_dir", r.runnerCtx.workingdir, "files", fileNames)
847+
log.Tracew("working directory contents after write", "prediction_id", req.ID, "working_dir", r.runnerCtx.workingdir, "files", fileNames)
848848
}
849849

850850
// Update pending prediction with request details
851851
pending.request = req
852852

853-
log.Debugw("returning prediction channel", "prediction_id", req.ID)
853+
log.Tracew("returning prediction channel", "prediction_id", req.ID)
854854
return pending.c, nil
855855
}
856856

@@ -970,21 +970,21 @@ func (r *Runner) updateSetupResult() {
970970
}
971971

972972
setupResultPath := filepath.Join(r.runnerCtx.workingdir, "setup_result.json")
973-
log.Debug("reading setup_result.json", "path", setupResultPath)
973+
log.Trace("reading setup_result.json", "path", setupResultPath)
974974

975975
// Try to read additional setup result data from file
976976
var setupResultFromFile SetupResult
977977
if err := r.readJSON(setupResultPath, &setupResultFromFile); err != nil {
978-
log.Debugw("failed to read setup_result.json, assuming success", "error", err)
978+
log.Tracew("failed to read setup_result.json, assuming success", "error", err)
979979
// If setup_result.json doesn't exist, assume setup succeeded and status is ready
980980
r.setupResult.Status = SetupSucceeded
981981
r.setupResult.Schema = "" // Will be populated by updateSchema if available
982982
r.status = StatusReady
983-
log.Debugw("setup result not found, assuming success", "status", r.status.String())
983+
log.Tracew("setup result not found, assuming success", "status", r.status.String())
984984
return
985985
}
986986

987-
log.Debugw("successfully read setup_result.json", "status", setupResultFromFile.Status, "schema_length", len(setupResultFromFile.Schema))
987+
log.Tracew("successfully read setup_result.json", "status", setupResultFromFile.Status, "schema_length", len(setupResultFromFile.Schema))
988988

989989
// Update setup result with data from file, preserving logs that were already set
990990
r.setupResult.Status = setupResultFromFile.Status

0 commit comments

Comments
 (0)