Skip to content

Commit

Permalink
Implemented enrollment rate (Velocidex#1003)
Browse files Browse the repository at this point in the history
Now it is possible to limit the number of clients enrolled per
second. Also refactored code and fixed pool client so it supports
multiple requests per flow (e.g. Generic.Client.Info).
  • Loading branch information
scudette authored Apr 4, 2021
1 parent 4a5b2fb commit 029913e
Show file tree
Hide file tree
Showing 80 changed files with 1,511 additions and 1,422 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ name: Linux Build All Arches
on:
pull_request:
types: [closed]
branches:
- master
# branches:
# - master

jobs:
build:
Expand Down
97 changes: 83 additions & 14 deletions actions/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,44 @@ type EventTable struct {

config_obj *config_proto.Config

// This will be closed to signal we need to abort the current
// event queries.
// This will be closed to signal that we need to abort the
// current event queries.
Done chan bool
wg sync.WaitGroup
}

func (self *EventTable) equal(events []*actions_proto.VQLCollectorArgs) bool {
if len(events) != len(self.Events) {
return false
}

for i := range self.Events {
lhs := self.Events[i]
rhs := events[i]

if len(lhs.Query) != len(rhs.Query) {
return false
}

for j := range lhs.Query {
if !proto.Equal(lhs.Query[j], rhs.Query[j]) {
return false
}
}

if len(lhs.Env) != len(rhs.Env) {
return false
}

for j := range lhs.Env {
if !proto.Equal(lhs.Env[j], rhs.Env[j]) {
return false
}
}
}
return true
}

func (self *EventTable) Close() {
logger := logging.GetLogger(self.config_obj, &logging.ClientComponent)
logger.Info("Closing EventTable\n")
Expand All @@ -74,14 +106,28 @@ func GlobalEventTableVersion() uint64 {
func update(
config_obj *config_proto.Config,
responder *responder.Responder,
table *actions_proto.VQLEventTable) (*EventTable, error) {
table *actions_proto.VQLEventTable) (*EventTable, error, bool) {

mu.Lock()
defer mu.Unlock()

// Only update the event table if we need to.
if table.Version <= GlobalEventTable.version {
return GlobalEventTable, nil
return GlobalEventTable, nil, false
}

// If the new update is identical to the old queries we wont
// restart. This can happen e.g. if the server changes label
// groups and recaculates the table version but the actual
// queries dont end up changing.
if GlobalEventTable.equal(table.Event) {
logger := logging.GetLogger(config_obj, &logging.ClientComponent)
logger.Info("Client event query update %v did not "+
"change queries, skipping", table.Version)

// Update the version only but keep queries the same.
GlobalEventTable.version = table.Version
return GlobalEventTable, nil, false
}

// Close the old table.
Expand All @@ -90,10 +136,9 @@ func update(
}

// Make a new table.
GlobalEventTable = NewEventTable(
config_obj, responder, table)
GlobalEventTable = NewEventTable(config_obj, responder, table)

return GlobalEventTable, nil
return GlobalEventTable, nil, true /* changed */
}

func NewEventTable(
Expand All @@ -119,9 +164,24 @@ func (self UpdateEventTable) Run(
arg *actions_proto.VQLEventTable) {

// Make a new table.
table, err := update(config_obj, responder, arg)
table, err, changed := update(config_obj, responder, arg)
if err != nil {
responder.Log(ctx, "Error updating global event table: %v", err)
responder.RaiseError(ctx, fmt.Sprintf(
"Error updating global event table: %v", err))
return
}

// No change required, skip it.
if !changed {
// We still need to write the new version
err = update_writeback(config_obj, arg)
if err != nil {
responder.RaiseError(ctx, fmt.Sprintf(
"Unable to write events to writeback: %v", err))
} else {
responder.Return(ctx)
}
return
}

logger := logging.GetLogger(config_obj, &logging.ClientComponent)
Expand Down Expand Up @@ -178,15 +238,24 @@ func (self UpdateEventTable) Run(
}(event)
}

// Store the event table in the Writeback file.
config_copy := proto.Clone(config_obj).(*config_proto.Config)
event_copy := proto.Clone(arg).(*actions_proto.VQLEventTable)
config_copy.Writeback.EventQueries = event_copy
err = config.UpdateWriteback(config_copy)
err = update_writeback(config_obj, arg)
if err != nil {
responder.RaiseError(ctx, fmt.Sprintf(
"Unable to write events to writeback: %v", err))
return
}

responder.Return(ctx)
}

func update_writeback(
config_obj *config_proto.Config,
event_table *actions_proto.VQLEventTable) error {

// Store the event table in the Writeback file.
config_copy := proto.Clone(config_obj).(*config_proto.Config)
event_copy := proto.Clone(event_table).(*actions_proto.VQLEventTable)
config_copy.Writeback.EventQueries = event_copy

return config.UpdateWriteback(config_copy)
}
199 changes: 100 additions & 99 deletions actions/proto/vql.pb.go

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions actions/proto/vql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ message VQLEnv {
string value = 2;
}

// This is the most common type of message - it specifies a query to
// run on the endpoint.
message VQLCollectorArgs {
option (flow_metadata) = {
category: "Generic";
};

// If this is specified we run this query first and if it returns
// any rows we continue with the real query.
string precondition = 29;
Expand Down
2 changes: 1 addition & 1 deletion actions/vql.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (self VQLClientAction) StartQuery(
)
}
response.Columns = result.Columns
responder.AddResponse(ctx, &crypto_proto.GrrMessage{
responder.AddResponse(ctx, &crypto_proto.VeloMessage{
VQLResponse: response})
}
}
Expand Down
26 changes: 13 additions & 13 deletions api/proto/api.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 029913e

Please sign in to comment.