Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

james/accelerate control via control update #1250

Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c09f45e
adds accelerate control consumer and registers with control service
James-Pickett May 19, 2023
7bdbfbc
fixes pkg naming typos
James-Pickett May 19, 2023
353e8d5
fix typo
James-Pickett May 19, 2023
6689b21
WIP of launcher control server cmd tracking
James-Pickett May 25, 2023
f92fd8a
WIP command tracker wrapper for command consumers
James-Pickett May 26, 2023
0e858a8
adds action middleware, updates notifications to use action middleware
James-Pickett Jun 9, 2023
e799834
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Jul 11, 2023
c995ef5
updates notificaitons to use actions queue, adds accelerate contorl c…
James-Pickett Jul 20, 2023
c282dbd
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Jul 20, 2023
577e0f4
clean up
James-Pickett Jul 20, 2023
4843da5
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Jul 31, 2023
c768ca2
updates accelerate control consumer to use seconds
James-Pickett Jul 31, 2023
2cd6c86
naming updates, feedback
James-Pickett Aug 1, 2023
78c125b
better names
James-Pickett Aug 1, 2023
09a34eb
Merge branch 'kolide:main' into james/accelerate-control-via-control-…
James-Pickett Aug 1, 2023
c5e0f71
fix tests
James-Pickett Aug 1, 2023
862f5c5
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Aug 1, 2023
3eb7465
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Aug 1, 2023
6a153b3
add compoenent to action queue logger, update action queue to check o…
James-Pickett Aug 7, 2023
e427d61
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Aug 7, 2023
c8fb035
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Aug 15, 2023
e25062f
Merge branch 'main' into james/accelerate-control-via-control-update
James-Pickett Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/kolide/kit/version"
"github.com/kolide/launcher/cmd/launcher/internal"
"github.com/kolide/launcher/cmd/launcher/internal/updater"
"github.com/kolide/launcher/ee/control/actionqueue"
"github.com/kolide/launcher/ee/control/consumers/acceleratecontrolconsumer"
"github.com/kolide/launcher/ee/control/consumers/keyvalueconsumer"
"github.com/kolide/launcher/ee/control/consumers/notificationconsumer"
desktopRunner "github.com/kolide/launcher/ee/desktop/runner"
Expand Down Expand Up @@ -287,11 +289,9 @@ func runLauncher(ctx context.Context, cancel func(), opts *launcher.Options) err
runGroup.Add(controlService.ExecuteWithContext(ctx), controlService.Interrupt)

// serverDataConsumer handles server data table updates
serverDataConsumer := keyvalueconsumer.New(k.ServerProvidedDataStore())
controlService.RegisterConsumer(serverDataSubsystemName, serverDataConsumer)
controlService.RegisterConsumer(serverDataSubsystemName, keyvalueconsumer.New(k.ServerProvidedDataStore()))
// agentFlagConsumer handles agent flags pushed from the control server
agentFlagsConsumer := keyvalueconsumer.New(flagController)
controlService.RegisterConsumer(agentFlagsSubsystemName, agentFlagsConsumer)
controlService.RegisterConsumer(agentFlagsSubsystemName, keyvalueconsumer.New(flagController))

runner, err = desktopRunner.New(
k,
Expand All @@ -305,22 +305,31 @@ func runLauncher(ctx context.Context, cancel func(), opts *launcher.Options) err

runGroup.Add(runner.Execute, runner.Interrupt)
controlService.RegisterConsumer(desktopMenuSubsystemName, runner)
// Run the notification service

// create an action queue for all other action style commands
actionsQueue := actionqueue.New(
actionqueue.WithContext(ctx),
actionqueue.WithLogger(logger),
actionqueue.WithStore(k.ControlServerActionsStore()),
)
runGroup.Add(actionsQueue.StartCleanup, actionsQueue.StopCleanup)
controlService.RegisterConsumer(actionqueue.ActionsSubsystem, actionsQueue)

// register accelerate control consumer
actionsQueue.RegisterActor(acceleratecontrolconsumer.AccelerateControlSubsystem, acceleratecontrolconsumer.New(k))

// create notification consumer
notificationConsumer, err := notificationconsumer.NewNotifyConsumer(
k.SentNotificationsStore(),
runner,
ctx,
notificationconsumer.WithLogger(logger),
)
if err != nil {
return fmt.Errorf("failed to set up notifier: %w", err)
}
// Runs the cleanup routine for old notification records
runGroup.Add(notificationConsumer.Execute, notificationConsumer.Interrupt)

if err := controlService.RegisterConsumer(notificationconsumer.NotificationSubsystem, notificationConsumer); err != nil {
return fmt.Errorf("failed to register notify consumer: %w", err)
}
// register notifications consumer
actionsQueue.RegisterActor(notificationconsumer.NotificationSubsystem, notificationConsumer)

// Set up our tracing instrumentation
authTokenConsumer := keyvalueconsumer.New(k.TokenStore())
Expand Down
241 changes: 241 additions & 0 deletions ee/control/actionqueue/actionqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package actionqueue

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/kolide/launcher/pkg/agent/storage/inmemory"
"github.com/kolide/launcher/pkg/agent/types"
)

const (
ActionsSubsystem = "actions"
defaultCleanupInterval = time.Hour * 12
// about 6 months, long enough to ensure that K2 no longer has the message
// and will not send a duplicate
actionRetentionPeriod = time.Hour * 24 * 30 * 6
)

type actor interface {
Do(data io.Reader) error
}

type action struct {
ID string `json:"id"`
ValidUntil int64 `json:"valid_until"` // timestamp
Type string `json:"type"`
ProcessedAt time.Time `json:"processed_at,omitempty"`
}

