Skip to content

Commit

Permalink
[cmd/opampsupervisor] Add passthrough logging for collector (open-tel…
Browse files Browse the repository at this point in the history
…emetry#35474)

**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

Allow collector logs to passthrough to stdout instead of strictly being
sent to a file.

If configured to do so, the supervisor will capture collector output and
log it using it's logger. This way, the supervisor should be configured
to log to stdout if running in a containerized env.

This PR follows closely with this
[PR](open-telemetry#35468).
Right now the supervisor exclusively logs to stdout, but under the
assumption that it can be configured to log elsewhere, this change uses
the supervisor logger rather than setting the collector's `exec.Cmd` to
log to stdout and stderr.

**Link to tracking Issue:** Closes open-telemetry#35473

**Testing:** <Describe what testing was performed and which tests were
added.>
I built a docker image of the supervisor and collector and ran it using
the `containerized` param. This is a sample of what the output looked
like:

```
2024-09-30T12:42:47.472Z	DEBUG	commander/commander.go:73	Starting agent	{"agent": "/collector/observiq-otel-collector"}
2024-09-30T12:42:47.472Z	DEBUG	commander/commander.go:163	Agent process started	{"pid": 11}
2024-09-30T12:42:47.525Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.525Z	info	service@v0.108.1/service.go:178	Setting up own telemetry...
2024-09-30T12:42:47.525Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.525Z	info	service@v0.108.1/telemetry.go:98	Serving metrics	{"address": ":8888", "metrics level": "Normal"}
2024-09-30T12:42:47.525Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.525Z	info	service@v0.108.1/service.go:263	Starting observiq-otel-collector...	{"Version": "v2.0.0", "NumCPU": 12}
2024-09-30T12:42:47.525Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.525Z	info	extensions/extensions.go:38	Starting extensions...
2024-09-30T12:42:47.525Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.525Z	info	extensions/extensions.go:41	Extension is starting...	{"kind": "extension", "name": "opamp"}
2024-09-30T12:42:47.525Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.525Z	info	extensions/extensions.go:58	Extension started.	{"kind": "extension", "name": "opamp"}
2024-09-30T12:42:47.526Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.526Z	info	service@v0.108.1/service.go:289	Everything is ready. Begin running and processing data.
2024-09-30T12:42:47.526Z	INFO	collector	commander/commander.go:156	2024-09-30T12:42:47.526Z	info	localhostgate/featuregate.go:63	The default endpoints for all servers in components have changed to use localhost instead of 0.0.0.0. Disable the feature gate to temporarily revert to the previous default.	{"feature gate ID": "component.UseLocalHostAsDefaultHost"}
2024-09-30T12:42:47.528Z	DEBUG	commander/commander.go:220	Stopping agent process	{"pid": 11}
```

**Documentation:** <Describe the documentation added.>
  • Loading branch information
dpaasman00 authored Oct 4, 2024
1 parent 47cd38d commit 6fb3266
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 22 deletions.
27 changes: 27 additions & 0 deletions .chloggen/supervisor-passthrough-collector-logging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow collector logs to passthrough to supervisor output to facilitate running in a containerized environment.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35473]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
20 changes: 11 additions & 9 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,10 +1359,12 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
}

type LogEntry struct {
Level string `json:"level"`
Level string `json:"level"`
Logger string `json:"logger"`
}

