Skip to content

Commit

Permalink
Refactor and reimplement the pool client. (Velocidex#2444)
Browse files Browse the repository at this point in the history
  • Loading branch information
scudette authored Feb 13, 2023
1 parent 680c5aa commit 9c061e4
Show file tree
Hide file tree
Showing 23 changed files with 331 additions and 240 deletions.
39 changes: 18 additions & 21 deletions actions/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,21 @@ func (self *EventTable) StartQueries(
}
}

func (self *EventTable) StartFromWriteback(
ctx context.Context, wg *sync.WaitGroup,
config_obj *config_proto.Config,
output_chan chan *crypto_proto.VeloMessage) {

// Get the event table from the writeback if possible.
event_table := &actions_proto.VQLEventTable{}

writeback, err := config.GetWriteback(config_obj.Client)
if err == nil && writeback.EventQueries != nil {
event_table = writeback.EventQueries
self.UpdateEventTable(ctx, wg, config_obj, output_chan, event_table)
}
}

func (self *EventTable) UpdateEventTable(
ctx context.Context,
wg *sync.WaitGroup,
Expand Down Expand Up @@ -277,36 +292,18 @@ func update_writeback(
func NewEventTable(
ctx context.Context,
wg *sync.WaitGroup,
config_obj *config_proto.Config,
output_chan chan *crypto_proto.VeloMessage,
table *actions_proto.VQLEventTable) *EventTable {
config_obj *config_proto.Config) *EventTable {

sub_ctx, cancel := context.WithCancel(ctx)

self := &EventTable{
Events: table.Event,
version: table.Version,
Ctx: sub_ctx,
cancel: cancel,
Ctx: sub_ctx,
cancel: cancel,

// Used to wait for close()
wg: &sync.WaitGroup{},
config_obj: config_obj,
monitoring_manager: responder.NewMonitoringManager(ctx),
}

// When service closes we close the last event table.
wg.Add(1)
go func() {
defer wg.Done()

// Kick off the initial set of queries.
self.StartQueries(ctx, config_obj, output_chan)

// Wait here until our parent context is cancelled.
<-ctx.Done()
self.Close()
}()

return self
}
12 changes: 9 additions & 3 deletions actions/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,20 @@ func (self *EventsTestSuite) SetupTest() {
self.responder = responder.TestResponderWithFlowId(
self.ConfigObj, "EventsTestSuite")
self.event_table = actions.NewEventTable(
self.Ctx, self.Wg, self.ConfigObj, self.responder.Output(),
self.Ctx, self.Wg, self.ConfigObj)
self.event_table.UpdateEventTable(
self.Ctx, self.Wg, self.ConfigObj,
self.responder.Output(),
&actions_proto.VQLEventTable{})
}

func (self *EventsTestSuite) InitializeEventTable(ctx context.Context,
wg *sync.WaitGroup, output_chan chan *crypto_proto.VeloMessage) *actions.EventTable {
return actions.NewEventTable(ctx, wg, self.ConfigObj, output_chan,
&actions_proto.VQLEventTable{})
result := actions.NewEventTable(ctx, wg, self.ConfigObj)
result.UpdateEventTable(ctx, wg, self.ConfigObj,
output_chan, &actions_proto.VQLEventTable{})

return result
}

func (self *EventsTestSuite) TearDownTest() {
Expand Down
3 changes: 1 addition & 2 deletions bin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ func runClientOnce(
return err
}

exe, err := executor.NewClientExecutor(
ctx, writeback.ClientId, config_obj)
exe, err := executor.NewClientExecutor(ctx, writeback.ClientId, config_obj)
if err != nil {
return fmt.Errorf("Can not create executor: %w", err)
}
Expand Down
16 changes: 5 additions & 11 deletions bin/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,17 @@ func doPoolClient() error {
}

// Make a copy of all the configs for each client.
configs := make([]*config_proto.Config, 0, number_of_clients)
serialized, _ := json.Marshal(client_config)

for i := 0; i < number_of_clients; i++ {
client_config := &config_proto.Config{}
err := json.Unmarshal(serialized, &client_config)
if err != nil {
return fmt.Errorf("Copying configs: %w", err)
}
configs = append(configs, client_config)
}

c := counter{}

for i := 0; i < number_of_clients; i++ {
go func(i int) error {
client_config := configs[i]
client_config := &config_proto.Config{}
err := json.Unmarshal(serialized, &client_config)
if err != nil {
return fmt.Errorf("Copying configs: %w", err)
}
filename := fmt.Sprintf("pool_client.yaml.%d", i)
client_config.Client.WritebackLinux = path.Join(
*pool_client_writeback_dir, filename)
Expand Down
3 changes: 2 additions & 1 deletion datastore/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ func MultiGetSubject(
for _, request := range requests {
wg.Add(1)
go func(request *MultiGetSubjectRequest) {
defer wg.Done()

mu.Lock()
defer mu.Unlock()
request.Err = db.GetSubject(config_obj, request.Path, request.Message)
wg.Done()
}(request)
}
mu.Unlock()
Expand Down
13 changes: 2 additions & 11 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"google.golang.org/protobuf/proto"
"www.velocidex.com/golang/velociraptor/actions"
"www.velocidex.com/golang/velociraptor/config"
"www.velocidex.com/golang/velociraptor/json"
"www.velocidex.com/golang/velociraptor/utils"

Expand All @@ -41,7 +40,6 @@ type Executor interface {
ClientId() string

// These are called by the executor code.
ReadFromServer() *crypto_proto.VeloMessage
SendToServer(message *crypto_proto.VeloMessage)

// These two are called by the comms module.
Expand Down Expand Up @@ -184,13 +182,6 @@ func NewClientExecutor(
level = 2
}

// Get the event table from the writeback if possible.
event_table := &actions_proto.VQLEventTable{}
writeback, err := config.GetWriteback(config_obj.Client)
if err == nil && writeback.EventQueries != nil {
event_table = writeback.EventQueries
}

wg := &sync.WaitGroup{}
self := &ClientExecutor{
ctx: ctx,
Expand All @@ -204,8 +195,8 @@ func NewClientExecutor(
}

// Install and initialize the event manager
self.event_manager = actions.NewEventTable(
ctx, wg, config_obj, self.Outbound, event_table)
self.event_manager = actions.NewEventTable(ctx, wg, config_obj)
self.event_manager.StartFromWriteback(ctx, wg, config_obj, self.Outbound)

// Drain messages from server and execute them, pushing
// results to the output channel.
Expand Down
Loading

0 comments on commit 9c061e4

Please sign in to comment.