Skip to content

Commit

Permalink
Refactored Flows into artifact collections. (Velocidex#139)
Browse files Browse the repository at this point in the history
Flows were an integral part of the old GRR design. GRR uses flows as a
plugin to implement new server side code. Velociraptor's design
revolves around VQL and so now there is only a single "Flow" left -
ArtifactCollector.

This changes removes the old pluggable flow and its machinary into a
single ArtifactCollector - features that the old flow system has that
were not needed are now removed: No more flow context holding opaque
data, no more double serializations.
  • Loading branch information
scudette authored Oct 25, 2019
1 parent 5b5139c commit a22e898
Show file tree
Hide file tree
Showing 214 changed files with 2,350 additions and 41,887 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ build_release: build_docker

debug:
dlv debug --build-flags="-tags 'server_vql extras'" \
./bin/ -- frontend -v
./bin/ -- frontend -v --debug

debug_client:
dlv debug --build-flags="-tags 'server_vql extras'" \
Expand Down
16 changes: 9 additions & 7 deletions actions/vql.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@ func (self VQLClientAction) StartQuery(
arg *actions_proto.VQLCollectorArgs) {

// Set reasonable defaults.
if arg.MaxWait == 0 {
arg.MaxWait = config_obj.Client.DefaultMaxWait
max_wait := arg.MaxWait
if max_wait == 0 {
max_wait = config_obj.Client.DefaultMaxWait

if arg.MaxWait == 0 {
arg.MaxWait = 100
if max_wait == 0 {
max_wait = 100
}
}

if arg.MaxRow == 0 {
arg.MaxRow = 10000
max_row := arg.MaxRow
if max_row == 0 {
max_row = 10000
}

rate := arg.OpsPerSecond
Expand Down Expand Up @@ -143,7 +145,7 @@ func (self VQLClientAction) StartQuery(
}

result_chan := vfilter.GetResponseChannel(
vql, sub_ctx, scope, int(arg.MaxRow), int(arg.MaxWait))
vql, sub_ctx, scope, int(max_row), int(max_wait))
run_query:
for {
select {
Expand Down
20 changes: 10 additions & 10 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ func (self *ApiServer) GetReport(
return getReport(ctx, self.config, in)
}

func (self *ApiServer) LaunchFlow(
func (self *ApiServer) CollectArtifact(
ctx context.Context,
in *flows_proto.FlowRunnerArgs) (*api_proto.StartFlowResponse, error) {
result := &api_proto.StartFlowResponse{}
in *flows_proto.ArtifactCollectorRequest) (*flows_proto.ArtifactCollectorResponse, error) {
result := &flows_proto.ArtifactCollectorResponse{Request: in}
creator := GetGRPCUserInfo(self.config, ctx).Name

// Internal calls from the frontend can set the creator.
Expand All @@ -153,12 +153,12 @@ func (self *ApiServer) LaunchFlow(
}
}

flow_id, err := flows.StartFlow(self.config, in)
flow_id, err := flows.ScheduleArtifactCollection(self.config, in)
if err != nil {
return nil, err
}
result.FlowId = *flow_id
result.RunnerArgs = in

result.FlowId = flow_id

// Notify the client if it is listenning.
channel := grpc_client.GetChannel(self.config)
Expand All @@ -173,14 +173,14 @@ func (self *ApiServer) LaunchFlow(
return nil, err
}

// Log this event as and Audit event.
// Log this event as an Audit event.
logging.GetLogger(self.config, &logging.Audit).
WithFields(logrus.Fields{
"user": in.Creator,
"client": in.ClientId,
"flow_id": flow_id,
"details": fmt.Sprintf("%v", in),
}).Info("LaunchFlow")
}).Info("CollectArtifact")

return result, nil
}
Expand Down Expand Up @@ -416,7 +416,7 @@ func (self *ApiServer) GetClientFlows(

func (self *ApiServer) GetFlowDetails(
ctx context.Context,
in *api_proto.ApiFlowRequest) (*api_proto.ApiFlow, error) {
in *api_proto.ApiFlowRequest) (*api_proto.FlowDetails, error) {

result, err := flows.GetFlowDetails(self.config, in.ClientId, in.FlowId)
return result, err
Expand Down Expand Up @@ -478,7 +478,7 @@ func (self *ApiServer) VFSStatDirectory(
func (self *ApiServer) VFSRefreshDirectory(
ctx context.Context,
in *api_proto.VFSRefreshDirectoryRequest) (
*api_proto.StartFlowResponse, error) {
*flows_proto.ArtifactCollectorResponse, error) {

result, err := vfsRefreshDirectory(
self, ctx, in.ClientId, in.VfsPath, in.Depth)
Expand Down
35 changes: 35 additions & 0 deletions api/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"strings"

context "golang.org/x/net/context"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
"www.velocidex.com/golang/velociraptor/artifacts"
artifacts_proto "www.velocidex.com/golang/velociraptor/artifacts/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/constants"
file_store "www.velocidex.com/golang/velociraptor/file_store"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
)

const (
Expand Down Expand Up @@ -261,3 +263,36 @@ func (self *ApiServer) ListAvailableEventResults(

return result, nil
}

// MakeCollectorRequest is a convenience function for creating
// flows_proto.ArtifactCollectorRequest protobufs.
func MakeCollectorRequest(
client_id string, artifact_name string,
parameters ...string) *flows_proto.ArtifactCollectorRequest {
result := &flows_proto.ArtifactCollectorRequest{
ClientId: client_id,
Request: &flows_proto.ArtifactCollectorArgs{
Artifacts: &flows_proto.Artifacts{
Names: []string{artifact_name},
},
Parameters: &flows_proto.ArtifactParameters{},
},
}

if len(parameters)%2 != 0 {
parameters = parameters[:len(parameters)-len(parameters)%2]
}

for i := 0; i < len(parameters); {
k := parameters[i]
i++
v := parameters[i]
i++
result.Request.Parameters.Env = append(result.Request.Parameters.Env,
&actions_proto.VQLEnv{
Key: k, Value: v,
})
}

return result
}
2 changes: 1 addition & 1 deletion api/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func filestorePathForVFSPath(
// monitoring and artifacts vfs folders are in the client's
// space.
if strings.HasPrefix(vfs_path, "/monitoring/") ||
strings.HasPrefix(vfs_path, "/flows/") ||
strings.HasPrefix(vfs_path, "/collections/") ||
strings.HasPrefix(vfs_path, "/artifacts/") {
return path.Join(
"clients", client_id, vfs_path)
Expand Down
Loading

0 comments on commit a22e898

Please sign in to comment.