func TestSupervisorInfoLoggingLevel(t *testing.T) {
func TestSupervisorLogging(t *testing.T) {
// Tests that supervisor only logs at Info level and above && that collector logs passthrough and are present in supervisor log file
if runtime.GOOS == "windows" {
t.Skip("Zap does not close the log file and Windows disallows removing files that are still opened.")
}
Expand Down Expand Up @@ -1420,12 +1422,8 @@ func TestSupervisorInfoLoggingLevel(t *testing.T) {
require.NoError(t, err)

scanner := bufio.NewScanner(logFile)
check := false
seenCollectorLog := false
for scanner.Scan() {
if !check {
check = true
}

line := scanner.Bytes()
var log LogEntry
err := json.Unmarshal(line, &log)
Expand All @@ -1434,9 +1432,13 @@ func TestSupervisorInfoLoggingLevel(t *testing.T) {
level, err := zapcore.ParseLevel(log.Level)
require.NoError(t, err)
require.GreaterOrEqual(t, level, zapcore.InfoLevel)

if log.Logger == "collector" {
seenCollectorLog = true
}
}
// verify at least 1 log was read
require.True(t, check)
// verify a collector log was read
require.True(t, seenCollectorLog)
require.NoError(t, logFile.Close())
}

Expand Down
83 changes: 70 additions & 13 deletions cmd/opampsupervisor/supervisor/commander/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package commander

import (
"bufio"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -71,45 +72,101 @@ func (c *Commander) Start(ctx context.Context) error {

c.logger.Debug("Starting agent", zap.String("agent", c.cfg.Executable))

c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204
c.cmd.SysProcAttr = sysProcAttrs()

// PassthroughLogging changes how collector start up happens
if c.cfg.PassthroughLogs {
return c.startWithPassthroughLogging()
}
return c.startNormal()
}

func (c *Commander) Restart(ctx context.Context) error {
c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable))
if err := c.Stop(ctx); err != nil {
return err
}

return c.Start(ctx)
}

func (c *Commander) startNormal() error {
logFilePath := filepath.Join(c.logsDir, "agent.log")
stdoutFile, err := os.Create(logFilePath)
if err != nil {
return fmt.Errorf("cannot create %s: %w", logFilePath, err)
}

c.cmd = exec.CommandContext(ctx, c.cfg.Executable, c.args...) // #nosec G204
c.cmd.SysProcAttr = sysProcAttrs()

// Capture standard output and standard error.
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21072
c.cmd.Stdout = stdoutFile
c.cmd.Stderr = stdoutFile

if err := c.cmd.Start(); err != nil {
stdoutFile.Close()
return err
return fmt.Errorf("startNormal: %w", err)
}

c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid))
c.running.Store(1)

go c.watch(stdoutFile)
go func() {
defer stdoutFile.Close()
c.watch()
}()

return nil
}

func (c *Commander) Restart(ctx context.Context) error {
c.logger.Debug("Restarting agent", zap.String("agent", c.cfg.Executable))
if err := c.Stop(ctx); err != nil {
return err
func (c *Commander) startWithPassthroughLogging() error {
// grab cmd pipes
stdoutPipe, err := c.cmd.StdoutPipe()
if err != nil {
return fmt.Errorf("stdoutPipe: %w", err)
}
stderrPipe, err := c.cmd.StderrPipe()
if err != nil {
return fmt.Errorf("stderrPipe: %w", err)
}

return c.Start(ctx)
}
// start agent
if err := c.cmd.Start(); err != nil {
return fmt.Errorf("start: %w", err)
}
c.running.Store(1)

func (c *Commander) watch(stdoutFile *os.File) {
defer stdoutFile.Close()
colLogger := c.logger.Named("collector")

// capture agent output
go func() {
scanner := bufio.NewScanner(stdoutPipe)
for scanner.Scan() {
line := scanner.Text()
colLogger.Info(line)
}
if err := scanner.Err(); err != nil {
c.logger.Error("Error reading agent stdout: %w", zap.Error(err))
}
}()
go func() {
scanner := bufio.NewScanner(stderrPipe)
for scanner.Scan() {
line := scanner.Text()
colLogger.Info(line)
}
if err := scanner.Err(); err != nil {
c.logger.Error("Error reading agent stderr: %w", zap.Error(err))
}
}()

c.logger.Debug("Agent process started", zap.Int("pid", c.cmd.Process.Pid))

go c.watch()
return nil
}

func (c *Commander) watch() {
err := c.cmd.Wait()

// cmd.Wait returns an exec.ExitError when the Collector exits unsuccessfully or stops
Expand Down
2 changes: 2 additions & 0 deletions cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type Agent struct {
Description AgentDescription `mapstructure:"description"`
BootstrapTimeout time.Duration `mapstructure:"bootstrap_timeout"`
HealthCheckPort int `mapstructure:"health_check_port"`
PassthroughLogs bool `mapstructure:"passthrough_logs"`
}

func (a Agent) Validate() error {
Expand Down Expand Up @@ -229,6 +230,7 @@ func DefaultSupervisor() Supervisor {
Agent: Agent{
OrphanDetectionInterval: 5 * time.Second,
BootstrapTimeout: 3 * time.Second,
PassthroughLogs: false,
},
Telemetry: Telemetry{
Logs: Logs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ storage:

agent:
executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}
passthrough_logs: true

telemetry:
logs:
Expand Down

0 comments on commit 6fb3266

Please sign in to comment.