Skip to content

Commit 815382d

Browse files
Fix: Endpoint collision between monitoring and regular beats (#1034)
Fix: Endpoint collision between monitoring and regular beats (#1034)
1 parent 7d21718 commit 815382d

File tree

10 files changed

+215
-70
lines changed

10 files changed

+215
-70
lines changed

internal/pkg/agent/control/server/server.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/elastic/elastic-agent/internal/pkg/agent/control/proto"
2727
"github.com/elastic/elastic-agent/internal/pkg/agent/errors"
2828
"github.com/elastic/elastic-agent/internal/pkg/agent/program"
29-
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
3029
monitoring "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
3130
monitoringCfg "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
3231
"github.com/elastic/elastic-agent/internal/pkg/core/socket"
@@ -37,6 +36,10 @@ import (
3736
"github.com/elastic/elastic-agent/pkg/core/logger"
3837
)
3938

39+
const (
40+
agentName = "elastic-agent"
41+
)
42+
4043
// Server is the daemon side of the control protocol.
4144
type Server struct {
4245
logger *logger.Logger
@@ -225,7 +228,8 @@ func (s *Server) ProcMeta(ctx context.Context, _ *proto.Empty) (*proto.ProcMetaR
225228
// gather spec data for all rk/apps running
226229
specs := s.getSpecInfo("", "")
227230
for _, si := range specs {
228-
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
231+
isSidecar := strings.HasSuffix(si.app, "_monitoring")
232+
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
229233
client := newSocketRequester(si.app, si.rk, endpoint)
230234

231235
procMeta := client.procMeta(ctx)
@@ -258,9 +262,9 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr
258262
ch := make(chan *proto.PprofResult, 1)
259263

260264
// retrieve elastic-agent pprof data if requested or application is unspecified.
261-
if req.AppName == "" || req.AppName == "elastic-agent" {
262-
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
263-
c := newSocketRequester("elastic-agent", "", endpoint)
265+
if req.AppName == "" || req.AppName == agentName {
266+
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
267+
c := newSocketRequester(agentName, "", endpoint)
264268
for _, opt := range req.PprofType {
265269
wg.Add(1)
266270
go func(opt proto.PprofOption) {
@@ -273,11 +277,11 @@ func (s *Server) Pprof(ctx context.Context, req *proto.PprofRequest) (*proto.Ppr
273277

274278
// get requested rk/appname spec or all specs
275279
var specs []specInfo
276-
if req.AppName != "elastic-agent" {
280+
if req.AppName != agentName {
277281
specs = s.getSpecInfo(req.RouteKey, req.AppName)
278282
}
279283
for _, si := range specs {
280-
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
284+
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, false)
281285
c := newSocketRequester(si.app, si.rk, endpoint)
282286
// Launch a concurrent goroutine to gather all pprof endpoints from a socket.
283287
for _, opt := range req.PprofType {
@@ -315,8 +319,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
315319
}
316320

317321
// gather metrics buffer data from the elastic-agent
318-
endpoint := beats.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
319-
c := newSocketRequester("elastic-agent", "", endpoint)
322+
endpoint := monitoring.AgentMonitoringEndpoint(runtime.GOOS, s.monitoringCfg.HTTP)
323+
c := newSocketRequester(agentName, "", endpoint)
320324
metrics := c.procMetrics(ctx)
321325

322326
resp := &proto.ProcMetricsResponse{
@@ -326,7 +330,8 @@ func (s *Server) ProcMetrics(ctx context.Context, _ *proto.Empty) (*proto.ProcMe
326330
// gather metrics buffer data from all other processes
327331
specs := s.getSpecInfo("", "")
328332
for _, si := range specs {
329-
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk)
333+
isSidecar := strings.HasSuffix(si.app, "_monitoring")
334+
endpoint := monitoring.MonitoringEndpoint(si.spec, runtime.GOOS, si.rk, isSidecar)
330335
client := newSocketRequester(si.app, si.rk, endpoint)
331336

332337
s.logger.Infof("gather metrics from %s", endpoint)

internal/pkg/agent/operation/monitoring_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ type testMonitor struct {
212212

213213
// EnrichArgs enriches arguments provided to application, in order to enable
214214
// monitoring
215-
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string, _ bool) []string {
215+
func (b *testMonitor) EnrichArgs(_ program.Spec, _ string, args []string) []string {
216216
return args
217217
}
218218

internal/pkg/agent/operation/operator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
"github.com/elastic/elastic-agent/internal/pkg/config"
2828
"github.com/elastic/elastic-agent/internal/pkg/core/app"
2929
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring"
30-
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/noop"
30+
"github.com/elastic/elastic-agent/internal/pkg/core/monitoring/beats"
3131
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/process"
3232
"github.com/elastic/elastic-agent/internal/pkg/core/plugin/service"
3333
"github.com/elastic/elastic-agent/internal/pkg/core/state"
@@ -387,7 +387,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) {
387387
appName := p.BinaryName()
388388
if app.IsSidecar(p) {
389389
// make watchers unmonitorable
390-
monitor = noop.NewMonitor()
390+
monitor = beats.NewSidecarMonitor(o.config.DownloadConfig, o.config.MonitoringConfig)
391391
appName += "_monitoring"
392392
}
393393

internal/pkg/core/monitoring/beats/beats_monitor.go

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package beats
66

77
import (
8-
"fmt"
98
"net/url"
109
"os"
1110
"path/filepath"
@@ -20,8 +19,13 @@ import (
2019
monitoringConfig "github.com/elastic/elastic-agent/internal/pkg/core/monitoring/config"
2120
)
2221

23-
const httpPlusPrefix = "http+"
24-
const defaultMonitoringNamespace = "default"
22+
const (
23+
httpPlusPrefix = "http+"
24+
defaultMonitoringNamespace = "default"
25+
fileSchemePrefix = "file"
26+
unixSchemePrefix = "unix"
27+
windowsOS = "windows"
28+
)
2529

2630
// Monitor implements the monitoring.Monitor interface providing information
2731
// about beats.
@@ -99,15 +103,11 @@ func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.Monitor
99103
func (b *Monitor) WatchMetrics() bool { return b.config.Enabled && b.config.MonitorMetrics }
100104

101105
func (b *Monitor) generateMonitoringEndpoint(spec program.Spec, pipelineID string) string {
102-
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID)
103-
}
104-
105-
func (b *Monitor) generateLoggingFile(spec program.Spec, pipelineID string) string {
106-
return getLoggingFile(spec, b.operatingSystem, b.installPath, pipelineID)
106+
return MonitoringEndpoint(spec, b.operatingSystem, pipelineID, false)
107107
}
108108

109109
func (b *Monitor) generateLoggingPath(spec program.Spec, pipelineID string) string {
110-
return filepath.Dir(b.generateLoggingFile(spec, pipelineID))
110+
return filepath.Dir(getLoggingFile(spec, b.operatingSystem, pipelineID))
111111
}
112112

113113
func (b *Monitor) ownLoggingPath(spec program.Spec) bool {
@@ -118,15 +118,10 @@ func (b *Monitor) ownLoggingPath(spec program.Spec) bool {
118118

119119
// EnrichArgs enriches arguments provided to application, in order to enable
120120
// monitoring
121-
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string, isSidecar bool) []string {
121+
func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string) []string {
122122
appendix := make([]string, 0, 7)
123123

124-
monitoringEndpoint := b.generateMonitoringEndpoint(spec, pipelineID)
125-
if monitoringEndpoint != "" {
126-
endpoint := monitoringEndpoint
127-
if isSidecar {
128-
endpoint += "_monitor"
129-
}
124+
if endpoint := b.generateMonitoringEndpoint(spec, pipelineID); endpoint != "" {
130125
appendix = append(appendix,
131126
"-E", "http.enabled=true",
132127
"-E", "http.host="+endpoint,
@@ -146,10 +141,6 @@ func (b *Monitor) EnrichArgs(spec program.Spec, pipelineID string, args []string
146141
loggingPath := b.generateLoggingPath(spec, pipelineID)
147142
if loggingPath != "" {
148143
logFile := spec.Cmd
149-
if isSidecar {
150-
logFile += "_monitor"
151-
}
152-
logFile = fmt.Sprintf("%s", logFile)
153144
appendix = append(appendix,
154145
"-E", "logging.files.path="+loggingPath,
155146
"-E", "logging.files.name="+logFile,
@@ -224,7 +215,7 @@ func (b *Monitor) LogPath(spec program.Spec, pipelineID string) string {
224215
return ""
225216
}
226217

227-
return b.generateLoggingFile(spec, pipelineID)
218+
return getLoggingFile(spec, b.operatingSystem, pipelineID)
228219
}
229220

230221
// MetricsPath describes a location where application exposes metrics
@@ -272,15 +263,15 @@ func monitoringDrop(path string) (drop string) {
272263
}
273264

274265
u, _ := url.Parse(path)
275-
if u == nil || (u.Scheme != "" && u.Scheme != "file" && u.Scheme != "unix") {
266+
if u == nil || (u.Scheme != "" && u.Scheme != fileSchemePrefix && u.Scheme != unixSchemePrefix) {
276267
return ""
277268
}
278269

279-
if u.Scheme == "file" {
270+
if u.Scheme == fileSchemePrefix {
280271
return strings.TrimPrefix(path, "file://")
281272
}
282273

283-
if u.Scheme == "unix" {
274+
if u.Scheme == unixSchemePrefix {
284275
return strings.TrimPrefix(path, "unix://")
285276
}
286277

@@ -299,7 +290,7 @@ func isWindowsPath(path string) bool {
299290
}
300291

301292
func changeOwner(path string, uid, gid int) error {
302-
if runtime.GOOS == "windows" {
293+
if runtime.GOOS == windowsOS {
303294
// on windows it always returns the syscall.EWINDOWS error, wrapped in *PathError
304295
return nil
305296
}

internal/pkg/core/monitoring/beats/monitoring.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,31 +27,39 @@ const (
2727
agentMbEndpointFileFormatWin = `npipe:///elastic-agent`
2828
// agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint
2929
agentMbEndpointHTTP = "http://%s:%d"
30+
31+
monitorSuffix = "_monitor"
3032
)
3133

3234
// MonitoringEndpoint is an endpoint where process is exposing its metrics.
33-
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string) string {
35+
func MonitoringEndpoint(spec program.Spec, operatingSystem, pipelineID string, isSidecar bool) (endpointPath string) {
36+
defer func() {
37+
if isSidecar && endpointPath != "" {
38+
endpointPath += monitorSuffix
39+
}
40+
}()
41+
3442
if endpoint, ok := spec.MetricEndpoints[operatingSystem]; ok {
3543
return endpoint
3644
}
37-
if operatingSystem == "windows" {
45+
if operatingSystem == windowsOS {
3846
return fmt.Sprintf(mbEndpointFileFormatWin, pipelineID, spec.Cmd)
3947
}
4048
// unix socket path must be less than 104 characters
4149
path := fmt.Sprintf("unix://%s.sock", filepath.Join(paths.TempDir(), pipelineID, spec.Cmd, spec.Cmd))
42-
if len(path) < 104 {
50+
if (isSidecar && len(path) < 104-len(monitorSuffix)) || (!isSidecar && len(path) < 104) {
4351
return path
4452
}
4553
// place in global /tmp (or /var/tmp on Darwin) to ensure that its small enough to fit; current path is way to long
4654
// for it to be used, but needs to be unique per Agent (in the case that multiple are running)
4755
return fmt.Sprintf(`unix:///tmp/elastic-agent/%x.sock`, sha256.Sum256([]byte(path)))
4856
}
4957

50-
func getLoggingFile(spec program.Spec, operatingSystem, installPath, pipelineID string) string {
58+
func getLoggingFile(spec program.Spec, operatingSystem, pipelineID string) string {
5159
if path, ok := spec.LogPaths[operatingSystem]; ok {
5260
return path
5361
}
54-
if operatingSystem == "windows" {
62+
if operatingSystem == windowsOS {
5563
return fmt.Sprintf(logFileFormatWin, paths.Home(), pipelineID, spec.Cmd)
5664
}
5765
return fmt.Sprintf(logFileFormat, paths.Home(), pipelineID, spec.Cmd)
@@ -63,7 +71,7 @@ func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringConfig.Monit
6371
return fmt.Sprintf(agentMbEndpointHTTP, cfg.Host, cfg.Port)
6472
}
6573

66-
if operatingSystem == "windows" {
74+
if operatingSystem == windowsOS {
6775
return agentMbEndpointFileFormatWin
6876
}
6977
// unix socket path must be less than 104 characters

0 commit comments

Comments
 (0)