Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support backwards compatibility comms with older clients. #2405

Merged
merged 3 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Support backwards compatibility comms with older clients.
  • Loading branch information
scudette committed Jan 30, 2023
commit 8870d810aaa83929e832ab1d8284099b2ac832fb
22 changes: 11 additions & 11 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,26 @@ func (self *ClientExecutor) processRequestPlugin(
return
}

// This is the old deprecated VQLClientAction that is sent for old
// client compatibility. New clients ignore this and only process
// a FlowRequest message.
if req.VQLClientAction != nil {
if req.FlowRequest != nil {
self.ProcessFlowRequest(ctx, config_obj, req)
return
}

// This action is deprecated now.
if req.UpdateForeman != nil {
if req.UpdateEventTable != nil {
actions.UpdateEventTable{}.Run(
config_obj, ctx, self.Outbound, req.UpdateEventTable)
return
}

if req.FlowRequest != nil {
self.ProcessFlowRequest(ctx, config_obj, req)
// This is the old deprecated VQLClientAction that is sent for old
// client compatibility. New clients ignore this and only process
// a FlowRequest message.
if req.VQLClientAction != nil {
return
}

if req.UpdateEventTable != nil {
actions.UpdateEventTable{}.Run(
config_obj, ctx, self.Outbound, req.UpdateEventTable)
// This action is deprecated now.
if req.UpdateForeman != nil {
return
}

Expand Down
8 changes: 5 additions & 3 deletions flows/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
artifact_paths "www.velocidex.com/golang/velociraptor/paths/artifacts"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/services/launcher"
utils "www.velocidex.com/golang/velociraptor/utils"
)

Expand Down Expand Up @@ -471,7 +470,6 @@ func CheckForStatus(
// Update our record of all the status messages from this
// collection.
updateQueryStats(config_obj, collection_context, message.Status)
launcher.UpdateFlowStats(&collection_context.ArtifactCollectorContext)

// Update the active time for each response.
collection_context.ActiveTime = uint64(time.Now().UnixNano() / 1000)
Expand Down Expand Up @@ -664,6 +662,10 @@ type FlowRunner struct {
config_obj *config_proto.Config
}

// A flow runner compatible with old clients. This runner manages flow
// state on the server which is less efficient. Modern clients
// (>0.6.7) will manage flows client side and so we will use
// the more efficient ClientFlowRunner instead.
func NewLegacyFlowRunner(config_obj *config_proto.Config) *FlowRunner {
return &FlowRunner{
config_obj: config_obj,
Expand Down Expand Up @@ -691,7 +693,7 @@ func (self *FlowRunner) ProcessSingleMessage(

// Only process real flows.
if !strings.HasPrefix(job.SessionId, "F.") {
return invalidSessionId
return nil
}

// json.TraceMessage(job.Source+"_job", job)
Expand Down
29 changes: 22 additions & 7 deletions services/client_info/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/protobuf/proto"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/datastore"
Expand Down Expand Up @@ -292,13 +293,27 @@ func (self *ClientInfoManager) GetClientTasks(
// FlowRequest into separate VQLClientActions. Newer clients will
// ignore bare VQLClientActions and older clients will ignore
// FlowRequest.
if message.FlowRequest != nil {
for _, request := range message.FlowRequest.VQLClientActions {
result = append(result, &crypto_proto.VeloMessage{
SessionId: message.SessionId,
RequestId: message.RequestId,
VQLClientAction: request,
})
if message.FlowRequest != nil &&
len(message.FlowRequest.VQLClientActions) > 0 {

// Tack the first VQLClientAction on top of the
// FlowRequest for backwards compatibility. Newer clients
// procees FlowRequest first and ignore VQLClientAction
// while older clients will process the VQLClientAction
// and ignore the FlowRequest message. In both cases the
// message will be valid.
message.VQLClientAction = proto.Clone(
message.FlowRequest.VQLClientActions[0]).(*actions_proto.VQLCollectorArgs)

// Send the rest of the VQLClientAction as distinct messages.
for idx, request := range message.FlowRequest.VQLClientActions {
if idx > 0 {
result = append(result, &crypto_proto.VeloMessage{
SessionId: message.SessionId,
RequestId: message.RequestId,
VQLClientAction: request,
})
}
}
}

Expand Down
32 changes: 21 additions & 11 deletions services/vfs_service/vfs_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"www.velocidex.com/golang/velociraptor/paths/artifacts"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/services/journal"
"www.velocidex.com/golang/velociraptor/utils"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
"www.velocidex.com/golang/vfilter"
Expand Down Expand Up @@ -194,7 +195,13 @@ func (self *VFSService) ProcessListDirectoryLegacy(
ctx context.Context,
config_obj *config_proto.Config,
scope vfilter.Scope, row *ordereddict.Dict,
flow *flows_proto.ArtifactCollectorContext) {
basic_flow *flows_proto.ArtifactCollectorContext) {

flow, err := journal.GetFlowFromQueue(config_obj, row)
if err != nil {
return
}
json.Dump(flow)

// An empty result set needs special handling.
if flow.TotalCollectedRows == 0 {
Expand Down Expand Up @@ -303,21 +310,24 @@ func (self *VFSService) flush_state(
config_obj *config_proto.Config, timestamp uint64, client_id, flow_id string,
vfs_components []string, start_idx, end_idx int) error {

record := &api_proto.VFSListResponse{
Timestamp: timestamp,
TotalRows: uint64(end_idx - start_idx),
ClientId: client_id,
FlowId: flow_id,
StartIdx: uint64(start_idx),
EndIdx: uint64(end_idx),
}

client_path_manager := paths.NewClientPathManager(client_id)
db, err := datastore.GetDB(config_obj)
if err != nil {
return err
}
client_path_manager := paths.NewClientPathManager(client_id)
return db.SetSubject(config_obj,

return db.SetSubjectWithCompletion(config_obj,
client_path_manager.VFSPath(vfs_components),
&api_proto.VFSListResponse{
Timestamp: timestamp,
TotalRows: uint64(end_idx - start_idx),
ClientId: client_id,
FlowId: flow_id,
StartIdx: uint64(start_idx),
EndIdx: uint64(end_idx),
})
record, utils.SyncCompleter)
}

// Modern clients do the above work on the client removing load from
Expand Down