diff --git a/services/client_monitoring/client_monitoring.go b/services/client_monitoring/client_monitoring.go index b759e53a26f..94f4f58cb53 100644 --- a/services/client_monitoring/client_monitoring.go +++ b/services/client_monitoring/client_monitoring.go @@ -350,6 +350,31 @@ func (self *ClientEventTable) GetClientUpdateEventTableMessage( } } +func (self *ClientEventTable) ProcessServerMetadataModificationEvent( + ctx context.Context, + config_obj *config_proto.Config, + event *ordereddict.Dict) { + + // Only trigger on server metadata changes + client_id, pres := event.GetString("client_id") + if !pres || client_id != "server" { + return + } + + logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) + logger.Info("client_monitoring: Reloading table because server metadata was updated") + + err := self.load_from_file(ctx, config_obj) + if err != nil { + logger := logging.GetLogger( + config_obj, &logging.FrontendComponent) + logger.Error("self.setClientMonitoringState: %v", err) + } + + // Update version to reflect the new time. + self.state.Version = uint64(self.Clock.Now().UnixNano()) +} + func (self *ClientEventTable) ProcessArtifactModificationEvent( ctx context.Context, config_obj *config_proto.Config, event *ordereddict.Dict) { @@ -488,16 +513,28 @@ func NewClientMonitoringService( ctx, "Server.Internal.ArtifactModification", "client_monitoring_service") + metadata_mod_event, metadata_mod_event_cancel := journal.Watch( + ctx, "Server.Internal.MetadataModifications", + "client_monitoring_service") + wg.Add(1) go func() { defer wg.Done() defer cancel() + defer metadata_mod_event_cancel() for { select { case <-ctx.Done(): return + case event, ok := <-metadata_mod_event: + if !ok { + return + } + event_table.ProcessServerMetadataModificationEvent( + ctx, config_obj, event) + case event, ok := <-events: if !ok { return diff --git a/services/server_artifacts/collection_context.go b/services/server_artifacts/collection_context.go index cbcfc0f0c76..613f64b1907 100644 --- a/services/server_artifacts/collection_context.go +++ b/services/server_artifacts/collection_context.go @@ -67,7 +67,7 @@ func NewCollectionContextManager( flow_id := collection_context.SessionId sub_ctx, cancel := context.WithCancel(ctx) - log_writer, err := NewServerLogWriter(ctx, config_obj, flow_id) + log_writer, err := NewServerLogWriter(sub_ctx, config_obj, flow_id) if err != nil { return nil, err } @@ -259,15 +259,15 @@ func (self *contextManager) Cancel(ctx context.Context, principal string) { } self.mu.Unlock() + self.maybeSendCompletionMessage(ctx) self.cancel() - self.wg.Wait() - self.maybeSendCompletionMessage(ctx) } func (self *contextManager) Close(ctx context.Context) { - self.wg.Wait() self.maybeSendCompletionMessage(ctx) + self.cancel() + self.wg.Wait() } // Called when each query is completed. Will send the message once for diff --git a/services/server_artifacts/server_artifacts.go b/services/server_artifacts/server_artifacts.go index 2be00672a3a..0ccd46654c0 100644 --- a/services/server_artifacts/server_artifacts.go +++ b/services/server_artifacts/server_artifacts.go @@ -57,20 +57,22 @@ func (self *ServerArtifactRunner) LaunchServerArtifact( req *crypto_proto.FlowRequest, collection_context *flows_proto.ArtifactCollectorContext) error { - sub_ctx, cancel := context.WithCancel(self.ctx) collection_context_manager, err := NewCollectionContextManager( - sub_ctx, self.wg, config_obj, req, collection_context) + self.ctx, self.wg, config_obj, req, collection_context) if err != nil { return err } + sub_ctx, cancel := context.WithCancel(self.ctx) + collection_context_manager.StartRefresh(self.wg) self.wg.Add(1) go func() { defer self.wg.Done() defer cancel() - defer collection_context_manager.Save() + defer collection_context_manager.Close(self.ctx) + self.ProcessTask(sub_ctx, config_obj, session_id, collection_context_manager, req) }() @@ -100,6 +102,8 @@ func (self *ServerArtifactRunner) ProcessTask( collection_context CollectionContextManager, req *crypto_proto.FlowRequest) error { + defer collection_context.Close(ctx) + self.mu.Lock() self.in_flight_collections[session_id] = collection_context self.mu.Unlock() diff --git a/services/server_monitoring/server_monitoring.go b/services/server_monitoring/server_monitoring.go index f159faf0cb1..3046e336f6d 100644 --- a/services/server_monitoring/server_monitoring.go +++ b/services/server_monitoring/server_monitoring.go @@ -147,6 +147,25 @@ func (self *EventTable) ProcessArtifactModificationEvent( } +func (self *EventTable) ProcessServerMetadataModificationEvent( + ctx context.Context, + config_obj *config_proto.Config, + event *ordereddict.Dict) { + + client_id, pres := event.GetString("client_id") + if !pres || client_id != "server" { + return + } + + logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) + logger.Info("server_monitoring: Reloading table because server metadata was updated") + + request := self.Get() + + // Restart the queries + self.StartQueries(config_obj, request) +} + func (self *EventTable) Update( config_obj *config_proto.Config, principal string, @@ -161,7 +180,7 @@ func (self *EventTable) Update( } logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent) - logger.Info("server_monitoring: Updating monitoring table") + logger.Info("server_monitoring: Updating monitoring table") // Now store the monitoring table on disk. db, err := datastore.GetDB(config_obj) @@ -467,14 +486,20 @@ func NewServerMonitoringService( if err != nil { return nil, err } + events, cancel := journal.Watch( ctx, "Server.Internal.ArtifactModification", "server_monitoring_service") + metadata_mod_event, metadata_mod_event_cancel := journal.Watch( + ctx, "Server.Internal.MetadataModifications", + "server_monitoring_service") + wg.Add(1) go func() { defer wg.Done() defer cancel() + defer metadata_mod_event_cancel() // Shut down all server queries in an orderly fasion defer manager.Close() @@ -484,6 +509,13 @@ func NewServerMonitoringService( case <-ctx.Done(): return + case event, ok := <-metadata_mod_event: + if !ok { + return + } + manager.ProcessServerMetadataModificationEvent( + ctx, config_obj, event) + case event, ok := <-events: if !ok { return