Skip to content

Commit

Permalink
Implement row limits on collections (Velocidex#602)
Browse files Browse the repository at this point in the history
This change keeps track of the number of rows returned from the client
in each VQLResponse inside the collection context. When the number of
returned rows exceeds the limit, the collection is cancelled
automatically.

A similar limit is also implemented using the TotalUploadedBytes
metric. When the collection collects more than this many bytes, the
collection is also cancelled.

Separately from this an optimization is implemented where clients
encode their response into JSONL format. This allows the server to
directly copy the client's responses to the end of the result queue
without needing to decode and encode it again. This improves collection
efficiency a lot - especially for collections that return a lot of rows.
  • Loading branch information
scudette authored Aug 29, 2020
1 parent 610b382 commit 21b28a4
Show file tree
Hide file tree
Showing 35 changed files with 864 additions and 814 deletions.
220 changes: 115 additions & 105 deletions actions/proto/vql.pb.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions actions/proto/vql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,16 @@ message VQLTypeMap {
}

message VQLResponse {
// Response is encoded in a json array of rows.
string Response = 1 [(sem_type) = {
description: "JSON encoded response.",
}];

// Response is encoded as line delimited JSON.
string JSONLResponse = 10 [(sem_type) = {
description: "JSON encoded response.",
}];

repeated string Columns = 2 [(sem_type) = {
description: "A list of column headings produced by the query.",
}];
Expand Down
13 changes: 7 additions & 6 deletions actions/vql.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (self VQLClientAction) StartQuery(

result_chan := vfilter.GetResponseChannel(
vql, sub_ctx, scope,
vql_subsystem.MarshalJson(scope),
vql_subsystem.MarshalJsonl(scope),
int(max_row),
int(max_wait))
run_query:
Expand Down Expand Up @@ -187,11 +187,12 @@ func (self VQLClientAction) StartQuery(
continue
}
response := &actions_proto.VQLResponse{
Query: query,
QueryId: uint64(query_idx),
Part: uint64(result.Part),
Response: string(result.Payload),
Timestamp: uint64(time.Now().UTC().UnixNano() / 1000),
Query: query,
QueryId: uint64(query_idx),
Part: uint64(result.Part),
JSONLResponse: string(result.Payload),
TotalRows: uint64(result.TotalRows),
Timestamp: uint64(time.Now().UTC().UnixNano() / 1000),
}
// Don't log empty VQL statements.
if query.Name != "" {
Expand Down
11 changes: 7 additions & 4 deletions api/assets_filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ func install_static_assets(config_obj *config_proto.Config, mux *http.ServeMux)
mux.Handle(base+"/static/", http.StripPrefix(base+"/static/", http.FileServer(
http.Dir("gui/static"))))

// Useful for debugging as it allows the browser to set
// breakpoints and matches the sourcemaps.
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(
http.Dir("gui/static"))))
if base != "" {
// Useful for debugging as it allows the browser to
// set breakpoints and matches the sourcemaps. Add
// the static handlers again.
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(
http.Dir("gui/static"))))
}

