Skip to content

Commit

Permalink
More multi-frontend optimizations (Velocidex#1393)
Browse files Browse the repository at this point in the history
* More multi-frontend optimizations

* Client info manager now also keeps track of LastHuntTimestamp and
  LastEventTableVersion for each client. This means we do not need to
  send an UpdateForeman message to the client since the server keeps
  track

* Client info manager now keeps track of the state of client
  queues. If no new messages are queued for the client, the frontend
  is able to skip making an IO operation on the data store.

* Replication service now writes the events to storage by itself. This
  ensures that minions are able to take on the writing work for event
  queries.

* Replication service now receives an update on all Watchers in the
  master, and avoids sending any events to the master than no watches
  are interested in. This reduces the number of gRPC calls between
  minion and master and therefore improves performance.
  • Loading branch information
scudette authored Nov 28, 2021
1 parent 44d2585 commit 637e71c
Show file tree
Hide file tree
Showing 52 changed files with 833 additions and 548 deletions.
63 changes: 48 additions & 15 deletions actions/proto/vql.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions actions/proto/vql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,14 @@ message ClientInfo {
string architecture = 7;
string ip_address = 10;
uint64 ping = 11;
string ping_time = 19;
string client_version = 12;
string client_name = 13;

repeated string labels = 15;

string last_interrogate_flow_id = 16;

uint64 last_hunt_timestamp = 17;
uint64 last_event_table_version = 18;
}
21 changes: 4 additions & 17 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,7 @@ func (self *ApiServer) CollectArtifact(
}

flow_id, err := launcher.ScheduleArtifactCollection(
ctx, self.config, acl_manager, repository, in,
func() {
notifier := services.GetNotifier()
if notifier != nil {
notifier.NotifyListener(self.config, in.ClientId)
}
})
ctx, self.config, acl_manager, repository, in, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -292,13 +286,10 @@ func (self *ApiServer) NotifyClients(
return nil, errors.New("Notifier not ready")
}

if in.NotifyAll {
self.server_obj.Info("sending notification to everyone")
err = notifier.NotifyAllListeners(self.config)

} else if in.ClientId != "" {
if in.ClientId != "" {
self.server_obj.Info("sending notification to %s", in.ClientId)
err = services.GetNotifier().NotifyListener(self.config, in.ClientId)
err = services.GetNotifier().NotifyListener(
self.config, in.ClientId, "API.NotifyClients")
} else {
return nil, status.Error(codes.InvalidArgument,
"client id should be specified")
Expand Down Expand Up @@ -841,10 +832,6 @@ func (self *ApiServer) SetClientMonitoringState(
return nil, err
}

_, err = self.NotifyClients(ctx, &api_proto.NotificationRequest{
NotifyAll: true,
})

return &empty.Empty{}, err
}

Expand Down
12 changes: 0 additions & 12 deletions api/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/server"
"www.velocidex.com/golang/velociraptor/services"

_ "www.velocidex.com/golang/velociraptor/result_sets/timed"
)
Expand Down Expand Up @@ -426,13 +425,6 @@ func StartFrontendPlainHttp(
defer cancel()

server.SetKeepAlivesEnabled(false)
notifier := services.GetNotifier()
if notifier != nil {
err := notifier.NotifyAllListeners(config_obj)
if err != nil {
server_obj.Error("Frontend server error %v", err)
}
}
err := server.Shutdown(time_ctx)
if err != nil {
server_obj.Error("Frontend server error %v", err)
Expand Down Expand Up @@ -523,10 +515,6 @@ func StartFrontendWithAutocert(
defer cancel()

server.SetKeepAlivesEnabled(false)
notifier := services.GetNotifier()
if notifier != nil {
_ = notifier.NotifyAllListeners(config_obj)
}
err := server.Shutdown(timeout_ctx)
if err != nil {
logger.Error("Frontend shutdown error: %v", err)
Expand Down
4 changes: 3 additions & 1 deletion api/notebooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,13 +853,15 @@ func (self *ApiServer) CancelNotebookCell(
}

notebook_cell.Calculating = false
// Make sure we write the cancel message ASAP
err = db.SetSubject(self.config, notebook_cell_path_manager.Path(),
notebook_cell)
if err != nil {
return nil, err
}

return &empty.Empty{}, services.GetNotifier().NotifyListener(self.config, in.CellId)
return &empty.Empty{}, services.GetNotifier().NotifyListener(
self.config, in.CellId, "CancelNotebookCell")
}

func (self *ApiServer) UploadNotebookAttachment(
Expand Down
28 changes: 14 additions & 14 deletions api/proto/api.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions api/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"crypto/x509"
"fmt"

"github.com/Velocidex/ordereddict"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
context "golang.org/x/net/context"
"google.golang.org/grpc/codes"
Expand All @@ -19,6 +22,17 @@ import (
"www.velocidex.com/golang/velociraptor/services"
)

var (
replicationReceiveHistorgram = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "replication_master_send_latency",
Help: "Latency for the master to send replication messages to the minion.",
Buckets: prometheus.LinearBuckets(0.1, 1, 10),
},
[]string{"status"},
)
)

func streamEvents(
ctx context.Context,
config_obj *config_proto.Config,
Expand All @@ -37,6 +51,16 @@ func streamEvents(
return err
}

// Special case this so the caller can immediately initialize the
// watchers.
if in.Queue == "Server.Internal.MasterRegistrations" {
result := ordereddict.NewDict().Set("Events", journal.GetWatchers())
serialized, _ := result.MarshalJSON()
stream.Send(&api_proto.EventResponse{
Jsonl: serialized,
})
}

// The API service is running on the master only! This means
// the journal service is local.
output_chan, cancel := journal.Watch(ctx, in.Queue)
Expand All @@ -50,7 +74,15 @@ func streamEvents(
response := &api_proto.EventResponse{
Jsonl: serialized,
}

timer := prometheus.NewTimer(
prometheus.ObserverFunc(func(v float64) {
replicationReceiveHistorgram.WithLabelValues("").Observe(v)
}))

err = stream.Send(response)
timer.ObserveDuration()

if err != nil {
continue
}
Expand Down
7 changes: 7 additions & 0 deletions artifacts/definitions/Server/Internal/ClientTasks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name: Server.Internal.ClientTasks
description: |
This event will be fired when a client has new tasks scheduled.
type: INTERNAL
column_types:
- name: ClientId
Loading

0 comments on commit 637e71c

Please sign in to comment.