Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow client side collections to be traced. #2422

Merged
merged 3 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions artifacts/definitions/Generic/Client/Trace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Generic.Client.Trace
description: |
This artifact collects profiling information about the running
client. The artifact is automatically added when the GUI selects a
non zero Trace frequency.

NOTE: You can also add the artifact directly, but then you will need
to cancel the collection manually since it will continue to run
until the timeout is reached.

Minimum Version: 0.6.8

parameters:
- name: FrequencySec
type: int
default: 10

sources:
- query: |
SELECT * FROM if(condition=version(function="trace"),
then={
SELECT trace() AS TraceFile
FROM clock(start=0, period=FrequencySec)
})
2 changes: 2 additions & 0 deletions artifacts/testdata/server/testcases/artifacts.out.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ SELECT *, basename(path=file_store(path=vfs_path)) FROM uploads(client_id='C.4f5
"timeout": 0,
"max_rows": 0,
"max_upload_bytes": 0,
"trace_freq_sec": 0,
"allow_custom_overrides": false,
"log_batch_time": 0,
"compiled_collector_args": [],
Expand Down Expand Up @@ -103,6 +104,7 @@ SELECT *, basename(path=file_store(path=vfs_path)) FROM uploads(client_id='C.4f5
"timeout": 0,
"max_rows": 0,
"max_upload_bytes": 0,
"trace_freq_sec": 0,
"allow_custom_overrides": false,
"log_batch_time": 0,
"compiled_collector_args": [],
Expand Down
7 changes: 1 addition & 6 deletions artifacts/testdata/server/testcases/profile.out.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ SELECT * FROM profile(metrics=TRUE) WHERE Line.name =~ "client_comms_current_con
"help": "Number of currently connected clients.",
"value": 0
},
"FullPath": "",
"_RawMetric": {
"gauge": {
"value": 0
}
}
"FullPath": ""
}
]
691 changes: 352 additions & 339 deletions crypto/proto/jobs.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions crypto/proto/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ message FlowRequest {
// If the client exceeds this, the client will abort the
// collection.
uint64 max_upload_bytes = 5;

// Execute this trace query while the main collection is running.
repeated VQLCollectorArgs trace = 6;
}


Expand Down
19 changes: 9 additions & 10 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Executor interface {
type ClientExecutor struct {
client_id string

wg *sync.WaitGroup
Inbound chan *crypto_proto.VeloMessage
Outbound chan *crypto_proto.VeloMessage

Expand Down Expand Up @@ -162,11 +163,12 @@ func NewClientExecutor(
level = 2
}

result := &ClientExecutor{
self := &ClientExecutor{
client_id: client_id,
Inbound: make(chan *crypto_proto.VeloMessage, 10),
Outbound: make(chan *crypto_proto.VeloMessage, 10),
concurrency: utils.NewConcurrencyControl(level, time.Hour),
wg: &sync.WaitGroup{},
config_obj: config_obj,
}

Expand All @@ -177,11 +179,8 @@ func NewClientExecutor(
// channels. The executed queries should finish by
// themselves when the context is done.

// defer close(result.Outbound)

// Do not exit until all goroutines have finished.
wg := &sync.WaitGroup{}
defer wg.Wait()
defer self.wg.Wait()

logger := logging.GetLogger(config_obj, &logging.ClientComponent)

Expand All @@ -195,25 +194,25 @@ func NewClientExecutor(

// Pump messages from input channel and
// process each request.
case req, ok := <-result.Inbound:
case req, ok := <-self.Inbound:
if !ok {
return
}

// Ignore unauthenticated messages - the
// server should never send us those.
if req.AuthState == crypto_proto.VeloMessage_AUTHENTICATED {
wg.Add(1)
self.wg.Add(1)
go func() {
defer wg.Done()
defer self.wg.Done()

logger.Debug("Received request: %v", req)
result.processRequestPlugin(config_obj, ctx, req)
self.processRequestPlugin(config_obj, ctx, req)
}()
}
}
}
}()

return result, nil
return self, nil
}
53 changes: 45 additions & 8 deletions executor/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package executor

import (
"context"
"sync"

"www.velocidex.com/golang/velociraptor/actions"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/responder"
Expand Down Expand Up @@ -31,14 +33,49 @@ func (self *ClientExecutor) ProcessFlowRequest(
defer cancel()
}

// Wait for the trace to finish recording all its data.
trace_wg := &sync.WaitGroup{}
defer trace_wg.Wait()

// Cancel traces when the entire collection exist.
trace_ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Run trace queries now but do not wait for them to exit before
// cancelling them.
for _, arg := range req.FlowRequest.Trace {
trace_wg.Add(1)
go func(arg *actions_proto.VQLCollectorArgs) {
defer trace_wg.Done()

_, responder_obj := flow_context.NewResponder(arg)
defer responder_obj.Close()

actions.VQLClientAction{}.StartQuery(
config_obj, trace_ctx, responder_obj, arg)
}(arg)

}

// Wait for all subqueries before closing the collection.
wg := &sync.WaitGroup{}
defer wg.Wait()

for _, arg := range req.FlowRequest.VQLClientActions {
// A responder is used to track each specific query within the
// entire flow. There can be multiple queries run in parallel
// within the same flow.
sub_ctx, responder_obj := flow_context.NewResponder(arg)
defer responder_obj.Close()

actions.VQLClientAction{}.StartQuery(
config_obj, sub_ctx, responder_obj, arg)
wg.Add(1)

// Run each VQLClientActions in another goroutine.
go func(arg *actions_proto.VQLCollectorArgs) {
defer wg.Done()

// A responder is used to track each specific query within the
// entire flow. There can be multiple queries run in parallel
// within the same flow.
sub_ctx, responder_obj := flow_context.NewResponder(arg)
defer responder_obj.Close()

actions.VQLClientAction{}.StartQuery(
config_obj, sub_ctx, responder_obj, arg)
}(arg)
}
}
4 changes: 2 additions & 2 deletions flows/artifacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@ func (self *TestSuite) TestCollectionCompletionMultiQueryOkStatus() {
assert.Equal(self.T(), flows_proto.ArtifactCollectorContext_FINISHED,
flow.State)

// The flow represents the total from all queries.
assert.Equal(self.T(), int64(225), flow.ExecutionDuration)
// The duration represents the longest duration of all queries.
assert.Equal(self.T(), int64(125), flow.ExecutionDuration)
assert.Equal(self.T(), uint64(6), flow.TotalCollectedRows)
assert.Equal(self.T(), []string{
"Generic.Client.Info/BasicInformation",
Expand Down
Loading