Skip to content

Commit

Permalink
Trigger client and server monitoring table rebuild (Velocidex#2501)
Browse files Browse the repository at this point in the history
When server metadata is updated.
  • Loading branch information
scudette authored Mar 6, 2023
1 parent 46ffb9a commit 533acab
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
37 changes: 37 additions & 0 deletions services/client_monitoring/client_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,31 @@ func (self *ClientEventTable) GetClientUpdateEventTableMessage(
}
}

func (self *ClientEventTable) ProcessServerMetadataModificationEvent(
ctx context.Context,
config_obj *config_proto.Config,
event *ordereddict.Dict) {

// Only trigger on server metadata changes
client_id, pres := event.GetString("client_id")
if !pres || client_id != "server" {
return
}

logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent)
logger.Info("<green>client_monitoring</>: Reloading table because server metadata was updated")

err := self.load_from_file(ctx, config_obj)
if err != nil {
logger := logging.GetLogger(
config_obj, &logging.FrontendComponent)
logger.Error("self.setClientMonitoringState: %v", err)
}

// Update version to reflect the new time.
self.state.Version = uint64(self.Clock.Now().UnixNano())
}

func (self *ClientEventTable) ProcessArtifactModificationEvent(
ctx context.Context,
config_obj *config_proto.Config, event *ordereddict.Dict) {
Expand Down Expand Up @@ -488,16 +513,28 @@ func NewClientMonitoringService(
ctx, "Server.Internal.ArtifactModification",
"client_monitoring_service")

metadata_mod_event, metadata_mod_event_cancel := journal.Watch(
ctx, "Server.Internal.MetadataModifications",
"client_monitoring_service")

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
defer metadata_mod_event_cancel()

for {
select {
case <-ctx.Done():
return

case event, ok := <-metadata_mod_event:
if !ok {
return
}
event_table.ProcessServerMetadataModificationEvent(
ctx, config_obj, event)

case event, ok := <-events:
if !ok {
return
Expand Down
8 changes: 4 additions & 4 deletions services/server_artifacts/collection_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewCollectionContextManager(

flow_id := collection_context.SessionId
sub_ctx, cancel := context.WithCancel(ctx)
log_writer, err := NewServerLogWriter(ctx, config_obj, flow_id)
log_writer, err := NewServerLogWriter(sub_ctx, config_obj, flow_id)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -259,15 +259,15 @@ func (self *contextManager) Cancel(ctx context.Context, principal string) {
}
self.mu.Unlock()

self.maybeSendCompletionMessage(ctx)
self.cancel()

self.wg.Wait()
self.maybeSendCompletionMessage(ctx)
}

func (self *contextManager) Close(ctx context.Context) {
self.wg.Wait()
self.maybeSendCompletionMessage(ctx)
self.cancel()
self.wg.Wait()
}

// Called when each query is completed. Will send the message once for
Expand Down
10 changes: 7 additions & 3 deletions services/server_artifacts/server_artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,22 @@ func (self *ServerArtifactRunner) LaunchServerArtifact(
req *crypto_proto.FlowRequest,
collection_context *flows_proto.ArtifactCollectorContext) error {

sub_ctx, cancel := context.WithCancel(self.ctx)
collection_context_manager, err := NewCollectionContextManager(
sub_ctx, self.wg, config_obj, req, collection_context)
self.ctx, self.wg, config_obj, req, collection_context)
if err != nil {
return err
}

sub_ctx, cancel := context.WithCancel(self.ctx)

collection_context_manager.StartRefresh(self.wg)

self.wg.Add(1)
go func() {
defer self.wg.Done()
defer cancel()
defer collection_context_manager.Save()
defer collection_context_manager.Close(self.ctx)

self.ProcessTask(sub_ctx, config_obj,
session_id, collection_context_manager, req)
}()
Expand Down Expand Up @@ -100,6 +102,8 @@ func (self *ServerArtifactRunner) ProcessTask(
collection_context CollectionContextManager,
req *crypto_proto.FlowRequest) error {

defer collection_context.Close(ctx)

self.mu.Lock()
self.in_flight_collections[session_id] = collection_context
self.mu.Unlock()
Expand Down
34 changes: 33 additions & 1 deletion services/server_monitoring/server_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,25 @@ func (self *EventTable) ProcessArtifactModificationEvent(

}

func (self *EventTable) ProcessServerMetadataModificationEvent(
ctx context.Context,
config_obj *config_proto.Config,
event *ordereddict.Dict) {

client_id, pres := event.GetString("client_id")
if !pres || client_id != "server" {
return
}

logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent)
logger.Info("server_monitoring: Reloading table because server metadata was updated")

request := self.Get()

// Restart the queries
self.StartQueries(config_obj, request)
}

func (self *EventTable) Update(
config_obj *config_proto.Config,
principal string,
Expand All @@ -161,7 +180,7 @@ func (self *EventTable) Update(
}

logger := logging.GetLogger(self.config_obj, &logging.FrontendComponent)
logger.Info("server_monitoring: Updating monitoring table")
logger.Info("<green>server_monitoring</>: Updating monitoring table")

// Now store the monitoring table on disk.
db, err := datastore.GetDB(config_obj)
Expand Down Expand Up @@ -467,14 +486,20 @@ func NewServerMonitoringService(
if err != nil {
return nil, err
}

events, cancel := journal.Watch(
ctx, "Server.Internal.ArtifactModification",
"server_monitoring_service")

metadata_mod_event, metadata_mod_event_cancel := journal.Watch(
ctx, "Server.Internal.MetadataModifications",
"server_monitoring_service")

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
defer metadata_mod_event_cancel()

// Shut down all server queries in an orderly fasion
defer manager.Close()
Expand All @@ -484,6 +509,13 @@ func NewServerMonitoringService(
case <-ctx.Done():
return

case event, ok := <-metadata_mod_event:
if !ok {
return
}
manager.ProcessServerMetadataModificationEvent(
ctx, config_obj, event)

case event, ok := <-events:
if !ok {
return
Expand Down

0 comments on commit 533acab

Please sign in to comment.