Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix / clarify capabilities API #2952

Merged
merged 23 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func New(
}
log.With("inputs", specs.Inputs()).Info("Detected available inputs and outputs")

caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), log)
caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), log)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to determine capabilities: %w", err)
}
Expand Down
48 changes: 29 additions & 19 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ type Coordinator struct {
configMgr ConfigManager
varsMgr VarsManager

caps capabilities.Capability
caps capabilities.Capabilities
modifiers []ComponentsModifier

// The current state of the Coordinator. This value and its subfields are
Expand Down Expand Up @@ -259,7 +259,7 @@ type managerChans struct {
var ErrFatalCoordinator = errors.New("fatal error in coordinator")

// New creates a new coordinator.
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo *info.AgentInfo, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capability, monitorMgr MonitorManager, isManaged bool, modifiers ...ComponentsModifier) *Coordinator {
func New(logger *logger.Logger, cfg *configuration.Configuration, logLevel logp.Level, agentInfo *info.AgentInfo, specs component.RuntimeSpecs, reexecMgr ReExecManager, upgradeMgr UpgradeManager, runtimeMgr RuntimeManager, configMgr ConfigManager, varsMgr VarsManager, caps capabilities.Capabilities, monitorMgr MonitorManager, isManaged bool, modifiers ...ComponentsModifier) *Coordinator {
var fleetState cproto.State
var fleetMessage string
if !isManaged {
Expand Down Expand Up @@ -402,10 +402,7 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str

// early check capabilities to ensure this upgrade actions is allowed
if c.caps != nil {
if _, err := c.caps.Apply(map[string]interface{}{
"version": version,
"sourceURI": sourceURI,
}); errors.Is(err, capabilities.ErrBlocked) {
if !c.caps.AllowUpgrade(version, sourceURI) {
return ErrNotUpgradable
}
}
Expand Down Expand Up @@ -925,19 +922,6 @@ func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (er
return fmt.Errorf("could not create the AST from the configuration: %w", err)
}

if c.caps != nil {
var ok bool
updatedAst, err := c.caps.Apply(rawAst)
if err != nil {
return fmt.Errorf("failed to apply capabilities: %w", err)
}

rawAst, ok = updatedAst.(*transpiler.AST)
if !ok {
return fmt.Errorf("failed to transform object returned from capabilities to AST: %w", err)
}
}

if err := features.Apply(cfg); err != nil {
return fmt.Errorf("could not update feature flags config: %w", err)
}
Expand Down Expand Up @@ -1065,6 +1049,9 @@ func (c *Coordinator) recomputeConfigAndComponents() error {
return fmt.Errorf("failed to render components: %w", err)
}

// Filter any disallowed inputs/outputs from the components
comps = c.filterByCapabilities(comps)

for _, modifier := range c.modifiers {
comps, err = modifier(comps, cfg)
if err != nil {
Expand All @@ -1079,6 +1066,29 @@ func (c *Coordinator) recomputeConfigAndComponents() error {
return nil
}

// Filter any inputs and outputs in the generated component model
// based on whether they're excluded by the capabilities config
func (c *Coordinator) filterByCapabilities(comps []component.Component) []component.Component {
if c.caps == nil {
// No active filters, return unchanged
return comps
}
result := []component.Component{}
for _, component := range comps {
// If this is an input component (not a shipper), make sure its type is allowed
if component.InputSpec != nil && !c.caps.AllowInput(component.InputType) {
c.logger.Info("Component %q with input type %q filtered by capabilities.yml", component.InputType)
continue
}
if !c.caps.AllowOutput(component.OutputType) {
c.logger.Info("Component %q with output type %q filtered by capabilities.yml", component.ID, component.OutputType)
continue
}
result = append(result, component)
}
return result
}

// handleCoordinatorDone is called when the Coordinator's context is
// finished. It waits for the runtime, config, and vars managers to finish,
// and collects their return values into an error if any of them returned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func createCoordinator(t *testing.T, opts ...CoordinatorOpt) (*Coordinator, *fak
rm, err := runtime.NewManager(l, l, "localhost:0", ai, apmtest.DiscardTracer, monitoringMgr, configuration.DefaultGRPCConfig())
require.NoError(t, err)

caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), l)
caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
require.NoError(t, err)

cfgMgr := newFakeConfigManager()
Expand Down
19 changes: 15 additions & 4 deletions internal/pkg/agent/application/coordinator/diagnostics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ func TestDiagnosticComponentsExpected(t *testing.T) {
// reported by the components-expected diagnostic
components := []component.Component{
{
ID: "filestream-component",
ID: "filestream-component",
InputType: "filestream",
OutputType: "elasticsearch",
InputSpec: &component.InputRuntimeSpec{
InputType: "filestream",
BinaryName: "filestream-binary",
Expand All @@ -167,7 +169,8 @@ func TestDiagnosticComponentsExpected(t *testing.T) {
},
},
{
ID: "shipper-component",
ID: "shipper-component",
OutputType: "elasticsearch",
ShipperSpec: &component.ShipperRuntimeSpec{
ShipperType: "shipper",
BinaryName: "shipper-binary",
Expand All @@ -187,6 +190,8 @@ func TestDiagnosticComponentsExpected(t *testing.T) {
expected := `
components:
- id: filestream-component
input_type: filestream
output_type: elasticsearch
input_spec:
binary_name: filestream-binary
binary_path: filestream-path
Expand All @@ -203,6 +208,8 @@ components:
log_level: 2
type: 1
- id: shipper-component
input_type: ""
output_type: elasticsearch
shipper_spec:
binary_name: shipper-binary
binary_path: shipper-path
Expand Down Expand Up @@ -237,8 +244,10 @@ func TestDiagnosticComponentsActual(t *testing.T) {
Components: []runtime.ComponentComponentState{
{
Component: component.Component{
ID: "component-1",
Err: errors.New("component error"),
ID: "component-1",
Err: errors.New("component error"),
InputType: "test-input",
OutputType: "test-output",
Units: []component.Unit{
{
ID: "test-unit",
Expand Down Expand Up @@ -268,6 +277,8 @@ func TestDiagnosticComponentsActual(t *testing.T) {
components:
- id: component-1
error: {}
input_type: "test-input"
output_type: "test-output"
units:
- id: test-unit
error: {}
Expand Down
117 changes: 57 additions & 60 deletions internal/pkg/agent/cmd/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts,
if err != nil {
return err
}
return printConfig(fullCfg, l, streams)
return printConfig(fullCfg, streams)
}

cfg, lvl, err := getConfigWithVariables(ctx, l, cfgPath, opts.variablesWait)
Expand Down Expand Up @@ -167,10 +167,19 @@ func inspectConfig(ctx context.Context, cfgPath string, opts inspectConfigOpts,
if err != nil {
return fmt.Errorf("failed to get monitoring: %w", err)
}
components, binaryMapping, err := specs.PolicyToComponents(cfg, lvl, agentInfo)
components, err := specs.PolicyToComponents(cfg, lvl, agentInfo)
if err != nil {
return fmt.Errorf("failed to get binary mappings: %w", err)
}

// The monitoring config depends on a map from component id to
// binary name.
binaryMapping := make(map[string]string)
for _, component := range components {
if spec := component.InputSpec; spec != nil {
binaryMapping[component.ID] = spec.BinaryName
}
}
monitorCfg, err := monitorFn(cfg, components, binaryMapping)
if err != nil {
return fmt.Errorf("failed to get monitoring config: %w", err)
Expand Down Expand Up @@ -203,26 +212,12 @@ func printMapStringConfig(mapStr map[string]interface{}, streams *cli.IOStreams)
return err
}

func printConfig(cfg *config.Config, l *logger.Logger, streams *cli.IOStreams) error {
caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), l)
if err != nil {
return err
}

func printConfig(cfg *config.Config, streams *cli.IOStreams) error {
mapStr, err := cfg.ToMapStr()
if err != nil {
return err
}
newCfg, err := caps.Apply(mapStr)
if err != nil {
return errors.New(err, "failed to apply capabilities")
}
newMap, ok := newCfg.(map[string]interface{})
if !ok {
return errors.New("config returned from capabilities has invalid type")
}

return printMapStringConfig(newMap, streams)
return printMapStringConfig(mapStr, streams)
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
}

type inspectComponentsOpts struct {
Expand All @@ -232,6 +227,11 @@ type inspectComponentsOpts struct {
variablesWait time.Duration
}

// returns true if the given Capabilities config blocks the given component.
func blockedByCaps(c component.Component, caps capabilities.Capabilities) bool {
return !caps.AllowInput(c.InputType) || !caps.AllowOutput(c.OutputType)
}

func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponentsOpts, streams *cli.IOStreams) error {
l, err := newErrorLogger()
if err != nil {
Expand Down Expand Up @@ -270,6 +270,26 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen
return fmt.Errorf("failed to render components: %w", err)
}

// Hide configuration unless toggled on.
if !opts.showConfig {
for i, comp := range comps {
for key, unit := range comp.Units {
unit.Config = nil
comp.Units[key] = unit
}
comps[i] = comp
}
}

// Hide runtime specification unless toggled on.
if !opts.showSpec {
for i, comp := range comps {
comp.InputSpec = nil
comp.ShipperSpec = nil
comps[i] = comp
}
}

// ID provided.
if opts.id != "" {
splitID := strings.SplitN(opts.id, "/", 2)
Expand All @@ -287,42 +307,27 @@ func inspectComponents(ctx context.Context, cfgPath string, opts inspectComponen
}
return fmt.Errorf("unable to find unit with ID: %s/%s", compID, unitID)
}
if !opts.showSpec {
comp.InputSpec = nil
comp.ShipperSpec = nil
}
if !opts.showConfig {
for key, unit := range comp.Units {
unit.Config = nil
comp.Units[key] = unit
}
}
return printComponent(comp, streams)
}
return fmt.Errorf("unable to find component with ID: %s", compID)
}

// Hide configuration unless toggled on.
if !opts.showConfig {
for i, comp := range comps {
for key, unit := range comp.Units {
unit.Config = nil
comp.Units[key] = unit
}
comps[i] = comp
}
// Separate any components that are blocked by capabilities config
caps, err := capabilities.LoadFile(paths.AgentCapabilitiesPath(), l)
if err != nil {
return err
}

// Hide runtime specification unless toggled on.
if !opts.showSpec {
for i, comp := range comps {
comp.InputSpec = nil
comp.ShipperSpec = nil
comps[i] = comp
allowed := []component.Component{}
blocked := []component.Component{}
for _, c := range comps {
if blockedByCaps(c, caps) {
blocked = append(blocked, c)
} else {
allowed = append(allowed, c)
}
}

return printComponents(comps, streams)
return printComponents(allowed, blocked, streams)
}

func getMonitoringFn(cfg map[string]interface{}) (component.GenerateMonitoringCfgFn, error) {
Expand All @@ -346,10 +351,6 @@ func getMonitoringFn(cfg map[string]interface{}) (component.GenerateMonitoringCf
}

func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath string, timeout time.Duration) (map[string]interface{}, logp.Level, error) {
caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), l)
if err != nil {
return nil, logp.InfoLevel, fmt.Errorf("failed to determine capabilities: %w", err)
}

cfg, err := operations.LoadFullAgentConfig(l, cfgPath, true)
if err != nil {
Expand All @@ -368,16 +369,6 @@ func getConfigWithVariables(ctx context.Context, l *logger.Logger, cfgPath strin
return nil, lvl, fmt.Errorf("could not create the AST from the configuration: %w", err)
}

var ok bool
updatedAst, err := caps.Apply(ast)
if err != nil {
return nil, lvl, fmt.Errorf("failed to apply capabilities: %w", err)
}
ast, ok = updatedAst.(*transpiler.AST)
if !ok {
return nil, lvl, fmt.Errorf("failed to transform object returned from capabilities to AST: %w", err)
}

// Wait for the variables based on the timeout.
vars, err := vars.WaitForVariables(ctx, l, cfg, timeout)
if err != nil {
Expand Down Expand Up @@ -417,11 +408,17 @@ func getLogLevel(rawCfg *config.Config, cfgPath string) (logp.Level, error) {
return logger.DefaultLogLevel, nil
}

func printComponents(components []component.Component, streams *cli.IOStreams) error {
func printComponents(
components []component.Component,
blocked []component.Component,
ycombinator marked this conversation as resolved.
Show resolved Hide resolved
streams *cli.IOStreams,
) error {
topLevel := struct {
Components []component.Component `yaml:"components"`
Blocked []component.Component `yaml:"blocked_by_capabilities"`
}{
Components: components,
Blocked: blocked,
}
data, err := yaml.Marshal(topLevel)
if err != nil {
Expand Down
Loading