type actionqueue struct {
ctx context.Context
actors map[string]actor
store types.KVStore
logger log.Logger
actionCleanupInterval time.Duration
cancel context.CancelFunc
}

type actionqueueOption func(*actionqueue)

func WithLogger(logger log.Logger) actionqueueOption {
return func(aq *actionqueue) {
aq.logger = logger
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
}
}

func WithStore(store types.KVStore) actionqueueOption {
return func(aq *actionqueue) {
aq.store = store
}
}

func WithCleanupInterval(cleanupInterval time.Duration) actionqueueOption {
return func(aq *actionqueue) {
aq.actionCleanupInterval = cleanupInterval
}
}

func WithContext(ctx context.Context) actionqueueOption {
return func(aq *actionqueue) {
aq.ctx = ctx
}
}

func New(opts ...actionqueueOption) *actionqueue {
aq := &actionqueue{
ctx: context.Background(),
actors: make(map[string]actor, 0),
actionCleanupInterval: defaultCleanupInterval,
logger: log.NewNopLogger(),
}

for _, opt := range opts {
opt(aq)
}

if aq.store == nil {
aq.store = inmemory.NewStore(aq.logger)
}

return aq
}

func (aq *actionqueue) Update(data io.Reader) error {
// We want to unmarshal each action separately, so that we don't fail to send all actions
// if only some are malformed.
var rawActionsToProcess []json.RawMessage
if err := json.NewDecoder(data).Decode(&rawActionsToProcess); err != nil {
return fmt.Errorf("failed to decode actions data: %w", err)
}

for _, rawAction := range rawActionsToProcess {
var action action
if err := json.Unmarshal(rawAction, &action); err != nil {
level.Debug(aq.logger).Log("msg", "received action in unexpected format from K2, discarding", "err", err)
continue
}

if !aq.isActionValid(action) || !aq.isActionNew(action.ID) {
continue
}

actor, err := aq.actorForAction(action)
if err != nil {
level.Info(aq.logger).Log("msg", "getting actor for action", "err", err)
continue
}

if err := actor.Do(bytes.NewReader(rawAction)); err != nil {
level.Info(aq.logger).Log("msg", "failed to do action with action, not marking action complete", "err", err)
continue
}

// only mark processed when actor was successful
action.ProcessedAt = time.Now().UTC()
aq.storeActionRecord(action)
}

return nil
}

func (aq *actionqueue) RegisterActor(actorType string, actorToRegister actor) {
aq.actors[actorType] = actorToRegister
}

func (aq *actionqueue) StartCleanup() error {
aq.runCleanup()
return nil
}

func (aq *actionqueue) runCleanup() {
ctx, cancel := context.WithCancel(aq.ctx)
aq.cancel = cancel

t := time.NewTicker(aq.actionCleanupInterval)
defer t.Stop()

for {
select {
case <-ctx.Done():
level.Debug(aq.logger).Log("msg", "action cleanup stopped due to context cancel")
return
case <-t.C:
aq.cleanupActions()
}
}
}

func (aq *actionqueue) StopCleanup(err error) {
aq.cancel()
}

func (aq *actionqueue) storeActionRecord(actionToStore action) {
rawAction, err := json.Marshal(actionToStore)
if err != nil {
level.Error(aq.logger).Log("msg", "could not marshal complete action", "err", err)
return
}

if err := aq.store.Set([]byte(actionToStore.ID), rawAction); err != nil {
level.Debug(aq.logger).Log("msg", "could not mark action complete", "err", err)
}
}

func (aq *actionqueue) isActionNew(id string) bool {
completedActionRaw, err := aq.store.Get([]byte(id))
if err != nil {
level.Error(aq.logger).Log("msg", "could not read action from bucket", "err", err)
James-Pickett marked this conversation as resolved.
Show resolved Hide resolved
return false
}

if completedActionRaw == nil {
// No previous record -- action has not been processed before, it's new
return true
}

return false
}

func (aq *actionqueue) isActionValid(a action) bool {
if a.ID == "" {
level.Info(aq.logger).Log("msg", "action ID is empty", "action", a)
return false
}

if a.ValidUntil <= 0 {
level.Info(aq.logger).Log("msg", "action valid until is empty", "action", a)
return false
}

return a.ValidUntil > time.Now().Unix()
}

func (aq *actionqueue) actorForAction(a action) (actor, error) {
if len(aq.actors) == 0 {
return nil, errors.New("no actor registered")
}

if a.Type == "" {
return nil, errors.New("action does not have type, cannot determine actor")
}

actor, ok := aq.actors[a.Type]
if !ok {
return nil, fmt.Errorf("actor type %s not found", a.Type)
}

return actor, nil
}

func (aq *actionqueue) cleanupActions() {
// Read through all keys in bucket to determine which ones are old enough to be deleted
keysToDelete := make([][]byte, 0)

if err := aq.store.ForEach(func(k, v []byte) error {
var processedAction action
if err := json.Unmarshal(v, &processedAction); err != nil {
return fmt.Errorf("error processing %s: %w", string(k), err)
}

if processedAction.ProcessedAt.Add(actionRetentionPeriod).Before(time.Now().UTC()) {
keysToDelete = append(keysToDelete, k)
}

return nil
}); err != nil {
level.Debug(aq.logger).Log("msg", "could not iterate over bucket items to determine which are expired", "err", err)
}

// Delete all old keys
if err := aq.store.Delete(keysToDelete...); err != nil {
level.Debug(aq.logger).Log("msg", "could not delete old actions from bucket", "err", err)
}
}
Loading
Loading