From 637e71c5be157da967a37077db8149fd2aa351b7 Mon Sep 17 00:00:00 2001
From: Mike Cohen
Date: Mon, 29 Nov 2021 02:10:38 +1000
Subject: [PATCH] More multi-frontend optimizations (#1393)

* More multi-frontend optimizations

* Client info manager now also keeps track of LastHuntTimestamp and LastEventTableVersion for each client. This means we do not need to send an UpdateForeman message to the client, since the server keeps track of this state.

* Client info manager now keeps track of the state of client queues. If no new messages are queued for the client, the frontend is able to skip making an IO operation on the data store.

* Replication service now writes the events to storage by itself. This ensures that minions are able to take on the writing work for event queries.

* Replication service now receives an update on all watchers in the master, and avoids sending any events to the master that no watchers are interested in. This reduces the number of gRPC calls between minion and master and therefore improves performance.
---
actions/proto/vql.pb.go | 63 ++++--
actions/proto/vql.proto | 4 +
api/api.go | 21 +-
api/builder.go | 12 --
api/notebooks.go | 4 +-
api/proto/api.pb.gw.go | 28 +--
api/replication.go | 32 ++++
.../Server/Internal/ClientTasks.yaml | 7 +
.../Server/Internal/MasterRegistrations.yaml | 9 +
config/proto/config.pb.go | 7 +-
file_store/api/queues.go | 1 +
file_store/directory/queue.go | 16 ++
file_store/memory/queue.go | 16 ++
file_store/queue.go | 2 +-
file_store/test_utils/testsuite.go | 6 +
flows/api.go | 7 +-
flows/artifacts.go | 42 ++--
flows/{foreman.go => housekeeping.go} | 100 ++++------
flows/limits.go | 10 +-
flows/monitoring.go | 10 +
notifications/notifications.go | 49 -----
reporting/acls_test.go | 19 +-
search/index_test.go | 2 +
server/comms.go | 40 ++--
server/metrics.go | 32 ++++
server/server.go | 7 +-
server/server_test.go | 55 +++---
services/client_info.go | 20 +-
services/client_info/client_info.go | 180 +++++++++---------
services/client_info/client_info_test.go | 9 +-
services/client_info/tasks.go | 82 +++++++-
services/client_info/tasks_test.go | 10 +-
services/hunt_dispatcher/hunt_dispatcher.go | 25 +--
.../hunt_dispatcher/hunt_dispatcher_test.go | 12 +-
services/hunt_manager/hunt_manager.go | 11 +-
services/interrogation/interrogation.go | 3 +-
services/inventory/inventory_test.go | 2 +
services/journal.go | 2 +
services/journal/journal.go | 26 ++-
services/journal/replication.go | 153 ++++++++++++++-
services/journal/replication_test.go | 21 ++
services/launcher/launcher.go | 69 +++----
services/launcher/launcher_test.go | 2 +
services/notifications.go | 16 +-
services/notifications/notifications.go | 103 +++++-----
services/sanity/sanity_test.go | 9 +-
.../server_artifacts/server_artifacts_test.go | 2 +-
vql/filesystem/zip_test.go | 2 +
vql/parsers/sqlite_test.go | 2 +
vql/server/artifacts.go | 3 +-
vql/server/clients/delete.go | 3 +-
vql/server/kill.go | 13 +-
52 files changed, 833 insertions(+), 548 deletions(-)
create mode 100644 artifacts/definitions/Server/Internal/ClientTasks.yaml
create mode 100644 artifacts/definitions/Server/Internal/MasterRegistrations.yaml
rename flows/{foreman.go => housekeeping.go} (53%)
create mode 100644 server/metrics.go

diff --git a/actions/proto/vql.pb.go b/actions/proto/vql.pb.go
index ccf63beced3..6e1345b073f 100644
--- a/actions/proto/vql.pb.go
+++ b/actions/proto/vql.pb.go
@@ -574,10 +574,13 @@ type ClientInfo struct {
 Architecture string `protobuf:"bytes,7,opt,name=architecture,proto3" json:"architecture,omitempty"`
 IpAddress
string `protobuf:"bytes,10,opt,name=ip_address,json=ipAddress,proto3" json:"ip_address,omitempty"` Ping uint64 `protobuf:"varint,11,opt,name=ping,proto3" json:"ping,omitempty"` + PingTime string `protobuf:"bytes,19,opt,name=ping_time,json=pingTime,proto3" json:"ping_time,omitempty"` ClientVersion string `protobuf:"bytes,12,opt,name=client_version,json=clientVersion,proto3" json:"client_version,omitempty"` ClientName string `protobuf:"bytes,13,opt,name=client_name,json=clientName,proto3" json:"client_name,omitempty"` Labels []string `protobuf:"bytes,15,rep,name=labels,proto3" json:"labels,omitempty"` LastInterrogateFlowId string `protobuf:"bytes,16,opt,name=last_interrogate_flow_id,json=lastInterrogateFlowId,proto3" json:"last_interrogate_flow_id,omitempty"` + LastHuntTimestamp uint64 `protobuf:"varint,17,opt,name=last_hunt_timestamp,json=lastHuntTimestamp,proto3" json:"last_hunt_timestamp,omitempty"` + LastEventTableVersion uint64 `protobuf:"varint,18,opt,name=last_event_table_version,json=lastEventTableVersion,proto3" json:"last_event_table_version,omitempty"` } func (x *ClientInfo) Reset() { @@ -668,6 +671,13 @@ func (x *ClientInfo) GetPing() uint64 { return 0 } +func (x *ClientInfo) GetPingTime() string { + if x != nil { + return x.PingTime + } + return "" +} + func (x *ClientInfo) GetClientVersion() string { if x != nil { return x.ClientVersion @@ -696,6 +706,20 @@ func (x *ClientInfo) GetLastInterrogateFlowId() string { return "" } +func (x *ClientInfo) GetLastHuntTimestamp() uint64 { + if x != nil { + return x.LastHuntTimestamp + } + return 0 +} + +func (x *ClientInfo) GetLastEventTableVersion() uint64 { + if x != nil { + return x.LastEventTableVersion + } + return 0 +} + var File_vql_proto protoreflect.FileDescriptor var file_vql_proto_rawDesc = []byte{ @@ -865,7 +889,7 @@ var file_vql_proto_rawDesc = []byte{ 0xe2, 0xfc, 0xe3, 0xc4, 0x01, 0x22, 0x12, 0x20, 0x54, 0x68, 0x65, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x68, 0x69, 0x73, 0x20, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x20, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2e, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, - 0x6e, 0x22, 0xfb, 0x02, 0x0a, 0x0a, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, + 0x6e, 0x22, 0x81, 0x04, 0x0a, 0x0a, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1b, 0x0a, 0x09, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x68, 0x6f, 0x73, 0x74, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, @@ -879,20 +903,29 @@ var file_vql_proto_rawDesc = []byte{ 0x75, 0x72, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x69, 0x70, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x69, 0x70, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x12, 0x0a, 0x04, 0x70, 0x69, 0x6e, 0x67, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x04, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, - 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, - 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, - 0x0b, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x0d, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, - 0x0a, 0x06, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x0f, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 
- 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x73, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x69, - 0x6e, 0x74, 0x65, 0x72, 0x72, 0x6f, 0x67, 0x61, 0x74, 0x65, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x5f, - 0x69, 0x64, 0x18, 0x10, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x6c, 0x61, 0x73, 0x74, 0x49, 0x6e, - 0x74, 0x65, 0x72, 0x72, 0x6f, 0x67, 0x61, 0x74, 0x65, 0x46, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x42, - 0x35, 0x5a, 0x33, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x64, 0x65, 0x78, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6c, 0x61, 0x6e, 0x67, 0x2f, 0x76, 0x65, 0x6c, 0x6f, - 0x63, 0x69, 0x72, 0x61, 0x70, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x04, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x69, 0x6e, 0x67, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x18, 0x13, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x69, 0x6e, 0x67, 0x54, + 0x69, 0x6d, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x76, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6c, 0x69, + 0x65, 0x6e, 0x74, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x63, 0x6c, + 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6c, + 0x61, 0x62, 0x65, 0x6c, 0x73, 0x18, 0x0f, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x6c, 0x61, 0x62, + 0x65, 0x6c, 0x73, 0x12, 0x37, 0x0a, 0x18, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x72, 0x6f, 0x67, 0x61, 0x74, 0x65, 0x5f, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, + 0x10, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x6c, 0x61, 0x73, 0x74, 0x49, 0x6e, 0x74, 0x65, 0x72, + 0x72, 0x6f, 0x67, 0x61, 0x74, 0x65, 0x46, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x2e, 0x0a, 0x13, + 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x68, 0x75, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x18, 0x11, 0x20, 0x01, 0x28, 0x04, 0x52, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x48, + 0x75, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x37, 0x0a, 0x18, + 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, + 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x12, 0x20, 0x01, 0x28, 0x04, 0x52, 0x15, + 0x6c, 0x61, 0x73, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x56, 0x65, + 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x35, 0x5a, 0x33, 0x77, 0x77, 0x77, 0x2e, 0x76, 0x65, 0x6c, + 0x6f, 0x63, 0x69, 0x64, 0x65, 0x78, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x6c, 0x61, 0x6e, + 0x67, 0x2f, 0x76, 0x65, 0x6c, 0x6f, 0x63, 0x69, 0x72, 0x61, 0x70, 0x74, 0x6f, 0x72, 0x2f, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/actions/proto/vql.proto b/actions/proto/vql.proto index 3ed9d3d3aa2..4368ec43966 100644 --- a/actions/proto/vql.proto +++ b/actions/proto/vql.proto @@ -159,10 +159,14 @@ message ClientInfo { string architecture = 7; string ip_address = 10; uint64 ping = 11; + string ping_time = 19; string client_version = 12; string client_name = 13; repeated string labels = 15; string last_interrogate_flow_id = 16; + + uint64 last_hunt_timestamp = 17; + uint64 last_event_table_version = 18; } \ No newline at end of file diff --git a/api/api.go b/api/api.go index b7925b3c89d..e6b1459d845 
100644 --- a/api/api.go +++ b/api/api.go @@ -218,13 +218,7 @@ func (self *ApiServer) CollectArtifact( } flow_id, err := launcher.ScheduleArtifactCollection( - ctx, self.config, acl_manager, repository, in, - func() { - notifier := services.GetNotifier() - if notifier != nil { - notifier.NotifyListener(self.config, in.ClientId) - } - }) + ctx, self.config, acl_manager, repository, in, nil) if err != nil { return nil, err } @@ -292,13 +286,10 @@ func (self *ApiServer) NotifyClients( return nil, errors.New("Notifier not ready") } - if in.NotifyAll { - self.server_obj.Info("sending notification to everyone") - err = notifier.NotifyAllListeners(self.config) - - } else if in.ClientId != "" { + if in.ClientId != "" { self.server_obj.Info("sending notification to %s", in.ClientId) - err = services.GetNotifier().NotifyListener(self.config, in.ClientId) + err = services.GetNotifier().NotifyListener( + self.config, in.ClientId, "API.NotifyClients") } else { return nil, status.Error(codes.InvalidArgument, "client id should be specified") @@ -841,10 +832,6 @@ func (self *ApiServer) SetClientMonitoringState( return nil, err } - _, err = self.NotifyClients(ctx, &api_proto.NotificationRequest{ - NotifyAll: true, - }) - return &empty.Empty{}, err } diff --git a/api/builder.go b/api/builder.go index 624ac8f7637..8297e273d1a 100644 --- a/api/builder.go +++ b/api/builder.go @@ -14,7 +14,6 @@ import ( config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/logging" "www.velocidex.com/golang/velociraptor/server" - "www.velocidex.com/golang/velociraptor/services" _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) @@ -426,13 +425,6 @@ func StartFrontendPlainHttp( defer cancel() server.SetKeepAlivesEnabled(false) - notifier := services.GetNotifier() - if notifier != nil { - err := notifier.NotifyAllListeners(config_obj) - if err != nil { - server_obj.Error("Frontend server error %v", err) - } - } err := server.Shutdown(time_ctx) if err != nil { server_obj.Error("Frontend server error %v", err) @@ -523,10 +515,6 @@ func StartFrontendWithAutocert( defer cancel() server.SetKeepAlivesEnabled(false) - notifier := services.GetNotifier() - if notifier != nil { - _ = notifier.NotifyAllListeners(config_obj) - } err := server.Shutdown(timeout_ctx) if err != nil { logger.Error("Frontend shutdown error: %v", err) diff --git a/api/notebooks.go b/api/notebooks.go index 232549e3853..bcc26a8a41d 100644 --- a/api/notebooks.go +++ b/api/notebooks.go @@ -853,13 +853,15 @@ func (self *ApiServer) CancelNotebookCell( } notebook_cell.Calculating = false + // Make sure we write the cancel message ASAP err = db.SetSubject(self.config, notebook_cell_path_manager.Path(), notebook_cell) if err != nil { return nil, err } - return &empty.Empty{}, services.GetNotifier().NotifyListener(self.config, in.CellId) + return &empty.Empty{}, services.GetNotifier().NotifyListener( + self.config, in.CellId, "CancelNotebookCell") } func (self *ApiServer) UploadNotebookAttachment( diff --git a/api/proto/api.pb.gw.go b/api/proto/api.pb.gw.go index 746ecf7c330..5aaa06f2679 100644 --- a/api/proto/api.pb.gw.go +++ b/api/proto/api.pb.gw.go @@ -22,8 +22,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" - proto_1 "www.velocidex.com/golang/velociraptor/artifacts/proto" - proto_3 "www.velocidex.com/golang/velociraptor/flows/proto" + proto_2 "www.velocidex.com/golang/velociraptor/artifacts/proto" + proto_0 
"www.velocidex.com/golang/velociraptor/flows/proto" ) // Suppress "imported and not used" errors @@ -983,7 +983,7 @@ func local_request_API_GetTable_0(ctx context.Context, marshaler runtime.Marshal } func request_API_CollectArtifact_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.ArtifactCollectorArgs + var protoReq proto_0.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1000,7 +1000,7 @@ func request_API_CollectArtifact_0(ctx context.Context, marshaler runtime.Marsha } func local_request_API_CollectArtifact_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.ArtifactCollectorArgs + var protoReq proto_0.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1317,7 +1317,7 @@ var ( ) func request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_1.Tool + var protoReq proto_2.Tool var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1333,7 +1333,7 @@ func request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, } func local_request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_1.Tool + var protoReq proto_2.Tool var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1349,7 +1349,7 @@ func local_request_API_GetToolInfo_0(ctx context.Context, marshaler runtime.Mars } func request_API_SetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_1.Tool + var protoReq proto_2.Tool var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1366,7 +1366,7 @@ func request_API_SetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, } func local_request_API_SetToolInfo_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_1.Tool + var protoReq proto_2.Tool var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1435,7 +1435,7 @@ func local_request_API_GetServerMonitoringState_0(ctx context.Context, marshaler } func request_API_SetServerMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.ArtifactCollectorArgs + var protoReq proto_0.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1452,7 +1452,7 @@ func request_API_SetServerMonitoringState_0(ctx context.Context, marshaler runti } func local_request_API_SetServerMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) 
(proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.ArtifactCollectorArgs + var protoReq proto_0.ArtifactCollectorArgs var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1473,7 +1473,7 @@ var ( ) func request_API_GetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.GetClientMonitoringStateRequest + var protoReq proto_0.GetClientMonitoringStateRequest var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1489,7 +1489,7 @@ func request_API_GetClientMonitoringState_0(ctx context.Context, marshaler runti } func local_request_API_GetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.GetClientMonitoringStateRequest + var protoReq proto_0.GetClientMonitoringStateRequest var metadata runtime.ServerMetadata if err := req.ParseForm(); err != nil { @@ -1505,7 +1505,7 @@ func local_request_API_GetClientMonitoringState_0(ctx context.Context, marshaler } func request_API_SetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, client APIClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.ClientEventTable + var protoReq proto_0.ClientEventTable var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) @@ -1522,7 +1522,7 @@ func request_API_SetClientMonitoringState_0(ctx context.Context, marshaler runti } func local_request_API_SetClientMonitoringState_0(ctx context.Context, marshaler runtime.Marshaler, server APIServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { - var protoReq proto_3.ClientEventTable + var protoReq proto_0.ClientEventTable var metadata runtime.ServerMetadata newReader, berr := utilities.IOReaderFactory(req.Body) diff --git a/api/replication.go b/api/replication.go index 29f180ed477..aa1e56c7b4b 100644 --- a/api/replication.go +++ b/api/replication.go @@ -4,6 +4,9 @@ import ( "crypto/x509" "fmt" + "github.com/Velocidex/ordereddict" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sirupsen/logrus" context "golang.org/x/net/context" "google.golang.org/grpc/codes" @@ -19,6 +22,17 @@ import ( "www.velocidex.com/golang/velociraptor/services" ) +var ( + replicationReceiveHistorgram = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "replication_master_send_latency", + Help: "Latency for the master to send replication messages to the minion.", + Buckets: prometheus.LinearBuckets(0.1, 1, 10), + }, + []string{"status"}, + ) +) + func streamEvents( ctx context.Context, config_obj *config_proto.Config, @@ -37,6 +51,16 @@ func streamEvents( return err } + // Special case this so the caller can immediately initialize the + // watchers. + if in.Queue == "Server.Internal.MasterRegistrations" { + result := ordereddict.NewDict().Set("Events", journal.GetWatchers()) + serialized, _ := result.MarshalJSON() + stream.Send(&api_proto.EventResponse{ + Jsonl: serialized, + }) + } + // The API service is running on the master only! This means // the journal service is local. 
output_chan, cancel := journal.Watch(ctx, in.Queue) @@ -50,7 +74,15 @@ func streamEvents( response := &api_proto.EventResponse{ Jsonl: serialized, } + + timer := prometheus.NewTimer( + prometheus.ObserverFunc(func(v float64) { + replicationReceiveHistorgram.WithLabelValues("").Observe(v) + })) + err = stream.Send(response) + timer.ObserveDuration() + if err != nil { continue } diff --git a/artifacts/definitions/Server/Internal/ClientTasks.yaml b/artifacts/definitions/Server/Internal/ClientTasks.yaml new file mode 100644 index 00000000000..117aa6e728b --- /dev/null +++ b/artifacts/definitions/Server/Internal/ClientTasks.yaml @@ -0,0 +1,7 @@ +name: Server.Internal.ClientTasks +description: | + This event will be fired when a client has new tasks scheduled. + +type: INTERNAL +column_types: + - name: ClientId diff --git a/artifacts/definitions/Server/Internal/MasterRegistrations.yaml b/artifacts/definitions/Server/Internal/MasterRegistrations.yaml new file mode 100644 index 00000000000..20b0846bcb2 --- /dev/null +++ b/artifacts/definitions/Server/Internal/MasterRegistrations.yaml @@ -0,0 +1,9 @@ +name: Server.Internal.MasterRegistrations +description: | + The master will advertise to the minions the events it is interested + in. + +type: INTERNAL +column_types: + - name: Events + type: json_array diff --git a/config/proto/config.pb.go b/config/proto/config.pb.go index c13b8ea19f8..b68efda9816 100644 --- a/config/proto/config.pb.go +++ b/config/proto/config.pb.go @@ -2644,9 +2644,10 @@ type Config struct { // A list of possible frontends to use. When deployed in // multi-frontend configuration we select on those and populate // the Frontend field above. - ExtraFrontends []*FrontendConfig `protobuf:"bytes,31,rep,name=ExtraFrontends,proto3" json:"ExtraFrontends,omitempty"` - Datastore *DatastoreConfig `protobuf:"bytes,6,opt,name=Datastore,proto3" json:"Datastore,omitempty"` - Writeback *Writeback `protobuf:"bytes,9,opt,name=Writeback,proto3" json:"Writeback,omitempty"` + ExtraFrontends []*FrontendConfig `protobuf:"bytes,31,rep,name=ExtraFrontends,proto3" json:"ExtraFrontends,omitempty"` + Datastore *DatastoreConfig `protobuf:"bytes,6,opt,name=Datastore,proto3" json:"Datastore,omitempty"` + Writeback *Writeback `protobuf:"bytes,9,opt,name=Writeback,proto3" json:"Writeback,omitempty"` + // Deprecated - Mail plugin setting are now provided by args. Mail *MailConfig `protobuf:"bytes,11,opt,name=Mail,proto3" json:"Mail,omitempty"` Logging *LoggingConfig `protobuf:"bytes,23,opt,name=Logging,proto3" json:"Logging,omitempty"` Verbose bool `protobuf:"varint,20,opt,name=verbose,proto3" json:"verbose,omitempty"` diff --git a/file_store/api/queues.go b/file_store/api/queues.go index 1bacd339e33..4420668554e 100644 --- a/file_store/api/queues.go +++ b/file_store/api/queues.go @@ -13,6 +13,7 @@ type QueueManager interface { // Broadcast events only for local listeners without writing to // storage. 
Broadcast(path_manager PathManager, rows []*ordereddict.Dict) + GetWatchers() []string PushEventRows(path_manager PathManager, rows []*ordereddict.Dict) error Watch(ctx context.Context, queue_name string) ( diff --git a/file_store/directory/queue.go b/file_store/directory/queue.go index c39b02da8e0..91ec3945e3e 100644 --- a/file_store/directory/queue.go +++ b/file_store/directory/queue.go @@ -51,6 +51,18 @@ type QueuePool struct { registrations map[string][]*Listener } +func (self *QueuePool) GetWatchers() []string { + self.mu.Lock() + defer self.mu.Unlock() + + result := make([]string, 0, len(self.registrations)) + for name := range self.registrations { + result = append(result, name) + } + + return result +} + func (self *QueuePool) Register( ctx context.Context, vfs_path string, options QueueOptions) (<-chan *ordereddict.Dict, func()) { @@ -191,6 +203,10 @@ func (self *DirectoryQueueManager) PushEventRows( return nil } +func (self *DirectoryQueueManager) GetWatchers() []string { + return self.queue_pool.GetWatchers() +} + func (self *DirectoryQueueManager) Watch(ctx context.Context, queue_name string) (<-chan *ordereddict.Dict, func()) { diff --git a/file_store/memory/queue.go b/file_store/memory/queue.go index a9d3bb5b690..79c574090c2 100644 --- a/file_store/memory/queue.go +++ b/file_store/memory/queue.go @@ -53,6 +53,18 @@ type QueuePool struct { registrations map[string][]*Listener } +func (self *QueuePool) GetWatchers() []string { + self.mu.Lock() + defer self.mu.Unlock() + + result := make([]string, 0, len(self.registrations)) + for name := range self.registrations { + result = append(result, name) + } + + return result +} + func (self *QueuePool) Register(vfs_path string) (<-chan *ordereddict.Dict, func()) { self.mu.Lock() defer self.mu.Unlock() @@ -179,6 +191,10 @@ func (self *MemoryQueueManager) PushEventRows( return nil } +func (self *MemoryQueueManager) GetWatchers() []string { + return GlobalQueuePool(self.config_obj).GetWatchers() +} + func (self *MemoryQueueManager) Watch( ctx context.Context, queue_name string) (output <-chan *ordereddict.Dict, cancel func()) { return GlobalQueuePool(self.config_obj).Register(queue_name) diff --git a/file_store/queue.go b/file_store/queue.go index a7ce6d58525..9457b8ba9a3 100644 --- a/file_store/queue.go +++ b/file_store/queue.go @@ -25,7 +25,7 @@ func GetQueueManager(config_obj *config_proto.Config) (api.QueueManager, error) case "Test": return memory.NewMemoryQueueManager(config_obj, file_store), nil - case "FileBaseDataStore", "MemcacheFileDataStore": + case "FileBaseDataStore", "MemcacheFileDataStore", "RemoteFileDataStore": return directory.NewDirectoryQueueManager(config_obj, file_store), nil default: diff --git a/file_store/test_utils/testsuite.go b/file_store/test_utils/testsuite.go index a596daa674b..932c07903f2 100644 --- a/file_store/test_utils/testsuite.go +++ b/file_store/test_utils/testsuite.go @@ -67,6 +67,12 @@ type: CLIENT_EVENT name: System.Hunt.Participation type: INTERNAL `, ` +name: Server.Internal.MasterRegistrations +type: INTERNAL +`, ` +name: Server.Internal.ClientTasks +type: INTERNAL +`, ` name: Generic.Client.Info type: CLIENT sources: diff --git a/flows/api.go b/flows/api.go index 770b6546194..426e3f58385 100644 --- a/flows/api.go +++ b/flows/api.go @@ -259,12 +259,7 @@ func CancelFlow( &crypto_proto.VeloMessage{ Cancel: &crypto_proto.Cancel{}, SessionId: flow_id, - }, func() { - notifier := services.GetNotifier() - if notifier != nil { - notifier.NotifyListener(config_obj, client_id) - } - }) + }, true /* 
notify */, nil) if err != nil { return nil, err } diff --git a/flows/artifacts.go b/flows/artifacts.go index 40ecb3bb72c..2e434d1966f 100644 --- a/flows/artifacts.go +++ b/flows/artifacts.go @@ -87,15 +87,6 @@ type CollectionContext struct { send_update bool } -func (self *CollectionContext) batchRows(flow_id string, rows []*ordereddict.Dict) { - batch, _ := self.monitoring_batch[flow_id] - batch = append(batch, rows...) - self.monitoring_batch[flow_id] = batch - if len(rows) > 0 { - self.Dirty = true - } -} - func NewCollectionContext(config_obj *config_proto.Config) *CollectionContext { self := &CollectionContext{ ArtifactCollectorContext: flows_proto.ArtifactCollectorContext{}, @@ -726,20 +717,13 @@ func (self *FlowRunner) ProcessSingleMessage( ctx context.Context, job *crypto_proto.VeloMessage) { - // json.TraceMessage(job.Source+"_job", job) - - // Foreman messages are related to hunts. - if job.ForemanCheckin != nil { - err := ForemanProcessMessage( - ctx, self.config_obj, - job.Source, job.ForemanCheckin) - if err != nil { - logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) - logger.Error("ForemanCheckin for client %v: %v", job.Source, err) - } + // Only process real flows. + if !strings.HasPrefix(job.SessionId, "F.") { return } + // json.TraceMessage(job.Source+"_job", job) + // CSR messages are related to enrolment. By the time the // message arrives here, it is authenticated and the client is // fully enrolled so it serves no purpose here - Just ignore it. @@ -757,12 +741,6 @@ func (self *FlowRunner) ProcessSingleMessage( return } - // Only process real flows. - if !strings.HasPrefix(job.SessionId, "F.") { - logger.Error("Invalid job SessionId %v", job.SessionId) - return - } - collection_context, err = LoadCollectionContext( self.config_obj, job.Source, job.SessionId) if err != nil { @@ -784,7 +762,8 @@ func (self *FlowRunner) ProcessSingleMessage( &crypto_proto.VeloMessage{ Cancel: &crypto_proto.Cancel{}, SessionId: job.SessionId, - }, nil) + }, + true /* notify */, nil) if err != nil { logger.Error("Queueing for client %v: %v", job.Source, err) @@ -821,10 +800,17 @@ func (self *FlowRunner) ProcessSingleMessage( } func (self *FlowRunner) ProcessMessages(ctx context.Context, - message_info *crypto.MessageInfo) (err error) { + message_info *crypto.MessageInfo) error { self.mu.Lock() defer self.mu.Unlock() + // Do some housekeeping with the client + err := CheckClientStatus(ctx, self.config_obj, message_info.Source) + if err != nil { + logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) + logger.Error("ForemanCheckin for client %v: %v", message_info.Source, err) + } + return message_info.IterateJobs(ctx, self.ProcessSingleMessage) } diff --git a/flows/foreman.go b/flows/housekeeping.go similarity index 53% rename from flows/foreman.go rename to flows/housekeeping.go index 997b032baab..c146b21bb5b 100644 --- a/flows/foreman.go +++ b/flows/housekeeping.go @@ -15,23 +15,6 @@ You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ -// The foreman is a Well Known Flow for clients to check for hunt -// memberships. Each client periodically informs the foreman of the -// most recent hunt it executed, and the foreman launches the relevant -// flow on the client. - -// The process goes like this: - -// 1. The client sends a message to the foreman periodically with the -// timestamp of the most recent hunt it ran (as well latest event -// table version). - -// 2. 
If a newer hunt exists, the foreman sends the hunt_condition -// query to the client with the response directed to the -// System.Hunt.Participation artifact monitoring queue. - -// 3. The hunt manager service scans the System.Hunt.Participation -// monitoring queue and launches the relevant flows on each client. package flows @@ -42,11 +25,8 @@ import ( errors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - actions_proto "www.velocidex.com/golang/velociraptor/actions/proto" api_proto "www.velocidex.com/golang/velociraptor/api/proto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" - constants "www.velocidex.com/golang/velociraptor/constants" - crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto" "www.velocidex.com/golang/velociraptor/services" ) @@ -55,56 +35,61 @@ var ( Name: "client_event_update", Help: "Total number of client Event Table Update messages sent.", }) - - clientHuntTimestampUpdateCounter = promauto.NewCounter(prometheus.CounterOpts{ - Name: "client_hunt_timestamp_update", - Help: "Total number of client Update Hunt Timestamp messages sent.", - }) ) -// ForemanProcessMessage processes a ForemanCheckin message from the -// client. -func ForemanProcessMessage( +func CheckClientStatus( ctx context.Context, config_obj *config_proto.Config, - client_id string, - foreman_checkin *actions_proto.ForemanCheckin) error { + client_id string) error { - if foreman_checkin == nil { - return errors.New("Expected args of type ForemanCheckin") + client_manager, err := services.GetClientInfoManager() + if err != nil { + return err } - client_manager, err := services.GetClientInfoManager() + stats, err := client_manager.GetStats(client_id) if err != nil { return err } - // Update the client's event tables. + // Check the client's event table for validity. client_event_manager := services.ClientEventManager() if client_event_manager != nil && client_event_manager.CheckClientEventsVersion( - config_obj, client_id, - foreman_checkin.LastEventTableVersion) { + config_obj, client_id, stats.LastEventTableVersion) { + + update_message := client_event_manager.GetClientUpdateEventTableMessage( + config_obj, client_id) + + if update_message.UpdateEventTable == nil { + return errors.New("Invalid event update") + } + + // Inform the client manager that this client will now receive + // the latest event table. + client_manager.UpdateStats(client_id, func(s *services.Stats) { + s.LastEventTableVersion = update_message.UpdateEventTable.Version + }) + clientEventUpdateCounter.Inc() - err := client_manager.QueueMessageForClient(client_id, - client_event_manager.GetClientUpdateEventTableMessage( - config_obj, client_id), nil) + err := client_manager.QueueMessageForClient( + client_id, update_message, true, nil) if err != nil { return err } } + // Check the client's hunt status // Process any needed hunts. dispatcher := services.GetHuntDispatcher() if dispatcher == nil { return nil } - client_last_timestamp := foreman_checkin.LastHuntTimestamp // Can we get away without a lock? If the client is already up // to date we dont need to look further. hunts_last_timestamp := dispatcher.GetLastTimestamp() - if client_last_timestamp >= hunts_last_timestamp { + if stats.LastHuntTimestamp >= hunts_last_timestamp { return nil } @@ -118,7 +103,7 @@ func ForemanProcessMessage( } // This hunt is not relevant to this client. 
- if hunt.StartTime <= client_last_timestamp { + if hunt.StartTime <= stats.LastHuntTimestamp { return nil } @@ -131,12 +116,16 @@ func ForemanProcessMessage( return nil }) - // Nothing to do, return - if len(hunts) == 0 { + if err != nil { return err } - // Now schedule the client for all the hunts that it needs to run. + // This will now only contains hunts launched since the last hunt + // timestamp. If it is empty there is nothing to do, return + if len(hunts) == 0 { + return nil + } + journal, err := services.GetJournal() if err != nil { return err @@ -157,21 +146,8 @@ func ForemanProcessMessage( } } - // Let the client know it needs to update its foreman state to - // the latest time. We schedule an UpdateForeman message for - // the client. Note that it is possible that the client does - // not update its timestamp immediately and therefore might - // end up sending multiple participation events to the hunt - // manager - this is ok since the hunt manager keeps hunt - // participation index and will automatically skip multiple - // messages. - clientHuntTimestampUpdateCounter.Inc() - return client_manager.QueueMessageForClient(client_id, - &crypto_proto.VeloMessage{ - SessionId: constants.MONITORING_WELL_KNOWN_FLOW, - RequestId: constants.IgnoreResponseState, - UpdateForeman: &actions_proto.ForemanCheckin{ - LastHuntTimestamp: latest_timestamp, - }, - }, nil) + client_manager.UpdateStats(client_id, func(s *services.Stats) { + s.LastHuntTimestamp = latest_timestamp + }) + return nil } diff --git a/flows/limits.go b/flows/limits.go index b8aaee974cc..d645811ca42 100644 --- a/flows/limits.go +++ b/flows/limits.go @@ -65,15 +65,9 @@ func cancelCollection(config_obj *config_proto.Config, client_id, flow_id string return err } - err = client_manager.QueueMessageForClient(client_id, + return client_manager.QueueMessageForClient(client_id, &crypto_proto.VeloMessage{ Cancel: &crypto_proto.Cancel{}, SessionId: flow_id, - }, nil) - if err != nil { - return err - } - - // Notify the client immediately. - return services.GetNotifier().NotifyListener(config_obj, client_id) + }, true /* notify */, nil) } diff --git a/flows/monitoring.go b/flows/monitoring.go index efe18d30f61..48915e27c2f 100644 --- a/flows/monitoring.go +++ b/flows/monitoring.go @@ -121,6 +121,16 @@ func flushContextLogsMonitoring( return nil } +func (self *CollectionContext) batchRows( + artifact_name string, rows []*ordereddict.Dict) { + batch, _ := self.monitoring_batch[artifact_name] + batch = append(batch, rows...) + self.monitoring_batch[artifact_name] = batch + if len(rows) > 0 { + self.Dirty = true + } +} + func flushMonitoringLogs( config_obj *config_proto.Config, collection_context *CollectionContext) error { diff --git a/notifications/notifications.go b/notifications/notifications.go index cc602282401..911b936a250 100644 --- a/notifications/notifications.go +++ b/notifications/notifications.go @@ -1,14 +1,10 @@ package notifications import ( - "context" - "regexp" "sync" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "golang.org/x/time/rate" - config_proto "www.velocidex.com/golang/velociraptor/config/proto" ) var ( @@ -94,47 +90,6 @@ func (self *NotificationPool) Notify(client_id string) { self.mu.Unlock() } -func (self *NotificationPool) NotifyByRegex( - config_obj *config_proto.Config, re *regexp.Regexp) { - - // First take a snapshot of the current clients connected. 
- self.mu.Lock() - snapshot := make([]string, 0, len(self.clients)) - for key := range self.clients { - if re.MatchString(key) { - snapshot = append(snapshot, key) - } - } - self.mu.Unlock() - - // Now notify all these clients in the background if - // possible. Take it slow so as not to overwhelm the server. - limiter_rate := rate.Limit(config_obj.Frontend.Resources.NotificationsPerSecond) - subctx, cancel := context.WithCancel(context.Background()) - limiter := rate.NewLimiter(limiter_rate, 1) - - go func() { - select { - case <-self.done: - cancel() - } - }() - - go func() { - for _, client_id := range snapshot { - self.mu.Lock() - c, pres := self.clients[client_id] - if pres { - notificationCounter.Inc() - close(c) - delete(self.clients, client_id) - } - self.mu.Unlock() - limiter.Wait(subctx) - } - }() -} - func (self *NotificationPool) Shutdown() { self.mu.Lock() defer self.mu.Unlock() @@ -149,7 +104,3 @@ func (self *NotificationPool) Shutdown() { self.clients = make(map[string]chan bool) } - -func (self *NotificationPool) NotifyAll(config_obj *config_proto.Config) { - self.NotifyByRegex(config_obj, regexp.MustCompile(".")) -} diff --git a/reporting/acls_test.go b/reporting/acls_test.go index 3b7a74cd39f..fdfe50d4d51 100644 --- a/reporting/acls_test.go +++ b/reporting/acls_test.go @@ -1,4 +1,4 @@ -package reporting +package reporting_test import ( "testing" @@ -9,6 +9,9 @@ import ( "www.velocidex.com/golang/velociraptor/datastore" "www.velocidex.com/golang/velociraptor/file_store/test_utils" "www.velocidex.com/golang/velociraptor/paths" + "www.velocidex.com/golang/velociraptor/reporting" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type ACLTestSuite struct { @@ -29,7 +32,7 @@ func (self *ACLTestSuite) TestNotebookPublicACL() { assert.NoError(self.T(), err) // Check that everyone has access - assert.True(self.T(), CheckNotebookAccess(new_notebook, "User1")) + assert.True(self.T(), reporting.CheckNotebookAccess(new_notebook, "User1")) // Make the notebook not public. new_notebook.Public = false @@ -38,30 +41,30 @@ func (self *ACLTestSuite) TestNotebookPublicACL() { assert.NoError(self.T(), err) // User1 lost access. - assert.False(self.T(), CheckNotebookAccess(new_notebook, "User1")) + assert.False(self.T(), reporting.CheckNotebookAccess(new_notebook, "User1")) // The creator always has access regardless - assert.True(self.T(), CheckNotebookAccess(new_notebook, "Creator")) + assert.True(self.T(), reporting.CheckNotebookAccess(new_notebook, "Creator")) // Explicitly share with User1 new_notebook.Collaborators = append(new_notebook.Collaborators, "User1") err = db.SetSubject(self.ConfigObj, notebook_path_manager.Path(), new_notebook) assert.NoError(self.T(), err) - err = UpdateShareIndex(self.ConfigObj, new_notebook) + err = reporting.UpdateShareIndex(self.ConfigObj, new_notebook) assert.NoError(self.T(), err) // User1 now has access - assert.True(self.T(), CheckNotebookAccess(new_notebook, "User1")) + assert.True(self.T(), reporting.CheckNotebookAccess(new_notebook, "User1")) // What notebooks does User1 have access to? 
- notebooks, err := GetSharedNotebooks(self.ConfigObj, "User1", 0, 100) + notebooks, err := reporting.GetSharedNotebooks(self.ConfigObj, "User1", 0, 100) assert.NoError(self.T(), err) assert.Equal(self.T(), 1, len(notebooks)) assert.Equal(self.T(), new_notebook.NotebookId, notebooks[0].NotebookId) // Check GetAllNotebooks without ACL checks - all_notebooks, err := GetAllNotebooks(self.ConfigObj) + all_notebooks, err := reporting.GetAllNotebooks(self.ConfigObj) assert.NoError(self.T(), err) assert.Equal(self.T(), 1, len(notebooks)) assert.Equal(self.T(), new_notebook.NotebookId, all_notebooks[0].NotebookId) diff --git a/search/index_test.go b/search/index_test.go index 6ab5726c50d..58309fa3d40 100644 --- a/search/index_test.go +++ b/search/index_test.go @@ -14,6 +14,8 @@ import ( "www.velocidex.com/golang/velociraptor/paths" "www.velocidex.com/golang/velociraptor/search" "www.velocidex.com/golang/velociraptor/services/indexing" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type TestSuite struct { diff --git a/server/comms.go b/server/comms.go index 8ed48989285..880015db87e 100644 --- a/server/comms.go +++ b/server/comms.go @@ -99,22 +99,20 @@ var ( Help: "Number of responses rejected due to concurrency timeouts.", }) - concurrencyHistorgram = promauto.NewHistogramVec( + concurrencyHistorgram = promauto.NewHistogram( prometheus.HistogramOpts{ Name: "frontend_receiver_latency", Help: "Latency to receive client data in second.", Buckets: prometheus.LinearBuckets(0.1, 1, 10), }, - []string{"status"}, ) - concurrencyWaitHistorgram = promauto.NewHistogramVec( + concurrencyWaitHistorgram = promauto.NewHistogram( prometheus.HistogramOpts{ Name: "frontend_concurrency_wait_latency", Help: "Latency for clients waiting to get a concurrency slot (excludes actual serving time).", Buckets: prometheus.LinearBuckets(0.1, 1, 10), }, - []string{"status"}, ) ) @@ -130,8 +128,8 @@ func PrepareFrontendMux( base := config_obj.Frontend.BasePath router.Handle(base+"/healthz", healthz(server_obj)) router.Handle(base+"/server.pem", server_pem(config_obj)) - router.Handle(base+"/control", control(server_obj)) - router.Handle(base+"/reader", reader(config_obj, server_obj)) + router.Handle(base+"/control", RecordHTTPStats(control(server_obj))) + router.Handle(base+"/reader", RecordHTTPStats(reader(config_obj, server_obj))) // Publicly accessible part of the filestore. NOTE: this // does not have to be a physical directory - it is served @@ -288,7 +286,7 @@ func control(server_obj *Server) http.Handler { // waiting for a concurrency slot. If this time is too // long it means concurrency may need to be increased. timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { - concurrencyWaitHistorgram.WithLabelValues("").Observe(v) + concurrencyWaitHistorgram.Observe(v) })) cancel, err := server_obj.Concurrency().StartConcurrencyControl(ctx) @@ -306,9 +304,8 @@ func control(server_obj *Server) http.Handler { } // Measure the latency from this point on. 
- var status string timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { - concurrencyHistorgram.WithLabelValues(status).Observe(v) + concurrencyHistorgram.Observe(v) })) defer func() { timer.ObserveDuration() @@ -363,7 +360,8 @@ func control(server_obj *Server) http.Handler { sync := make(chan []byte) go func() { - defer cancel() + defer close(sync) + response, _, err := server_obj.Process(ctx, message_info, false, // drain_requests_for_client ) @@ -390,8 +388,11 @@ func control(server_obj *Server) http.Handler { case <-ctx.Done(): return - case response := <-sync: - _, _ = w.Write(response) + case response, ok := <-sync: + if ok { + _, _ = w.Write(response) + } + return case <-time.After(3 * time.Second): _, _ = w.Write(serialized_pad) @@ -542,11 +543,16 @@ func reader(config_obj *config_proto.Config, server_obj *Server) http.Handler { return case <-deadline: - // Notify ourselves, this will trigger - // an empty response to be written and - // the connection to be terminated - // (case above). - cancel() + // Deadline exceeded - write an empty response and + // send it. The client will reconnect immediately. + _, err := w.Write(serialized_pad) + if err != nil { + logger.Info("reader: Error %v", err) + return + } + + flusher.Flush() + return // Write a pad message every 10 seconds // to keep the conenction alive. diff --git a/server/metrics.go b/server/metrics.go new file mode 100644 index 00000000000..f155b53afbf --- /dev/null +++ b/server/metrics.go @@ -0,0 +1,32 @@ +package server + +import ( + "fmt" + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + httpErrorStatusCounters = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "frontend_http_status", + Help: "Count of various http status.", + }, + []string{"status"}, + ) +) + +func RecordHTTPStats(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rec := &statusRecorder{ + w, + w.(http.Flusher), + 200, nil} + + next.ServeHTTP(rec, r) + status := fmt.Sprintf("%v", rec.status) + httpErrorStatusCounters.WithLabelValues(status).Inc() + }) +} diff --git a/server/server.go b/server/server.go index 003d1efb691..40a41c870c9 100644 --- a/server/server.go +++ b/server/server.go @@ -276,8 +276,11 @@ func (self *Server) Process( if err != nil { return nil, 0, err } - err = client_info_manager.UpdatePing( - message_info.Source, message_info.RemoteAddr) + err = client_info_manager.UpdateStats(message_info.Source, + func(s *services.Stats) { + s.Ping = uint64(time.Now().UnixNano() / 1000) + s.IpAddress = message_info.RemoteAddr + }) if err != nil { return nil, 0, err } diff --git a/server/server_test.go b/server/server_test.go index 6739fe6f983..28160f53693 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -18,6 +18,7 @@ import ( api_proto "www.velocidex.com/golang/velociraptor/api/proto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/constants" + "www.velocidex.com/golang/velociraptor/crypto" crypto_client "www.velocidex.com/golang/velociraptor/crypto/client" crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto" "www.velocidex.com/golang/velociraptor/datastore" @@ -103,6 +104,13 @@ func (self *ServerTestSuite) SetupTest() { require.NoError(self.T(), err) self.client_id = self.client_crypto.ClientId + db, err := datastore.GetDB(self.ConfigObj) + assert.NoError(self.T(), err) + + 
client_path_manager := paths.NewClientPathManager(self.client_id) + err = db.SetSubject(self.ConfigObj, client_path_manager.Path(), + &actions_proto.ClientInfo{ClientId: self.client_id}) + assert.NoError(self.T(), err) } func (self *ServerTestSuite) TestEnrollment() { @@ -171,16 +179,10 @@ func (self *ServerTestSuite) TestClientEventTable() { // Wait a bit. time.Sleep(time.Second) - // Send a foreman checkin message from client with old event - // table version. - runner.ProcessSingleMessage( - context.Background(), - &crypto_proto.VeloMessage{ - Source: self.client_id, - ForemanCheckin: &actions_proto.ForemanCheckin{ - LastEventTableVersion: 0, - }, - }) + // Send a message from client to trigger check + runner.ProcessMessages(context.Background(), &crypto.MessageInfo{ + Source: self.client_id, + }) client_info_manager, err := services.GetClientInfoManager() assert.NoError(self.T(), err) @@ -234,23 +236,10 @@ func (self *ServerTestSuite) TestForeman() { assert.Equal(t, hunt.StartRequest, expected) - // Send a foreman checkin message from client with old hunt - // timestamp. - runner.ProcessSingleMessage( - context.Background(), - &crypto_proto.VeloMessage{ - Source: self.client_id, - ForemanCheckin: &actions_proto.ForemanCheckin{ - LastHuntTimestamp: 0, - - // We do not want to trigger an event - // table update in this test so we - // pretend our version is later than - // the automatic table that will be - // created. - LastEventTableVersion: 10000000000000000000, - }, - }) + // Send a message from client to trigger check + runner.ProcessMessages(context.Background(), &crypto.MessageInfo{ + Source: self.client_id, + }) // Server should schedule the new hunt on the client. client_info_manager, err := services.GetClientInfoManager() @@ -260,11 +249,15 @@ func (self *ServerTestSuite) TestForeman() { assert.NoError(t, err) assert.Equal(t, len(tasks), 1) - // Task should be UpdateForeman message. + // Task should be UpdateEventTable message. assert.Equal(t, tasks[0].SessionId, "F.Monitoring") - require.NotNil(t, tasks[0].UpdateForeman) - assert.Equal(t, tasks[0].UpdateForeman.LastHuntTimestamp, services.GetHuntDispatcher(). - GetLastTimestamp()) + require.NotNil(t, tasks[0].UpdateEventTable) + + // The client_info_manager will remember the last hunt timestamp + stats, err := client_info_manager.GetStats(self.client_id) + assert.NoError(t, err) + + assert.Equal(t, stats.LastHuntTimestamp, hunt.StartTime) } func (self *ServerTestSuite) RequiredFilestoreContains( diff --git a/services/client_info.go b/services/client_info.go index cd95bf90a1f..02f8a2dd5ad 100644 --- a/services/client_info.go +++ b/services/client_info.go @@ -23,6 +23,15 @@ const ( type ClientOS int +// Keep some stats about the client in the cache. These will be synced +// to disk periodically. +type Stats struct { + Ping uint64 + LastHuntTimestamp uint64 + LastEventTableVersion uint64 + IpAddress string +} + func GetClientInfoManager() (ClientInfoManager, error) { client_info_manager_mu.Lock() defer client_info_manager_mu.Unlock() @@ -64,18 +73,27 @@ func (self ClientInfo) OS() ClientOS { } type ClientInfoManager interface { - UpdatePing(client_id, ip_address string) error Get(client_id string) (*ClientInfo, error) + GetStats(client_id string) (*Stats, error) + UpdateStats(client_id string, cb func(stats *Stats)) error + // Get the client's tasks and remove them from the queue. GetClientTasks(client_id string) ([]*crypto_proto.VeloMessage, error) // Get all the tasks without de-queuing them. 
PeekClientTasks(client_id string) ([]*crypto_proto.VeloMessage, error) + QueueMessagesForClient( + client_id string, + req []*crypto_proto.VeloMessage, + notify bool, /* Also notify the client about the new task */ + ) error + QueueMessageForClient( client_id string, req *crypto_proto.VeloMessage, + notify bool, /* Also notify the client about the new task */ completion func()) error UnQueueMessageForClient( diff --git a/services/client_info/client_info.go b/services/client_info/client_info.go index 63d3dbb8288..e5de1e80153 100644 --- a/services/client_info/client_info.go +++ b/services/client_info/client_info.go @@ -13,6 +13,7 @@ import ( actions_proto "www.velocidex.com/golang/velociraptor/actions/proto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/datastore" + "www.velocidex.com/golang/velociraptor/json" "www.velocidex.com/golang/velociraptor/logging" "www.velocidex.com/golang/velociraptor/paths" "www.velocidex.com/golang/velociraptor/services" @@ -66,52 +67,52 @@ func (self *CachedInfo) GetHasTasks() TASKS_AVAILABLE_STATUS { return self.has_tasks } -// last seen is in uS -func (self *CachedInfo) UpdatePing( - last_seen uint64, ip_address string) { +func (self *CachedInfo) GetStats() *services.Stats { self.mu.Lock() defer self.mu.Unlock() - if last_seen == 0 { - self.record.Ping = uint64(self.owner.Clock.Now().UnixNano() / 1000) - } else { - self.record.Ping = last_seen + return &services.Stats{ + Ping: self.record.Ping, + IpAddress: self.record.IpAddress, + LastHuntTimestamp: self.record.LastHuntTimestamp, + LastEventTableVersion: self.record.LastEventTableVersion, } - - if ip_address != "" { - self.record.IpAddress = ip_address - } - self.dirty = true } -// Update the ping and notify other client info managers of the change. -func (self *CachedInfo) UpdatePingAndNotify( - last_seen uint64, ip_address string) { +func (self *CachedInfo) UpdateStats(cb func(stats *services.Stats)) { self.mu.Lock() - if last_seen == 0 { - self.record.Ping = uint64(self.owner.Clock.Now().UnixNano() / 1000) - } else { - self.record.Ping = last_seen + stats := &services.Stats{} + + cb(stats) + self.dirty = true + + if stats.Ping > 0 { + self.record.Ping = stats.Ping } - if ip_address != "" { - self.record.IpAddress = ip_address + if stats.IpAddress != "" { + self.record.IpAddress = stats.IpAddress + } + + if stats.LastHuntTimestamp > 0 { + self.record.LastHuntTimestamp = stats.LastHuntTimestamp } - self.dirty = true - // Notify of the ping. - if self.record.Ping > self.last_flush+1000000*MAX_PING_SYNC_SEC { + if stats.LastEventTableVersion > 0 { + self.record.LastEventTableVersion = stats.LastEventTableVersion + } + + // Notify other minions of the stats update. 
+ now := uint64(self.owner.Clock.Now().UnixNano() / 1000) + if now > self.last_flush+1000000*MAX_PING_SYNC_SEC { self.mu.Unlock() - self.owner.SendPing( - self.record.ClientId, ip_address, self.record.Ping) - return + self.owner.SendStats(self.record.ClientId, stats) self.Flush() return } - self.mu.Unlock() } @@ -126,8 +127,11 @@ func (self *CachedInfo) Flush() error { } ping_client_info := &actions_proto.ClientInfo{ - Ping: self.record.Ping, - IpAddress: self.record.IpAddress, + Ping: self.record.Ping, + PingTime: time.Unix(0, int64(self.record.Ping)*1000).String(), + IpAddress: self.record.IpAddress, + LastHuntTimestamp: self.record.LastHuntTimestamp, + LastEventTableVersion: self.record.LastEventTableVersion, } client_id := self.record.ClientId self.dirty = false @@ -163,23 +167,38 @@ type ClientInfoManager struct { queue []*ordereddict.Dict } -func (self *ClientInfoManager) SendPing(client_id, ip_address string, ping uint64) { - self.mu.Lock() - self.queue = append(self.queue, ordereddict.NewDict(). - Set("ClientId", client_id). - Set("Ping", ping). - Set("IpAddress", ip_address). - Set("From", self.uuid)) - self.mu.Unlock() +func (self *ClientInfoManager) SendStats(client_id string, stats *services.Stats) { + journal, err := services.GetJournal() + if err != nil { + return + } + + journal.PushRowsToArtifactAsync(self.config_obj, + ordereddict.NewDict(). + Set("ClientId", client_id). + Set("Stats", json.MustMarshalString(stats)). + Set("From", self.uuid), + "Server.Internal.ClientPing") } -func (self *ClientInfoManager) UpdatePing(client_id, ip string) error { +func (self *ClientInfoManager) GetStats(client_id string) (*services.Stats, error) { + cached_info, err := self.GetCacheInfo(client_id) + if err != nil { + return nil, err + } + + return cached_info.GetStats(), nil +} + +func (self *ClientInfoManager) UpdateStats( + client_id string, + cb func(stats *services.Stats)) error { cached_info, err := self.GetCacheInfo(client_id) if err != nil { return err } - cached_info.UpdatePingAndNotify(0, ip) + cached_info.UpdateStats(cb) return nil } @@ -201,7 +220,7 @@ func (self *ClientInfoManager) Start( // When clients are notified they need to refresh their tasks list // and invalidate the cache. err = journal.WatchQueueWithCB(ctx, config_obj, wg, - "Server.Internal.Notifications", self.ProcessNotification) + "Server.Internal.ClientTasks", self.ProcessNotification) if err != nil { return err } @@ -214,49 +233,8 @@ func (self *ClientInfoManager) Start( if err != nil { return err } - } else { - wait_time := uint64(10000) // 10 seconds - if config_obj.Frontend != nil && - config_obj.Frontend.Resources != nil && - config_obj.Frontend.Resources.MinionBatchWaitTimeMs > 0 { - wait_time = config_obj.Frontend.Resources.MinionBatchWaitTimeMs - } - - journal_manager, err := services.GetJournal() - if err != nil { - return err - } - - // Minions will push rows to the master to inform it about the - // new ping times. - wg.Add(1) - go func() { - defer wg.Done() - - for { - select { - case <-ctx.Done(): - return - - case <-time.After(time.Duration(wait_time) * time.Millisecond): - self.mu.Lock() - queue := self.queue - self.queue = nil - self.mu.Unlock() - - // Push the rows to the master. 
- if len(queue) > 0 { err = journal_manager.PushRowsToArtifact( self.config_obj, queue, "Server.Internal.ClientPing", "server", "") if err != nil { logger.Debug("RPC Error: %v\n", err) } } } } }() } + return journal.WatchQueueWithCB(ctx, config_obj, wg, "Server.Internal.Interrogation", self.ProcessInterrogateResults) } @@ -286,17 +264,18 @@ func (self *ClientInfoManager) ProcessPing( } client_id, pres := row.GetString("ClientId") - if !pres || client_id == "" { + if !pres { return invalidError } - ping, pres := row.GetInt64("Ping") - if !pres || ping == 0 { + stats := &services.Stats{} + serialized, pres := row.GetString("Stats") + if !pres { return invalidError } - ip_address, pres := row.GetString("IpAddress") - if !pres { + err := json.Unmarshal([]byte(serialized), &stats) + if err != nil { return invalidError } @@ -304,7 +283,23 @@ func (self *ClientInfoManager) ProcessPing( if err == nil { // Update our internal cache but do not notify further (since // this came from a notification anyway). - cached_info.UpdatePing(uint64(ping), ip_address) + cached_info.UpdateStats(func(cached_stats *services.Stats) { + if stats.Ping != 0 { + cached_stats.Ping = stats.Ping + } + + if stats.IpAddress != "" { + cached_stats.IpAddress = stats.IpAddress + } + + if stats.LastHuntTimestamp > 0 { + cached_stats.LastHuntTimestamp = stats.LastHuntTimestamp + } + + if stats.LastEventTableVersion > 0 { + cached_stats.LastEventTableVersion = stats.LastEventTableVersion + } + }) } return nil } @@ -362,7 +357,10 @@ func (self *ClientInfoManager) GetCacheInfo(client_id string) (*CachedInfo, erro // Read the main client record err = db.GetSubject(self.config_obj, client_path_manager.Path(), &client_info.ClientInfo) - if err != nil { + // Special case the server - it is a special client that does not + // need to enrol. It actually does have a client record because it + needs to schedule tasks for itself.
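+ // For client_id == "server" a failed lookup is therefore tolerated
+ // and an empty record is used, instead of propagating the error.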
+ if err != nil && client_id != "server" { return nil, err } @@ -377,6 +375,8 @@ func (self *ClientInfoManager) GetCacheInfo(client_id string) (*CachedInfo, erro if err == nil { client_info.Ping = ping_info.Ping client_info.IpAddress = ping_info.IpAddress + client_info.LastHuntTimestamp = ping_info.LastHuntTimestamp + client_info.LastEventTableVersion = ping_info.LastEventTableVersion cache_info.last_flush = ping_info.Ping } diff --git a/services/client_info/client_info_test.go b/services/client_info/client_info_test.go index 9bcc49b343d..bed28ba9d23 100644 --- a/services/client_info/client_info_test.go +++ b/services/client_info/client_info_test.go @@ -69,7 +69,10 @@ func (self *ClientInfoTestSuite) TestClientInfo() { assert.Equal(self.T(), info.Ping, uint64(0)) // Update the IP address - client_info_manager.UpdatePing(self.client_id, "127.0.0.1") + client_info_manager.UpdateStats(self.client_id, func(s *services.Stats) { + s.Ping = uint64(100 * 1000000) + s.IpAddress = "127.0.0.1" + }) // Now get the client record and check that it is updated info, err = client_info_manager.Get(self.client_id) @@ -112,7 +115,9 @@ func (self *ClientInfoTestSuite) TestMasterMinion() { assert.NoError(self.T(), err) // Update the minion timestamp - minion_client_info_manager.UpdatePing(self.client_id, "127.0.0.1") + minion_client_info_manager.UpdateStats(self.client_id, func(s *services.Stats) { + s.IpAddress = "127.0.0.1" + }) vtesting.WaitUntil(time.Second, self.T(), func() bool { client_info, err := master_client_info_manager.Get(self.client_id) diff --git a/services/client_info/tasks.go b/services/client_info/tasks.go index f2d9a776b0e..80eb4868f70 100644 --- a/services/client_info/tasks.go +++ b/services/client_info/tasks.go @@ -8,19 +8,27 @@ package client_info import ( "context" - "strings" "sync/atomic" "github.com/Velocidex/ordereddict" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto" "www.velocidex.com/golang/velociraptor/datastore" "www.velocidex.com/golang/velociraptor/file_store/api" "www.velocidex.com/golang/velociraptor/paths" + "www.velocidex.com/golang/velociraptor/services" "www.velocidex.com/golang/velociraptor/utils" ) var ( + tasksClearCount = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "client_info_client_tasks_notifications", + Help: "Number of notifications received that clients have new tasks", + }) + Clock utils.Clock = &utils.RealClock{} g_id uint64 ) @@ -36,14 +44,21 @@ const ( func (self *ClientInfoManager) ProcessNotification( ctx context.Context, config_obj *config_proto.Config, row *ordereddict.Dict) error { - client_id, pres := row.GetString("Target") - if pres && strings.HasPrefix(client_id, "C.") { + client_id, pres := row.GetString("ClientId") + if pres { cached_info, err := self.GetCacheInfo(client_id) if err != nil { return err } + // Next access will check for real. 
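+ // Marking the cache TASKS_AVAILABLE_STATUS_YES tells the next task
+ // fetch for this client that it must actually hit the datastore,
+ // and NotifyDirectListener() below wakes the client if it happens to
+ // be connected to this frontend.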
- cached_info.SetHasTasks(TASKS_AVAILABLE_STATUS_UNKNOWN) + tasksClearCount.Inc() + cached_info.SetHasTasks(TASKS_AVAILABLE_STATUS_YES) + + notifier := services.GetNotifier() + if notifier != nil { + notifier.NotifyDirectListener(client_id) + } } return nil } @@ -61,14 +76,59 @@ func (self *ClientInfoManager) UnQueueMessageForClient( client_path_manager.Task(message.TaskId)) } +func (self *ClientInfoManager) QueueMessagesForClient( + client_id string, + req []*crypto_proto.VeloMessage, + /* Also notify the client about the new task */ + notify bool) error { + + journal, err := services.GetJournal() + if err != nil { + return err + } + + db, err := datastore.GetDB(self.config_obj) + if err != nil { + return err + } + + // When the completer is done send a message to all the minions + // that the tasks are ready to be read. + completer := utils.NewCompleter(func() { + journal.PushRowsToArtifactAsync(self.config_obj, + ordereddict.NewDict(). + Set("ClientId", client_id). + Set("Notify", notify), + "Server.Internal.ClientTasks") + }) + defer completer.GetCompletionFunc()() + + client_path_manager := paths.NewClientPathManager(client_id) + + for _, r := range req { + // Task ID is related to time. + r.TaskId = currentTaskId() + + err = db.SetSubjectWithCompletion(self.config_obj, + client_path_manager.Task(r.TaskId), + r, completer.GetCompletionFunc()) + } + return nil +} + func (self *ClientInfoManager) QueueMessageForClient( client_id string, - req *crypto_proto.VeloMessage, + req *crypto_proto.VeloMessage, notify bool, completion func()) error { // Task ID is related to time. req.TaskId = currentTaskId() + journal, err := services.GetJournal() + if err != nil { + return err + } + db, err := datastore.GetDB(self.config_obj) if err != nil { return err @@ -77,7 +137,17 @@ func (self *ClientInfoManager) QueueMessageForClient( client_path_manager := paths.NewClientPathManager(client_id) return db.SetSubjectWithCompletion(self.config_obj, client_path_manager.Task(req.TaskId), - req, completion) + req, func() { + if completion != nil { + completion() + } + + journal.PushRowsToArtifactAsync(self.config_obj, + ordereddict.NewDict(). + Set("ClientId", client_id). + Set("Notify", notify), + "Server.Internal.ClientTasks") + }) } // Get the client tasks but do not dequeue them (Generally only called diff --git a/services/client_info/tasks_test.go b/services/client_info/tasks_test.go index cd242268476..4b7c7e3d6e6 100644 --- a/services/client_info/tasks_test.go +++ b/services/client_info/tasks_test.go @@ -18,7 +18,8 @@ func (self *ClientInfoTestSuite) TestQueueMessages() { assert.NoError(self.T(), err) message1 := &crypto_proto.VeloMessage{Source: "Server", SessionId: "1"} - err = client_info_manager.QueueMessageForClient(self.client_id, message1, nil) + err = client_info_manager.QueueMessageForClient( + self.client_id, message1, true, nil) assert.NoError(self.T(), err) manager := client_info_manager.(*client_info.ClientInfoManager) @@ -50,7 +51,7 @@ func (self *ClientInfoTestSuite) TestFastQueueMessages() { for i := 0; i < 10; i++ { message := &crypto_proto.VeloMessage{Source: "Server", SessionId: fmt.Sprintf("%d", i)} err := client_info_manager.QueueMessageForClient( - self.client_id, message, nil) + self.client_id, message, true, nil) assert.NoError(self.T(), err) written = append(written, message) @@ -105,10 +106,7 @@ func (self *ClientInfoTestSuite) TestGetClientTasksIsCached() { // Schedule a new task for the client. 
err = client_info_manager.QueueMessageForClient(self.client_id, - &crypto_proto.VeloMessage{}, func() { - notifier := services.GetNotifier() - notifier.NotifyListener(self.ConfigObj, self.client_id) - }) + &crypto_proto.VeloMessage{}, true, nil) assert.NoError(self.T(), err) // Wait until we can see the new task diff --git a/services/hunt_dispatcher/hunt_dispatcher.go b/services/hunt_dispatcher/hunt_dispatcher.go index 3bbafd44639..1f8cfe239fa 100644 --- a/services/hunt_dispatcher/hunt_dispatcher.go +++ b/services/hunt_dispatcher/hunt_dispatcher.go @@ -42,7 +42,6 @@ package hunt_dispatcher import ( "context" - "errors" "fmt" "strings" "sync" @@ -54,11 +53,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" - actions_proto "www.velocidex.com/golang/velociraptor/actions/proto" api_proto "www.velocidex.com/golang/velociraptor/api/proto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/constants" - crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto" "www.velocidex.com/golang/velociraptor/datastore" "www.velocidex.com/golang/velociraptor/json" "www.velocidex.com/golang/velociraptor/logging" @@ -118,43 +115,23 @@ func (self *HuntDispatcher) participateAllConnectedClients( ctx context.Context, config_obj *config_proto.Config, hunt_id string) error { - hunt_obj, pres := self.GetHunt(hunt_id) - if !pres { - return errors.New("Unknown hunt id") - } - notifier := services.GetNotifier() journal, err := services.GetJournal() if err != nil { return err } - client_manager, err := services.GetClientInfoManager() - if err != nil { - return err - } - for _, c := range notifier.ListClients() { if !strings.HasPrefix(c, "C.") { continue } + // Notify the hunt manager about the new client journal.PushRowsToArtifactAsync(config_obj, ordereddict.NewDict(). Set("HuntId", hunt_id). Set("ClientId", c), "System.Hunt.Participation") - - // Get the client to update the LastHuntTimestamp so it does - // not trigger the foreman again. - _ = client_manager.QueueMessageForClient(c, - &crypto_proto.VeloMessage{ - SessionId: constants.MONITORING_WELL_KNOWN_FLOW, - RequestId: constants.IgnoreResponseState, - UpdateForeman: &actions_proto.ForemanCheckin{ - LastHuntTimestamp: hunt_obj.StartTime, - }, - }, nil) } return nil diff --git a/services/hunt_dispatcher/hunt_dispatcher_test.go b/services/hunt_dispatcher/hunt_dispatcher_test.go index ebba4dd2682..473fda06cf6 100644 --- a/services/hunt_dispatcher/hunt_dispatcher_test.go +++ b/services/hunt_dispatcher/hunt_dispatcher_test.go @@ -132,11 +132,13 @@ func (self *HuntDispatcherTestSuite) TestModifyingHuntPropagateChanges() { // Changes are not visible in the data store immediately. 
assert.Equal(self.T(), modification, services.HuntPropagateChanges) - hunt_path_manager := paths.NewHuntPathManager("H.2") - hunt_obj := &api_proto.Hunt{} - err = db.GetSubject(self.ConfigObj, hunt_path_manager.Path(), hunt_obj) - assert.NoError(self.T(), err) - assert.Equal(self.T(), hunt_obj.State, api_proto.Hunt_RUNNING) + vtesting.WaitUntil(5*time.Second, self.T(), func() bool { + hunt_path_manager := paths.NewHuntPathManager("H.2") + hunt_obj := &api_proto.Hunt{} + err = db.GetSubject(self.ConfigObj, hunt_path_manager.Path(), hunt_obj) + assert.NoError(self.T(), err) + return hunt_obj.State == api_proto.Hunt_RUNNING + }) // But they should be visible in master hunt_obj, pres := self.master_dispatcher.GetHunt("H.2") diff --git a/services/hunt_manager/hunt_manager.go b/services/hunt_manager/hunt_manager.go index da996bb959b..5b0e8c0a9a7 100644 --- a/services/hunt_manager/hunt_manager.go +++ b/services/hunt_manager/hunt_manager.go @@ -676,8 +676,7 @@ func scheduleHuntOnClient( } // The request is pre-compiled into the hunt object. - request := &flows_proto.ArtifactCollectorArgs{} - proto.Merge(request, hunt_obj.StartRequest) + request := proto.Clone(hunt_obj.StartRequest).(*flows_proto.ArtifactCollectorArgs) // Direct the request against our client and schedule it. request.ClientId = client_id @@ -688,13 +687,7 @@ func scheduleHuntOnClient( flow_id, err := launcher.ScheduleArtifactCollection( ctx, config_obj, vql_subsystem.NullACLManager{}, - repository, request, func() { - // Notify the client that the hunt applies to it. - notifier := services.GetNotifier() - if notifier != nil { - notifier.NotifyListener(config_obj, client_id) - } - }) + repository, request, nil) if err != nil { return err } diff --git a/services/interrogation/interrogation.go b/services/interrogation/interrogation.go index 08630b136a4..53826e83376 100644 --- a/services/interrogation/interrogation.go +++ b/services/interrogation/interrogation.go @@ -121,7 +121,8 @@ func (self *EnrollmentService) ProcessEnrollment( // Notify the client notifier := services.GetNotifier() if notifier != nil { - notifier.NotifyListener(config_obj, client_id) + notifier.NotifyListener( + config_obj, client_id, "Interrogate") } }) if err != nil { diff --git a/services/inventory/inventory_test.go b/services/inventory/inventory_test.go index 2379ad56ec3..e02f9ae5bb1 100644 --- a/services/inventory/inventory_test.go +++ b/services/inventory/inventory_test.go @@ -20,6 +20,8 @@ import ( "www.velocidex.com/golang/velociraptor/services/inventory" "www.velocidex.com/golang/velociraptor/services/launcher" vql_subsystem "www.velocidex.com/golang/velociraptor/vql" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type MockClient struct { diff --git a/services/journal.go b/services/journal.go index 628d3e73639..32eda4f36f5 100644 --- a/services/journal.go +++ b/services/journal.go @@ -56,6 +56,8 @@ type JournalService interface { Watch(ctx context.Context, queue_name string) (output <-chan *ordereddict.Dict, cancel func()) + GetWatchers() []string + // Push the rows into the result set in the filestore. NOTE: This // method synchronises access to the files within the process. 
AppendToResultSet(config_obj *config_proto.Config, diff --git a/services/journal/journal.go b/services/journal/journal.go index bd668170a8d..4e545b00016 100644 --- a/services/journal/journal.go +++ b/services/journal/journal.go @@ -41,9 +41,19 @@ type JournalService struct { Clock utils.Clock } +func (self *JournalService) GetWatchers() []string { + return self.qm.GetWatchers() +} + +func (self *JournalService) publishWatchers() { + self.PushRowsToArtifact(self.config_obj, + []*ordereddict.Dict{ordereddict.NewDict(). + Set("Events", self.GetWatchers())}, + "Server.Internal.MasterRegistrations", "server", "") +} + func (self *JournalService) Watch( - ctx context.Context, queue_name string) ( - output <-chan *ordereddict.Dict, cancel func()) { + ctx context.Context, queue_name string) (<-chan *ordereddict.Dict, func()) { if self == nil || self.qm == nil { // Readers block on nil channel. @@ -52,7 +62,17 @@ func (self *JournalService) Watch( logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) logger.Info("Watching for events from %v", queue_name) - return self.qm.Watch(ctx, queue_name) + res, cancel := self.qm.Watch(ctx, queue_name) + + // Advertise new watchers + self.publishWatchers() + + return res, func() { + cancel() + + // Advertise that a watcher was removed. + self.publishWatchers() + } } // Write rows to a simple result set. This function manages concurrent diff --git a/services/journal/replication.go b/services/journal/replication.go index 728cdeea61f..3f7e9bd152b 100644 --- a/services/journal/replication.go +++ b/services/journal/replication.go @@ -14,6 +14,7 @@ import ( "github.com/Velocidex/ordereddict" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/sirupsen/logrus" api_proto "www.velocidex.com/golang/velociraptor/api/proto" config_proto "www.velocidex.com/golang/velociraptor/config/proto" "www.velocidex.com/golang/velociraptor/file_store" @@ -21,11 +22,21 @@ import ( "www.velocidex.com/golang/velociraptor/grpc_client" "www.velocidex.com/golang/velociraptor/json" "www.velocidex.com/golang/velociraptor/logging" + "www.velocidex.com/golang/velociraptor/paths/artifacts" "www.velocidex.com/golang/velociraptor/result_sets" "www.velocidex.com/golang/velociraptor/services" + "www.velocidex.com/golang/velociraptor/utils" ) var ( + replicationSendHistorgram = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "replication_minion_latency", + Help: "Latency to send replication messages from minion to the master.", + Buckets: prometheus.LinearBuckets(0.1, 1, 10), + }, + ) + replicationTotalSent = promauto.NewCounter(prometheus.CounterOpts{ Name: "replication_service_total_send", Help: "Total number of PushRow rpc calls.", @@ -48,6 +59,10 @@ type ReplicationService struct { tmpfile *os.File ctx context.Context + // Locally connected watchers. + qm api.QueueManager + Clock utils.Clock + sender chan *api_proto.PushEventRequest api_client api_proto.APIClient @@ -59,6 +74,9 @@ type ReplicationService struct { locks map[string]*sync.Mutex retryDuration time.Duration + // The set of events the master is interested in. 
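+ // Rebuilt from each Server.Internal.MasterRegistrations event and
+ // consulted by isEventRegistered(), so PushRowsToArtifact() only
+ // forwards artifacts the master is actually watching.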
+ masterRegistrations map[string]bool + // Store rows for async push batch map[string][]*ordereddict.Dict } @@ -77,6 +95,14 @@ func (self *ReplicationService) SetRetryDuration(duration time.Duration) { self.retryDuration = duration } +func (self *ReplicationService) isEventRegistered(artifact string) bool { + self.mu.Lock() + defer self.mu.Unlock() + + ok, pres := self.masterRegistrations[artifact] + return pres && ok +} + func (self *ReplicationService) pumpEventFromBufferFile() { for { event, err := self.Buffer.Lease() @@ -111,8 +137,12 @@ func (self *ReplicationService) pumpEventFromBufferFile() { } } +// Periodically flush the batches built up during +// PushRowsToArtifactAsync calls. func (self *ReplicationService) startAsyncLoop( - ctx context.Context, wg *sync.WaitGroup, config_obj *config_proto.Config) { + ctx context.Context, + wg *sync.WaitGroup, + config_obj *config_proto.Config) { wg.Add(1) go func() { @@ -198,7 +228,15 @@ func (self *ReplicationService) Start( if !ok { return } + + timer := prometheus.NewTimer( + prometheus.ObserverFunc(func(v float64) { + replicationSendHistorgram.Observe(v) + })) + _, err := self.api_client.PushEvents(ctx, request) + timer.ObserveDuration() + if err != nil { replicationTotalSendErrors.Inc() @@ -207,6 +245,7 @@ func (self *ReplicationService) Start( // for later delivery. _ = self.Buffer.Enqueue(request) } + } } }() @@ -216,6 +255,8 @@ func (self *ReplicationService) Start( // small events to write in larger chunks for gRPC efficiency. self.startAsyncLoop(ctx, wg, config_obj) + self.startMasterRegistrationLoop(ctx, wg, config_obj) + logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) logger.Debug("Starting Replication service to master frontend at %v", grpc_client.GetAPIConnectionString(self.config_obj)) @@ -223,6 +264,57 @@ func (self *ReplicationService) Start( return nil } +func (self *ReplicationService) ProcessMasterRegistrations(event *ordereddict.Dict) { + names_any, pres := event.Get("Events") + if !pres { + return + } + + names, ok := names_any.([]interface{}) + if ok { + self.mu.Lock() + self.masterRegistrations = make(map[string]bool) + + for _, name := range names { + name_str, ok := name.(string) + if ok { + self.masterRegistrations[name_str] = true + } + } + logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) + logger.WithFields(logrus.Fields{ + "events": names, + }).Info("Master event registrations") + self.mu.Unlock() + } +} + +func (self *ReplicationService) startMasterRegistrationLoop( + ctx context.Context, wg *sync.WaitGroup, config_obj *config_proto.Config) { + + events, cancel := self.Watch(ctx, "Server.Internal.MasterRegistrations") + + wg.Add(1) + go func() { + defer wg.Done() + defer cancel() + + for { + select { + case <-ctx.Done(): + return + + case event, ok := <-events: + if !ok { + return + } + self.ProcessMasterRegistrations(event) + } + } + }() + +} + func (self *ReplicationService) AppendToResultSet( config_obj *config_proto.Config, path api.FSPathSpec, @@ -281,10 +373,49 @@ func (self *ReplicationService) PushRowsToArtifactAsync( self.batch[artifact] = queue } +func (self *ReplicationService) pushRowsToLocalQueueManager( + config_obj *config_proto.Config, rows []*ordereddict.Dict, + artifact, client_id, flows_id string) error { + + path_manager, err := artifacts.NewArtifactPathManager( + config_obj, client_id, flows_id, artifact) + if err != nil { + return err + } + path_manager.Clock = self.Clock + + // Just a regular artifact, append to the existing result set. 
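+ // Non-event artifacts have no live subscribers, so the rows are
+ // appended straight to the result set file instead of going through
+ // the queue manager.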
+ if !path_manager.IsEvent() { + path, err := path_manager.GetPathForWriting() + if err != nil { + return err + } + return self.AppendToResultSet(config_obj, path, rows) + } + + // The Queue manager will manage writing event artifacts to a + // timed result set, including multi frontend synchronisation. + if self != nil && self.qm != nil { + return self.qm.PushEventRows(path_manager, rows) + } + return errors.New("Filestore not initialized") +} + func (self *ReplicationService) PushRowsToArtifact( config_obj *config_proto.Config, rows []*ordereddict.Dict, artifact, client_id, flow_id string) error { + err := self.pushRowsToLocalQueueManager( + config_obj, rows, artifact, client_id, flow_id) + if err != nil { + return err + } + + // Do not replicate the event if the master does not care about it. + if !self.isEventRegistered(artifact) { + return nil + } + replicationTotalSent.Inc() serialized, err := json.MarshalJsonl(rows) @@ -313,6 +444,11 @@ func (self *ReplicationService) PushRowsToArtifact( } } +func (self *ReplicationService) GetWatchers() []string { + return nil +} + +// Watch the master for new events func (self *ReplicationService) Watch(ctx context.Context, queue string) ( <-chan *ordereddict.Dict, func()) { @@ -406,12 +542,19 @@ func NewReplicationService( *ReplicationService, error) { service := &ReplicationService{ - config_obj: config_obj, - locks: make(map[string]*sync.Mutex), - batch: make(map[string][]*ordereddict.Dict), + config_obj: config_obj, + locks: make(map[string]*sync.Mutex), + masterRegistrations: make(map[string]bool), + batch: make(map[string][]*ordereddict.Dict), + Clock: utils.RealClock{}, + } + + qm, err := file_store.GetQueueManager(config_obj) + if err != nil || qm != nil { + service.qm = qm } - err := service.Start(ctx, config_obj, wg) + err = service.Start(ctx, config_obj, wg) if err == nil { services.RegisterJournal(service) } diff --git a/services/journal/replication_test.go b/services/journal/replication_test.go index 52f45ca03de..16193e0e6e6 100644 --- a/services/journal/replication_test.go +++ b/services/journal/replication_test.go @@ -29,6 +29,8 @@ import ( "www.velocidex.com/golang/velociraptor/services/repository" "www.velocidex.com/golang/velociraptor/utils" "www.velocidex.com/golang/velociraptor/vtesting" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type MockFrontendService struct { @@ -68,6 +70,17 @@ func (self *ReplicationTestSuite) startServices() { assert.NoError(t, self.sm.Start(repository.StartRepositoryManagerForTest)) } +func (self *ReplicationTestSuite) LoadArtifacts(definitions []string) { + manager, _ := services.GetRepositoryManager() + global_repo, err := manager.GetGlobalRepository(self.config_obj) + assert.NoError(self.T(), err) + + for _, def := range definitions { + _, err := global_repo.LoadYaml(def, true) + assert.NoError(self.T(), err) + } +} + func (self *ReplicationTestSuite) SetupTest() { var err error self.config_obj, err = new(config.Loader).WithFileLoader( @@ -129,6 +142,11 @@ func (self *ReplicationTestSuite) TestReplicationServiceStandardWatchers() { self.startServices() + self.LoadArtifacts([]string{` +name: Test.Artifact +type: CLIENT_EVENT +`}) + // Wait here until we call all the watchers. vtesting.WaitUntil(5*time.Second, self.T(), func() bool { mu.Lock() @@ -139,6 +157,7 @@ func (self *ReplicationTestSuite) TestReplicationServiceStandardWatchers() { // master. This is used to let the master know // if a client is connected to us. 
"Server.Internal.Ping", + "Server.Internal.MasterRegistrations", // The notifications service will watch for // notifications through us. @@ -183,6 +202,8 @@ func (self *ReplicationTestSuite) TestSendingEvents() { replicator := journal_service.(*journal.ReplicationService) replicator.SetRetryDuration(100 * time.Millisecond) + replicator.ProcessMasterRegistrations(ordereddict.NewDict(). + Set("Events", []interface{}{"Test.Artifact"})) events = nil err = journal_service.PushRowsToArtifact(self.config_obj, diff --git a/services/launcher/launcher.go b/services/launcher/launcher.go index 503a250dd1f..377290062d5 100644 --- a/services/launcher/launcher.go +++ b/services/launcher/launcher.go @@ -138,7 +138,6 @@ import ( "www.velocidex.com/golang/velociraptor/logging" "www.velocidex.com/golang/velociraptor/paths" "www.velocidex.com/golang/velociraptor/services" - "www.velocidex.com/golang/velociraptor/utils" vql_subsystem "www.velocidex.com/golang/velociraptor/vql" ) @@ -501,50 +500,36 @@ func ScheduleArtifactCollectionFromCollectorArgs( vql_collector_args []*actions_proto.VQLCollectorArgs, completion func()) (string, error) { - completer := utils.NewCompleter(completion) - defer completer.GetCompletionFunc()() - client_id := collector_request.ClientId if client_id == "" { return "", errors.New("Client id not provided.") } - // Generate a new collection context. - collection_context := &flows_proto.ArtifactCollectorContext{ - SessionId: NewFlowId(client_id), - CreateTime: uint64(time.Now().UnixNano() / 1000), - State: flows_proto.ArtifactCollectorContext_RUNNING, - Request: collector_request, - ClientId: client_id, - OutstandingRequests: int64(len(vql_collector_args)), - } db, err := datastore.GetDB(config_obj) if err != nil { return "", err } - // Save the collection context. - flow_path_manager := paths.NewFlowPathManager(client_id, - collection_context.SessionId) - err = db.SetSubjectWithCompletion(config_obj, - flow_path_manager.Path(), - collection_context, completer.GetCompletionFunc()) + client_manager, err := services.GetClientInfoManager() if err != nil { return "", err } - tasks := []*crypto_proto.VeloMessage{} + session_id := NewFlowId(client_id) + // Compile all the requests into specific tasks to be sent to the + // client. + tasks := []*crypto_proto.VeloMessage{} for id, arg := range vql_collector_args { // If sending to the server record who actually launched this. if client_id == "server" { - arg.Principal = collection_context.Request.Creator + arg.Principal = collector_request.Creator } // The task we will schedule for the client. task := &crypto_proto.VeloMessage{ QueryId: uint64(id), - SessionId: collection_context.SessionId, + SessionId: session_id, RequestId: constants.ProcessVQLResponses, VQLClientAction: arg, } @@ -554,24 +539,40 @@ func ScheduleArtifactCollectionFromCollectorArgs( task.Urgent = true } - client_manager, err := services.GetClientInfoManager() - if err != nil { - return "", err - } - - err = client_manager.QueueMessageForClient( - client_id, task, completer.GetCompletionFunc()) - if err != nil { - return "", err - } tasks = append(tasks, task) } + // Save the collection context first. + flow_path_manager := paths.NewFlowPathManager(client_id, session_id) + + // Generate a new collection context for this flow. 
+ collection_context := &flows_proto.ArtifactCollectorContext{ + SessionId: session_id, + CreateTime: uint64(time.Now().UnixNano() / 1000), + State: flows_proto.ArtifactCollectorContext_RUNNING, + Request: collector_request, + ClientId: client_id, + OutstandingRequests: int64(len(tasks)), + } + + // Store the collection_context first, then queue all the tasks. + err = db.SetSubjectWithCompletion(config_obj, + flow_path_manager.Path(), + collection_context, + + func() { + // Queue and notify the client about the new tasks + client_manager.QueueMessagesForClient( + client_id, tasks, true /* notify */) + }) + if err != nil { + return "", err + } + // Record the tasks for provenance of what we actually did. err = db.SetSubjectWithCompletion(config_obj, flow_path_manager.Task(), - &api_proto.ApiFlowRequestDetails{Items: tasks}, - completer.GetCompletionFunc()) + &api_proto.ApiFlowRequestDetails{Items: tasks}, nil) if err != nil { return "", err } diff --git a/services/launcher/launcher_test.go b/services/launcher/launcher_test.go index e50072cb69e..2060589cc19 100644 --- a/services/launcher/launcher_test.go +++ b/services/launcher/launcher_test.go @@ -33,6 +33,8 @@ import ( // Load plugins (timestamp, parse_csv) _ "www.velocidex.com/golang/velociraptor/vql/functions" _ "www.velocidex.com/golang/velociraptor/vql/parsers/csv" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type LauncherTestSuite struct { diff --git a/services/notifications.go b/services/notifications.go index e5d33a24988..4f6264c43f2 100644 --- a/services/notifications.go +++ b/services/notifications.go @@ -50,21 +50,15 @@ type Notifier interface { // the Journal service. ListenForNotification(id string) (chan bool, func()) - // Send a notification to everyone - this is a global event - // directed at everyone. Only used in server shutdown - use - // NotifyByRegex instead. - NotifyAllListeners(config_obj *config_proto.Config) error - - // Send a notification to all listeners with an id matching - // this regex. - NotifyByRegex(config_obj *config_proto.Config, regex string) error - // Send a notification to a specific listener based on its id // that was registered above. - NotifyListener(config_obj *config_proto.Config, id string) error + NotifyListener(config_obj *config_proto.Config, id, tag string) error + + // Notify a directly connected listener. + NotifyDirectListener(id string) // Notify in the near future - no guarantee of delivery. - NotifyListenerAsync(config_obj *config_proto.Config, id string) + NotifyListenerAsync(config_obj *config_proto.Config, id, tag string) // Check if there is someone listening for the specified id. 
This // method queries all minion nodes to check if the client is diff --git a/services/notifications/notifications.go b/services/notifications/notifications.go index a40c6d00834..4a552c1c376 100644 --- a/services/notifications/notifications.go +++ b/services/notifications/notifications.go @@ -17,7 +17,6 @@ package notifications import ( "context" "fmt" - "regexp" "sync" "sync/atomic" "time" @@ -37,6 +36,16 @@ var ( Name: "client_ping_timeout", Help: "Number of times the client ping has timed out.", }) + + notificationsSentCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "notifications_send_count", + Help: "Number of notification messages sent.", + }) + + notificationsReceivedCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "notifications_receive_count", + Help: "Number of notification messages received.", + }) ) type Notifier struct { @@ -106,25 +115,8 @@ func StartNotificationService( if !ok { continue } - - if target == "Regex" { - regex_str, ok := event.GetString("Regex") - if ok { - regex, err := regexp.Compile(regex_str) - if err != nil { - logger.Error("Notification service: "+ - "Unable to compiler regex '%v': %v\n", - regex_str, err) - continue - } - self.notification_pool.NotifyByRegex(config_obj, regex) - } - - } else if target == "All" { - self.notification_pool.NotifyAll(config_obj) - } else { - self.notification_pool.Notify(target) - } + notificationsReceivedCounter.Inc() + self.notification_pool.Notify(target) } } }() @@ -146,25 +138,29 @@ func (self *Notifier) ProcessPing(ctx context.Context, return nil } - // Client is directly connected - inform the client info - // manager. Normally the Ping is sent by the frontend to find out - // which clients are connected to a minion. In this case it is - // worth the extra ping updates to deliver fresh data to the GUI - - // there are not too many clients but we need to know accurate - // data. - client_info_manager, err := services.GetClientInfoManager() - if err != nil { - return err - } - client_info_manager.UpdatePing(client_id, "") + /* + // Client is directly connected - inform the client info + // manager. Normally the Ping is sent by the frontend to find out + // which clients are connected to a minion. In this case it is + // worth the extra ping updates to deliver fresh data to the GUI - + // there are not too many clients but we need to know accurate + // data. + client_info_manager, err := services.GetClientInfoManager() + if err != nil { + return err + } + client_info_manager.UpdateStats(client_id, func(stats *services.Stats) { + stats.Ping = + }) + */ notify_target, pres := row.GetString("NotifyTarget") if !pres { return nil } // Notify the target of the Ping. 
- return self.NotifyListener(config_obj, notify_target) + return self.NotifyListener(config_obj, notify_target, "ClientPing") } func (self *Notifier) ListenForNotification(client_id string) (chan bool, func()) { @@ -178,52 +174,41 @@ func (self *Notifier) ListenForNotification(client_id string) (chan bool, func() return notification_pool.Listen(client_id) } -func (self *Notifier) NotifyAllListeners(config_obj *config_proto.Config) error { - journal, err := services.GetJournal() - if err != nil { - return err - } - - return journal.PushRowsToArtifact(config_obj, - []*ordereddict.Dict{ordereddict.NewDict().Set("Target", "All")}, - "Server.Internal.Notifications", "server", "", - ) -} - -func (self *Notifier) NotifyByRegex( - config_obj *config_proto.Config, regex string) error { +func (self *Notifier) NotifyListener(config_obj *config_proto.Config, + id, tag string) error { journal, err := services.GetJournal() if err != nil { return err } + // We need to send this ASAP so we do not use an async send. + notificationsSentCounter.Inc() return journal.PushRowsToArtifact(config_obj, - []*ordereddict.Dict{ordereddict.NewDict().Set("Target", "Regex"). - Set("Regex", regex)}, + []*ordereddict.Dict{ordereddict.NewDict(). + Set("Tag", tag). + Set("Target", id)}, "Server.Internal.Notifications", "server", "", ) } -func (self *Notifier) NotifyListener(config_obj *config_proto.Config, id string) error { - journal, err := services.GetJournal() - if err != nil { - return err +func (self *Notifier) NotifyDirectListener(client_id string) { + if self.notification_pool.IsClientConnected(client_id) { + self.notification_pool.Notify(client_id) } - - return journal.PushRowsToArtifact(config_obj, - []*ordereddict.Dict{ordereddict.NewDict().Set("Target", id)}, - "Server.Internal.Notifications", "server", "", - ) } -func (self *Notifier) NotifyListenerAsync(config_obj *config_proto.Config, id string) { +func (self *Notifier) NotifyListenerAsync(config_obj *config_proto.Config, + id, tag string) { journal, err := services.GetJournal() if err != nil { return } + notificationsSentCounter.Inc() journal.PushRowsToArtifactAsync(config_obj, - ordereddict.NewDict().Set("Target", id), + ordereddict.NewDict(). + Set("Tag", tag). 
+ Set("Target", id), "Server.Internal.Notifications") } diff --git a/services/sanity/sanity_test.go b/services/sanity/sanity_test.go index 41b4f76629a..a40e1b689d3 100644 --- a/services/sanity/sanity_test.go +++ b/services/sanity/sanity_test.go @@ -1,4 +1,4 @@ -package sanity +package sanity_test import ( "testing" @@ -18,7 +18,10 @@ import ( "www.velocidex.com/golang/velociraptor/paths" "www.velocidex.com/golang/velociraptor/services" "www.velocidex.com/golang/velociraptor/services/inventory" + "www.velocidex.com/golang/velociraptor/services/sanity" "www.velocidex.com/golang/velociraptor/utils" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type ServicesTestSuite struct { @@ -57,7 +60,7 @@ tools: }) assert.NoError(self.T(), err) - require.NoError(self.T(), self.Sm.Start(StartSanityCheckService)) + require.NoError(self.T(), self.Sm.Start(sanity.StartSanityCheckService)) db := test_utils.GetMemoryDataStore(self.T(), self.ConfigObj) inventory_config := &artifacts_proto.ThirdParty{} @@ -84,7 +87,7 @@ func (self *ServicesTestSuite) TestCreateUser() { PasswordSalt: "0f61ad0fd6391513021242efb9ac780245cc21527fa3f9c5e552d47223e383a2", }, } - require.NoError(self.T(), self.Sm.Start(StartSanityCheckService)) + require.NoError(self.T(), self.Sm.Start(sanity.StartSanityCheckService)) db := test_utils.GetMemoryDataStore(self.T(), self.ConfigObj) diff --git a/services/server_artifacts/server_artifacts_test.go b/services/server_artifacts/server_artifacts_test.go index 47838a3c136..29d40c6cdb3 100644 --- a/services/server_artifacts/server_artifacts_test.go +++ b/services/server_artifacts/server_artifacts_test.go @@ -88,7 +88,7 @@ func (self *ServerArtifactsTestSuite) ScheduleAndWait( }, func() { // Notify it about the new job notifier := services.GetNotifier() - err = notifier.NotifyListener(self.ConfigObj, "server") + err = notifier.NotifyListener(self.ConfigObj, "server", "") assert.NoError(self.T(), err) }) assert.NoError(self.T(), err) diff --git a/vql/filesystem/zip_test.go b/vql/filesystem/zip_test.go index 23f82d63b53..ac5ca91fd07 100644 --- a/vql/filesystem/zip_test.go +++ b/vql/filesystem/zip_test.go @@ -19,6 +19,8 @@ import ( vql_subsystem "www.velocidex.com/golang/velociraptor/vql" vfilter "www.velocidex.com/golang/vfilter" "www.velocidex.com/golang/vfilter/types" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type ZipTestSuite struct { diff --git a/vql/parsers/sqlite_test.go b/vql/parsers/sqlite_test.go index b8e15e39a84..a7445dd6e6f 100644 --- a/vql/parsers/sqlite_test.go +++ b/vql/parsers/sqlite_test.go @@ -22,6 +22,8 @@ import ( "www.velocidex.com/golang/velociraptor/services" vql_subsystem "www.velocidex.com/golang/velociraptor/vql" vfilter "www.velocidex.com/golang/vfilter" + + _ "www.velocidex.com/golang/velociraptor/result_sets/timed" ) type TestSuite struct { diff --git a/vql/server/artifacts.go b/vql/server/artifacts.go index 6223ed91e27..315d35a14ee 100644 --- a/vql/server/artifacts.go +++ b/vql/server/artifacts.go @@ -149,7 +149,8 @@ func (self *ScheduleCollectionFunction) Call(ctx context.Context, // Notify the client about it. 
notifier := services.GetNotifier() if notifier != nil { - notifier.NotifyListener(config_obj, arg.ClientId) + notifier.NotifyListener( + config_obj, arg.ClientId, "collect_client") } }) if err != nil { diff --git a/vql/server/clients/delete.go b/vql/server/clients/delete.go index 485f16dee3a..8b842b02a5c 100644 --- a/vql/server/clients/delete.go +++ b/vql/server/clients/delete.go @@ -133,7 +133,8 @@ func (self DeleteClientPlugin) Call(ctx context.Context, // it is already up. notifier := services.GetNotifier() if notifier != nil { - err = notifier.NotifyListener(config_obj, arg.ClientId) + err = notifier.NotifyListener( + config_obj, arg.ClientId, "DeleteClient") if err != nil { scope.Log("client_delete: %s", err) } diff --git a/vql/server/kill.go b/vql/server/kill.go index ddc1170e486..90ea140b7de 100644 --- a/vql/server/kill.go +++ b/vql/server/kill.go @@ -44,12 +44,6 @@ func (self *KillClientFunction) Call(ctx context.Context, return vfilter.Null{} } - config_obj, ok := vql_subsystem.GetServerConfig(scope) - if !ok { - scope.Log("Command can only run on the server") - return vfilter.Null{} - } - // Queue a cancellation message to the client for this flow // id. client_manager, err := services.GetClientInfoManager() @@ -61,12 +55,7 @@ func (self *KillClientFunction) Call(ctx context.Context, &crypto_proto.VeloMessage{ KillKillKill: &crypto_proto.Cancel{}, SessionId: constants.MONITORING_WELL_KNOWN_FLOW, - }, func() { - notifier := services.GetNotifier() - if notifier != nil { - notifier.NotifyListener(config_obj, arg.ClientId) - } - }) + }, true, nil) if err != nil { scope.Log("killkillkill: %s", err.Error()) return vfilter.Null{}