Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch the GRPC communication where Agent is running the server and the beats are connecting back to Agent #18973

Merged
merged 22 commits into from
Jun 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
*beat/logs
*beat/data
x-pack/functionbeat/pkg
x-pack/elastic-agent/data
x-pack/elastic-agent/pkg/agent/operation/tests/downloads

# Files
.DS_Store
Expand All @@ -25,6 +27,9 @@ mage_output_file.go
x-pack/functionbeat/*/fields.yml
x-pack/functionbeat/provider/*/functionbeat-*
x-pack/dockerlogbeat/temproot.tar
x-pack/elastic-agent/elastic_agent
x-pack/elastic-agent/fleet.yml
x-pack/elastic-agent/pkg/agent/operation/tests/scripts/configurable-1.0-darwin-x86_64/configurable

# Editor swap files
*.swp
Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
logp.Info("%s start running.", b.Info.Beat)

// Launch config manager
b.ConfigManager.Start()
b.ConfigManager.Start(beater.Stop)
defer b.ConfigManager.Stop()

return beater.Run(&b.Beat)
Expand Down
4 changes: 2 additions & 2 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ConfigManager interface {
Enabled() bool

// Start the config manager
Start()
Start(func())

// Stop the config manager
Stop()
Expand Down Expand Up @@ -98,6 +98,6 @@ func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, err
}

func (nilManager) Enabled() bool { return false }
func (nilManager) Start() {}
func (nilManager) Start(_ func()) {}
func (nilManager) Stop() {}
func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@
- Change monitoring defaults for agent {pull}18927[18927]
- Agent verifies packages before using them {pull}18876[18876]
- Change stream.* to dataset.* fields {pull}18967[18967]
- Agent now runs the GRPC server and spawned application connect by to Agent {pull}18973[18973]
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/elastic-agent.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
13 changes: 9 additions & 4 deletions x-pack/elastic-agent/elastic-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,18 @@ datasources:
# install_path: "${path.data}/install"

# process:
# # minimal port number for spawned processes
# min_port: 10000
# # maximum port number for spawned processes
# max_port: 30000
# # timeout for creating new processes. when process is not successfully created by this timeout
# # start operation is considered a failure
# spawn_timeout: 30s
# # timeout for stopping processes. when process is not stopped by this timeout then the process.
# # is force killed
# stop_timeout: 30s

# grpc:
# # listen address for the GRPC server that spawned processes connect back to.
# address: localhost
# # port for the GRPC server that spawned processes connect back to.
# port: 6789

# retry:
# # Enabled determines whether retry is possible. Default is false.
Expand Down
7 changes: 6 additions & 1 deletion x-pack/elastic-agent/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -165,8 +166,12 @@ func (Build) Clean() {
func (Build) TestBinaries() error {
p := filepath.Join("pkg", "agent", "operation", "tests", "scripts")

binaryName := "configurable"
if runtime.GOOS == "windows" {
binaryName += ".exe"
}
return combineErr(
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86", "configurable"), filepath.Join(p, "configurable-1.0-darwin-x86", "main.go")),
RunGo("build", "-o", filepath.Join(p, "configurable-1.0-darwin-x86_64", binaryName), filepath.Join(p, "configurable-1.0-darwin-x86_64", "main.go")),
)
}

Expand Down
13 changes: 12 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log"
Expand All @@ -39,6 +41,7 @@ type Local struct {
log *logger.Logger
source source
agentInfo *info.AgentInfo
srv *server.Server
}

type source interface {
Expand Down Expand Up @@ -78,6 +81,10 @@ func newLocal(
}

localApplication.bgContext, localApplication.cancelCtxFn = context.WithCancel(ctx)
localApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{})
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}

reporter := reporting.NewReporter(localApplication.bgContext, log, localApplication.agentInfo, logR)

Expand All @@ -86,7 +93,7 @@ func newLocal(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, nil, reporter, monitor))
router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, localApplication.srv, reporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand All @@ -113,6 +120,9 @@ func (l *Local) Start() error {
l.log.Info("Agent is starting")
defer l.log.Info("Agent is stopped")

if err := l.srv.Start(); err != nil {
return err
}
if err := l.source.Start(); err != nil {
return err
}
Expand All @@ -123,6 +133,7 @@ func (l *Local) Start() error {
// Stop stops a local agent.
func (l *Local) Stop() error {
l.cancelCtxFn()
l.srv.Stop()
return l.source.Stop()
}

Expand Down
14 changes: 13 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ import (
"net/http"
"net/url"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand All @@ -44,6 +47,7 @@ type Managed struct {
api apiClient
agentInfo *info.AgentInfo
gateway *fleetGateway
srv *server.Server
}

func newManaged(
Expand Down Expand Up @@ -110,6 +114,10 @@ func newManaged(
}

managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx)
managedApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{})
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
}

logR := logreporter.NewReporter(log, cfg.Reporting.Log)
fleetR, err := fleetreporter.NewReporter(agentInfo, log, cfg.Reporting.Fleet)
Expand All @@ -123,7 +131,7 @@ func newManaged(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter, monitor))
router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, managedApplication.srv, combinedReporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down Expand Up @@ -200,6 +208,9 @@ func newManaged(
// Start starts a managed elastic-agent.
func (m *Managed) Start() error {
m.log.Info("Agent is starting")
if err := m.srv.Start(); err != nil {
return err
}
m.gateway.Start()
return nil
}
Expand All @@ -208,6 +219,7 @@ func (m *Managed) Start() error {
func (m *Managed) Stop() error {
defer m.log.Info("Agent is stopped")
m.cancelCtxFn()
m.srv.Stop()
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
)

// EventProcessor is an processor of application event
Expand Down Expand Up @@ -56,10 +57,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, id, cfg, r, m)
operator, err := newOperator(ctx, log, id, cfg, srv, r, m)
if err != nil {
return nil, err
}
Expand All @@ -71,7 +72,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, client sender, r rep
}
}

func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter, m monitoring.Monitor) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) (*operation.Operator, error) {
operatorConfig := operatorCfg.DefaultConfig()
if err := config.Unpack(&operatorConfig); err != nil {
return nil, err
Expand Down Expand Up @@ -102,6 +103,7 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config
verifier,
installer,
stateResolver,
srv,
r,
m,
)
Expand Down
Loading