mux.Handle(base+"/favicon.ico",
http.RedirectHandler(base+"/static/images/favicon.ico",
Expand Down
5 changes: 3 additions & 2 deletions api/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func renderDBVFS(

// Empty responses mean the directory is empty - no need to
// worry about downloads.
if result.Response == "" {
json_response := result.Response
if json_response == "" {
return result, nil
}

Expand All @@ -152,7 +153,7 @@ func renderDBVFS(
}

var rows []map[string]interface{}
err := json.Unmarshal([]byte(result.Response), &rows)
err := json.Unmarshal([]byte(json_response), &rows)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion artifacts/definitions/Generic/Utils/FetchBinary.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ sources:
-- clients at least back to 0.3.9
LET info_cache <= SELECT * FROM info()
LET inventory_item <= SELECT inventory_get(tool=ToolName) AS Item FROM scope()
LET inventory_item = SELECT inventory_get(tool=ToolName) AS Item FROM scope()
LET args <= SELECT * FROM switch(
// Try to get info from the ToolInfo parameter.
Expand Down
7 changes: 6 additions & 1 deletion bin/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ func doRemoteQuery(
continue
}

rows, err := utils.ParseJsonToDicts([]byte(response.Response))
json_response := response.Response
if json_response == "" {
json_response = response.JSONLResponse
}

rows, err := utils.ParseJsonToDicts([]byte(json_response))
kingpin.FatalIfError(err, "GetAPIClient")

switch format {
Expand Down
2 changes: 1 addition & 1 deletion bin/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func shell_executor(config_obj *config_proto.Config,
return
}

if response.Context.State == flows_proto.ArtifactCollectorContext_TERMINATED {
if response.Context.State == flows_proto.ArtifactCollectorContext_FINISHED {
request := &api_proto.GetTableRequest{
FlowId: flow_id,
Artifact: artifact_name,
Expand Down
12 changes: 11 additions & 1 deletion file_store/directory/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,20 @@ func ReadRowsJSON(
if err != nil {
return
}

// We have reached the end.
if len(row_data) == 0 {
return
}

item := ordereddict.NewDict()

// We failed to unmarshal one line of
// JSON - it may be corrupted, go to
// the next one.
err = item.UnmarshalJSON(row_data)
if err != nil {
return
continue
}

ts, pres := item.Get("_ts")
Expand Down
2 changes: 1 addition & 1 deletion flows/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func ArchiveFlow(
}
}()

if collection_context.State != flows_proto.ArtifactCollectorContext_TERMINATED &&
if collection_context.State != flows_proto.ArtifactCollectorContext_FINISHED &&
collection_context.State != flows_proto.ArtifactCollectorContext_ERROR {
return nil, errors.New("Flow must be stopped before it can be archived.")
}
Expand Down
85 changes: 61 additions & 24 deletions flows/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,31 @@ var (
})
)

// closeContext is called after all messages from the clients are
// processed in this group. Client messages are sent in groups inside
// the same POST request. Most of the time they belong to the same
// collection context. Therefore it makes sense to keep information in
// memory between processing individual messages. At the end of the
// processing we can close the context and flush data to disk.
func closeContext(
config_obj *config_proto.Config,
collection_context *flows_proto.ArtifactCollectorContext) error {

// Context is not dirty - nothing to do.
if !collection_context.Dirty || collection_context.ClientId == "" {
return nil
}

// Decide if this collection exceeded its quota.
err := checkContextResourceLimits(config_obj, collection_context)
if err != nil {
return err
}

if collection_context.StartTime == 0 {
collection_context.StartTime = uint64(time.Now().UnixNano() / 1000)
}

collection_context.ActiveTime = uint64(time.Now().UnixNano() / 1000)

if len(collection_context.Logs) > 0 {
Expand Down Expand Up @@ -164,7 +182,7 @@ func flushContextUploadedFiles(

for _, row := range collection_context.UploadedFiles {
rs_writer.Write(ordereddict.NewDict().
Set("Timestamp", fmt.Sprintf("%v", time.Now().UTC().Unix())).
Set("Timestamp", time.Now().UTC().Unix()).
Set("started", time.Now().UTC().String()).
Set("vfs_path", row.Name).
Set("file_size", row.Size).
Expand Down Expand Up @@ -238,7 +256,6 @@ func ArtifactCollectorProcessOneMessage(
return nil
}

// Restore strings from flow state.
response := message.VQLResponse
if response == nil || response.Query == nil {
return errors.New("Expected args of type VQLResponse")
Expand All @@ -253,7 +270,7 @@ func ArtifactCollectorProcessOneMessage(
return err
}

// Store the event log in the client's VFS.
rows_written := uint64(0)
if response.Query.Name != "" {
path_manager := result_sets.NewArtifactPathManager(config_obj,
collection_context.Request.ClientId,
Expand All @@ -262,30 +279,47 @@ func ArtifactCollectorProcessOneMessage(

rs_writer, err := result_sets.NewResultSetWriter(
config_obj, path_manager, nil, false /* truncate */)
if err != nil {
fmt.Printf("Error: %v\n", err)
return err
}
defer rs_writer.Close()

rows, err := utils.ParseJsonToDicts([]byte(response.Response))
if err != nil {
fmt.Printf("Error: %v\n", err)
return err
}
// Support the old clients which send JSON
// array responses. We need to decode the JSON
// response, then re-encode it into JSONL for
// log files.
if len(response.Response) > 0 {
if err != nil {
return err
}
defer rs_writer.Close()

rows, err := utils.ParseJsonToDicts([]byte(
response.Response))
if err != nil {
return err
}

for _, row := range rows {
rs_writer.Write(row)
for _, row := range rows {
rows_written++
rs_writer.Write(row)
}

// New clients already encode the JSON
// as line delimited, so we only need
// to append to end of the log file -
// much faster!
} else if len(response.JSONLResponse) > 0 {
rs_writer.WriteJSONL([]byte(response.JSONLResponse))
rows_written = response.TotalRows
}

// Update the artifacts with results in the
// context.
if len(rows) > 0 && !utils.InString(
collection_context.ArtifactsWithResults,
response.Query.Name) {
collection_context.ArtifactsWithResults = append(
collection_context.ArtifactsWithResults,
response.Query.Name)
if rows_written > 0 {
if !utils.InString(collection_context.ArtifactsWithResults,
response.Query.Name) {
collection_context.ArtifactsWithResults = append(
collection_context.ArtifactsWithResults,
response.Query.Name)
}
collection_context.TotalCollectedRows += rows_written
collection_context.Dirty = true
}
}
Expand All @@ -299,14 +333,17 @@ func IsRequestComplete(
collection_context *flows_proto.ArtifactCollectorContext,
message *crypto_proto.GrrMessage) (bool, error) {

// Nope request is not complete.
if message.Status == nil {
return false, nil
}

// Complete the collection
if collection_context == nil || collection_context.Request == nil {
return false, errors.New("Invalid collection context")
}

// Update any hunts if needed.
if constants.HuntIdRegex.MatchString(collection_context.Request.Creator) {
err := services.GetHuntDispatcher().ModifyHunt(
collection_context.Request.Creator,
Expand All @@ -323,10 +360,10 @@ func IsRequestComplete(

// Only terminate a running flow.
if collection_context.State == flows_proto.ArtifactCollectorContext_RUNNING {
collection_context.State = flows_proto.ArtifactCollectorContext_TERMINATED
collection_context.KillTimestamp = uint64(time.Now().UnixNano() / 1000)
collection_context.State = flows_proto.ArtifactCollectorContext_FINISHED
collection_context.Dirty = true
}

return true, nil
}

Expand Down Expand Up @@ -355,7 +392,7 @@ func FailIfError(
}

collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
collection_context.KillTimestamp = uint64(time.Now().UnixNano() / 1000)
collection_context.ActiveTime = uint64(time.Now().UnixNano() / 1000)
collection_context.Status = message.Status.ErrorMessage
collection_context.Backtrace = message.Status.Backtrace
collection_context.Dirty = true
Expand Down
Loading

0 comments on commit 21b28a4

Please sign in to comment.