Skip to content

Commit

Permalink
Added new client message type FlowRequest (Velocidex#2391)
Browse files Browse the repository at this point in the history
This message combines all collections in the same flow into a single
message, reducing the total number of task files in message
queues. This should help to reduce IO operations and increase
throughput
  • Loading branch information
scudette authored Jan 22, 2023
1 parent 78affca commit 80ec8da
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 85 deletions.
8 changes: 1 addition & 7 deletions actions/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,7 @@ func (self UpdateEventTable) Run(
defer service_wg.Done()

// Name of the query we are running.
name := ""
for _, q := range event.Query {
if q.Name != "" {
name = q.Name
}
}

name := GetQueryName(event.Query)
if name != "" {
logger.Info("<green>Starting</> monitoring query %s", name)
}
Expand Down
13 changes: 2 additions & 11 deletions actions/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func (self *EventsTestSuite) TestEventTableUpdate() {
// Only one query will be selected now since no label is set
// on the client.
assert.Equal(self.T(), len(message.UpdateEventTable.Event), 1)
assert.Equal(self.T(), getQueryName(message.UpdateEventTable.Event[0]),
"EventArtifact1")
assert.Equal(self.T(), actions.GetQueryName(
message.UpdateEventTable.Event[0].Query), "EventArtifact1")

// Set the new table, this will execute the new queries and
// start the new table.
Expand Down Expand Up @@ -248,15 +248,6 @@ func (self *EventsTestSuite) TestEventTableUpdate() {
assert.Contains(self.T(), string(data), "EventArtifact2")
}

func getQueryName(args *actions_proto.VQLCollectorArgs) string {
for _, query := range args.Query {
if query.Name != "" {
return query.Name
}
}
return ""
}

func TestEventsTestSuite(t *testing.T) {
suite.Run(t, &EventsTestSuite{})
}
12 changes: 12 additions & 0 deletions actions/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package actions

import actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"

func GetQueryName(args []*actions_proto.VQLRequest) string {
for _, query := range args {
if query.Name != "" {
return query.Name
}
}
return ""
}
20 changes: 12 additions & 8 deletions actions/vql.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func (self VQLClientAction) StartQuery(
return
}

name := GetQueryName(arg.Query)

// Clients do not have a copy of artifacts so they need to be
// sent all artifacts from the server.
manager, err := services.GetRepositoryManager(config_obj)
Expand Down Expand Up @@ -163,7 +165,7 @@ func (self VQLClientAction) StartQuery(
"incorrect or missed results or even crashes.")
}

scope.Log("INFO:Starting query execution.")
scope.Log("INFO:Starting query execution for %v.", name)

throttler := NewThrottler(ctx, scope, float64(rate),
float64(cpu_limit), float64(iops_limit))
Expand All @@ -188,18 +190,19 @@ func (self VQLClientAction) StartQuery(
responder.RaiseError(ctx, msg)
}

scope.Log("INFO:Collection is done after %v", time.Since(start))
scope.Log("INFO:Collection %v is done after %v", name, time.Since(start))
}()

ok, err := CheckPreconditions(ctx, scope, arg)
if err != nil {
scope.Log("While evaluating preconditions: %v", err)
responder.RaiseError(ctx, fmt.Sprintf("While evaluating preconditions: %v", err))
scope.Log("%v: While evaluating preconditions: %v", name, err)
responder.RaiseError(ctx,
fmt.Sprintf("While evaluating preconditions: %v", err))
return
}

if !ok {
scope.Log("Skipping query due to preconditions")
scope.Log("INFO:%v: Skipping query due to preconditions", name)
responder.Return(ctx)
return
}
Expand Down Expand Up @@ -247,7 +250,7 @@ func (self VQLClientAction) StartQuery(

case <-time.After(time.Second * time.Duration(heartbeat)):
responder.Log(ctx, logging.DEFAULT,
fmt.Sprintf("Time %v: %s: Waiting for rows.",
fmt.Sprintf("%v: Time %v: %s: Waiting for rows.", name,
(uint64(time.Now().UTC().UnixNano()/1000)-
query_start)/1000000, query.Name))

Expand Down Expand Up @@ -279,7 +282,8 @@ func (self VQLClientAction) StartQuery(
responder.Log(ctx,
logging.DEFAULT,
fmt.Sprintf(
"Time %v: %s: Sending response part %d %s (%d rows).",
"%v: Time %v: %s: Sending response part %d %s (%d rows).",
name,
(response.Timestamp-query_start)/1000000,
query.Name,
result.Part,
Expand All @@ -296,7 +300,7 @@ func (self VQLClientAction) StartQuery(

if uploader.Count > 0 {
responder.Log(ctx, logging.DEFAULT,
fmt.Sprintf("Uploaded %v files.", uploader.Count))
fmt.Sprintf("%v: Uploaded %v files.", name, uploader.Count))
}

responder.Return(ctx)
Expand Down
40 changes: 18 additions & 22 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,34 +152,30 @@ func (self *ClientExecutor) processRequestPlugin(
return
}

// Handle the requests. This used to be a plugin registration
// process but there are very few plugins any more and so it
// is easier to hard code this.
responder_obj := responder.NewResponder(ctx, config_obj, req, self.Outbound)
defer responder_obj.Close()

// Each request has its own context.
query_ctx, closer := flow_manager.FlowContext(req).NewQueryContext(
responder_obj, req)
defer closer()

// This is the old deprecated VQLClientAction that is sent for old
// client compatibility. New clients ignore this and only process
// a FlowRequest message.
if req.VQLClientAction != nil {
// Control concurrency on the executor only.
if !req.Urgent {
cancel, err := self.concurrency.StartConcurrencyControl(query_ctx)
if err != nil {
responder_obj.RaiseError(query_ctx, fmt.Sprintf("%v", err))
return
}
defer cancel()
}
return
}

actions.VQLClientAction{}.StartQuery(
config_obj, query_ctx, responder_obj, req.VQLClientAction)
if req.FlowRequest != nil {
self.ProcessFlowRequest(ctx, config_obj, req)
return
}

if req.UpdateEventTable != nil {
// Handle the requests. This used to be a plugin registration
// process but there are very few plugins any more and so it
// is easier to hard code this.
responder_obj := responder.NewResponder(ctx, config_obj, req, self.Outbound)
defer responder_obj.Close()

// Each request has its own context.
flow_context := flow_manager.FlowContext(req)
query_ctx, closer := flow_context.NewQueryContext(responder_obj)
defer closer()

actions.UpdateEventTable{}.Run(
config_obj, query_ctx, responder_obj, req.UpdateEventTable)
return
Expand Down
11 changes: 6 additions & 5 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,12 @@ func (self *ExecutorTestSuite) TestLogMessages() {
executor.Inbound <- &crypto_proto.VeloMessage{
AuthState: crypto_proto.VeloMessage_AUTHENTICATED,
SessionId: flow_id,
VQLClientAction: &actions_proto.VQLCollectorArgs{
Query: []*actions_proto.VQLRequest{
// Log 100 messages
{VQL: "SELECT log(message='log %v', args=count()) FROM range(end=10)"},
},
FlowRequest: &crypto_proto.FlowRequest{
VQLClientActions: []*actions_proto.VQLCollectorArgs{{
Query: []*actions_proto.VQLRequest{
// Log 100 messages
{VQL: "SELECT log(message='log %v', args=count()) FROM range(end=10)"},
}}},
},
RequestId: 1}

Expand Down
46 changes: 46 additions & 0 deletions executor/flows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package executor

import (
"context"
"fmt"

"www.velocidex.com/golang/velociraptor/actions"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/responder"
)

func (self *ClientExecutor) ProcessFlowRequest(
ctx context.Context,
config_obj *config_proto.Config, req *crypto_proto.VeloMessage) {

flow_manager := responder.GetFlowManager(ctx, config_obj)
flow_context := flow_manager.FlowContext(req)

// Control concurrency for the entire collection at once.
if !req.Urgent {
cancel, err := self.concurrency.StartConcurrencyControl(ctx)
if err != nil {
responder_obj := responder.NewResponder(ctx, config_obj,
req, self.Outbound)
defer responder_obj.Close()

responder_obj.RaiseError(ctx, fmt.Sprintf("%v", err))
return
}
defer cancel()
}

for _, arg := range req.FlowRequest.VQLClientActions {
responder_obj := responder.NewResponder(ctx, config_obj,
req, self.Outbound)
defer responder_obj.Close()

// Each request has its own context.
query_ctx, closer := flow_context.NewQueryContext(responder_obj)
defer closer()

actions.VQLClientAction{}.StartQuery(
config_obj, query_ctx, responder_obj, arg)
}
}
2 changes: 1 addition & 1 deletion flows/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ func (self *FlowRunner) ProcessSingleMessage(
Cancel: &crypto_proto.Cancel{},
SessionId: job.SessionId,
},
true /* notify */, nil)
services.NOTIFY_CLIENT, utils.BackgroundWriter)
if err != nil {
return fmt.Errorf("Queueing for client %v: %w",
job.Source, err)
Expand Down
13 changes: 10 additions & 3 deletions flows/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,16 @@ func (self *TestSuite) TestResourceLimits() {
context.Background(), self.client_id)
assert.NoError(self.T(), err)

// Two requests since there are two source preconditions on
// Generic.Client.Info
assert.Equal(self.T(), len(messages), 2)
// The Generic.Client.Info has two source conditions so it
// contains two queries. To maintain backwards compatibility with
// older clients, GetClientTasks should have two old style
// VQLClientAction request and a new FlowRequest. Old clients will
// ignore the old requests and new clients will ignore the old
// style requests.
assert.Equal(self.T(), len(messages), 3)
assert.True(self.T(), messages[0].VQLClientAction != nil)
assert.True(self.T(), messages[1].VQLClientAction != nil)
assert.Equal(self.T(), len(messages[2].FlowRequest.VQLClientActions), 2)

// Send one row.
message := &crypto_proto.VeloMessage{
Expand Down
4 changes: 3 additions & 1 deletion flows/housekeeping.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/services"
utils "www.velocidex.com/golang/velociraptor/utils"
)

var (
Expand Down Expand Up @@ -79,7 +80,8 @@ func CheckClientStatus(

clientEventUpdateCounter.Inc()
err := client_manager.QueueMessageForClient(
ctx, client_id, update_message, true, nil)
ctx, client_id, update_message,
services.NOTIFY_CLIENT, utils.BackgroundWriter)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion flows/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/services"
utils "www.velocidex.com/golang/velociraptor/utils"
)

var (
Expand Down Expand Up @@ -78,5 +79,6 @@ func cancelCollection(
&crypto_proto.VeloMessage{
Cancel: &crypto_proto.Cancel{},
SessionId: flow_id,
}, true /* notify */, nil)
},
services.NOTIFY_CLIENT, utils.BackgroundWriter)
}
4 changes: 1 addition & 3 deletions responder/flow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ func (self *FlowContext) NextUploadId() int64 {
return new_id - 1
}

func (self *FlowContext) NewQueryContext(
responder *Responder, req *crypto_proto.VeloMessage) (
func (self *FlowContext) NewQueryContext(responder *Responder) (
ctx context.Context, closer func()) {
self.mu.Lock()
defer self.mu.Unlock()
Expand All @@ -178,7 +177,6 @@ func (self *FlowContext) NewQueryContext(
result := &QueryContext{
flow_id: self.flow_id,
cancel: cancel,
req: req,
id: utils.GetId(),
}

Expand Down
8 changes: 6 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,10 @@ func (self *ServerTestSuite) TestScheduleCollection() {

tasks, err := client_info_manager.PeekClientTasks(context.Background(), self.client_id)
assert.NoError(t, err)
assert.Equal(t, len(tasks), 2)
assert.Equal(t, len(tasks), 1)

// The request sends a single FlowRequest task with two queries
assert.Equal(t, len(tasks[0].FlowRequest.VQLClientActions), 2)

collection_context := &flows_proto.ArtifactCollectorContext{}
path_manager := paths.NewFlowPathManager(self.client_id, flow_id)
Expand Down Expand Up @@ -638,7 +641,8 @@ func (self *ServerTestSuite) TestCancellation() {
assert.NoError(t, err)

// Generic.Client.Info has two source preconditions in parallel
assert.Equal(t, len(tasks), 2)
assert.Equal(t, len(tasks), 1)
assert.Equal(t, len(tasks[0].FlowRequest.VQLClientActions), 2)

// Cancelling the flow will notify the client immediately.
launcher, err := services.GetLauncher(self.ConfigObj)
Expand Down
2 changes: 2 additions & 0 deletions services/client_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
)

const (
NOTIFY_CLIENT = true

Unknown ClientOS = iota
Windows
Linux
Expand Down
15 changes: 15 additions & 0 deletions services/client_info/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,21 @@ func (self *ClientInfoManager) GetClientTasks(
if err != nil {
return nil, err
}

// Handle backwards compatibility with older clients by expanding
// FlowRequest into separate VQLClientActions. Newer clients will
// ignore bare VQLClientActions and older clients will ignore
// FlowRequest.
if message.FlowRequest != nil {
for _, request := range message.FlowRequest.VQLClientActions {
result = append(result, &crypto_proto.VeloMessage{
SessionId: message.SessionId,
RequestId: message.RequestId,
VQLClientAction: request,
})
}
}

result = append(result, message)
}

Expand Down
Loading

0 comments on commit 80ec8da

Please sign in to comment.