Fix context management in event table updates. (Velocidex#2252)
Fixes a deadlock with the pool client that restarts the event table a lot.
scudette authored Nov 17, 2022
1 parent cacb5e2 commit ea04be3
Showing 8 changed files with 186 additions and 128 deletions.
181 changes: 99 additions & 82 deletions actions/events.go
@@ -41,31 +41,36 @@ import (
)

var (
GlobalEventTable = &EventTable{
ctx: context.Background(),
}
mu sync.Mutex
// Keep track of inflight queries for shutdown. This wait group
// belongs to the client's service manager. As we issue queries we
// increment it and when queries are done we decrement it. The
// service manager will wait for all inflight queries to exit
// before exiting, allowing the client to shut down in an orderly
// fashion.
mu sync.Mutex
service_wg *sync.WaitGroup
service_ctx context.Context = context.Background()

GlobalEventTable *EventTable
)

type EventTable struct {
Events []*actions_proto.VQLCollectorArgs
version uint64
mu sync.Mutex

ctx context.Context
config_obj *config_proto.Config
// Context for cancelling all inflight queries in this event
// table.
Ctx context.Context
cancel func()
wg sync.WaitGroup

// This will be closed to signal that we need to abort the
// current event queries.
Done chan bool
wg sync.WaitGroup
// The event table currently running
Events []*actions_proto.VQLCollectorArgs

// Keep track of inflight queries for shutdown. This wait
// group belongs to the client's service manager. As we issue
// queries we increment it and when queries are done we
// decrement it. The service manager will wait for this before
// exiting, allowing the client to shut down in an orderly
// fashion.
service_wg *sync.WaitGroup
// The version of this event table - we only update from the
// server if the server's event table is newer.
version uint64

config_obj *config_proto.Config
}
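
The service_wg comments above describe the shutdown contract: every inflight query registers on the service manager's wait group, so the manager can cancel the shared context and then block until all queries have exited. A minimal, self-contained sketch of that pattern (illustrative names, not the actual Velociraptor API):

```go
package main

import (
	"context"
	"sync"
)

// startQuery registers one inflight query on the shared wait group
// before launching it, mirroring the service_wg bookkeeping above.
func startQuery(ctx context.Context, service_wg *sync.WaitGroup) {
	service_wg.Add(1)
	go func() {
		defer service_wg.Done()
		<-ctx.Done() // stand-in for an event query that runs until cancelled
	}()
}

func main() {
	service_ctx, cancel := context.WithCancel(context.Background())
	service_wg := &sync.WaitGroup{}

	startQuery(service_ctx, service_wg)

	// Orderly shutdown: cancel first, then wait for every inflight
	// query to notice the cancellation and exit.
	cancel()
	service_wg.Wait()
}
```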

// Determine if the current table is the same as the new set of
@@ -105,59 +110,68 @@ func (self *EventTable) equal(events []*actions_proto.VQLCollectorArgs) bool {

// Tear down all the current queries. Blocks until they all shut down.
func (self *EventTable) Close() {
logger := logging.GetLogger(self.config_obj, &logging.ClientComponent)
logger.Info("Closing EventTable\n")
self.mu.Lock()
defer self.mu.Unlock()

self.close()
}

// Actually close the table without taking the lock.
func (self *EventTable) close() {
if self.config_obj != nil {
logger := logging.GetLogger(self.config_obj, &logging.ClientComponent)
logger.Info("Closing EventTable\n")
}

self.cancel()

close(self.Done)
// Wait until the queries have completed.
self.wg.Wait()
}
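
The Close()/close() split is the usual Go idiom for non-reentrant mutexes: the exported method acquires the lock, the unexported one assumes it is already held, so Update() below can tear the table down while holding self.mu without deadlocking on itself. A reduced illustration (hypothetical type, not the real EventTable):

```go
package main

import "sync"

type table struct {
	mu    sync.Mutex
	state string
}

// Close is the public entry point: it takes the lock itself.
func (t *table) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.close()
}

// close assumes the caller already holds t.mu.
func (t *table) close() {
	t.state = "closed"
}

// Update holds the lock across the whole check-and-replace, so it
// must call close(), not Close(): sync.Mutex is not reentrant and
// locking it twice from the same goroutine deadlocks.
func (t *table) Update(state string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.close()
	t.state = state
}

func main() {
	t := &table{}
	t.Update("new queries")
	t.Close()
}
```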

func GlobalEventTableVersion() uint64 {
mu.Lock()
defer mu.Unlock()
func (self *EventTable) Version() uint64 {
self.mu.Lock()
defer self.mu.Unlock()

return GlobalEventTable.version
return self.version
}

func GlobalEventTableVersion() uint64 {
return GlobalEventTable.Version()
}

func update(
func (self *EventTable) Update(
config_obj *config_proto.Config,
responder *responder.Responder,
table *actions_proto.VQLEventTable) (*EventTable, error, bool) {

mu.Lock()
defer mu.Unlock()
self.mu.Lock()
defer self.mu.Unlock()

// Only update the event table if we need to.
if table.Version <= GlobalEventTable.version {
return GlobalEventTable, nil, false
if table.Version <= self.version {
return self, nil, false
}

// If the new update is identical to the old queries we won't
// restart. This can happen e.g. if the server changes label
// groups and recalculates the table version but the actual
// queries don't end up changing.
if GlobalEventTable.equal(table.Event) {
if self.equal(table.Event) {
logger := logging.GetLogger(config_obj, &logging.ClientComponent)
logger.Info("Client event query update %v did not "+
"change queries, skipping", table.Version)

// Update the version only but keep queries the same.
GlobalEventTable.version = table.Version
return GlobalEventTable, nil, false
}

// Close the old table.
if GlobalEventTable.Done != nil {
GlobalEventTable.Close()
self.version = table.Version
return self, nil, false
}

// Reset the table.
GlobalEventTable.Events = table.Event
GlobalEventTable.version = table.Version
GlobalEventTable.Done = make(chan bool)
GlobalEventTable.config_obj = config_obj
GlobalEventTable.service_wg = &sync.WaitGroup{}
// Close the old table and wait for it to finish.
self.close()

// Reset the table with the new queries.
GlobalEventTable = NewEventTable(config_obj, responder, table)
return GlobalEventTable, nil, true /* changed */
}

@@ -169,8 +183,8 @@ func (self UpdateEventTable) Run(
responder *responder.Responder,
arg *actions_proto.VQLEventTable) {

// Make a new table.
table, err, changed := update(config_obj, responder, arg)
// Make a new table if needed.
table, err, changed := GlobalEventTable.Update(config_obj, responder, arg)
if err != nil {
responder.RaiseError(ctx, fmt.Sprintf(
"Error updating global event table: %v", err))
@@ -192,32 +206,17 @@ func (self UpdateEventTable) Run(

logger := logging.GetLogger(config_obj, &logging.ClientComponent)

// Make a context for the VQL query. It will be destroyed on shut
// down when the global event table is done.
new_ctx, cancel := context.WithCancel(GlobalEventTable.ctx)

// Cancel the context when the cancel channel is closed.
go func() {
mu.Lock()
done := table.Done
mu.Unlock()

<-done
logger.Info("UpdateEventTable: Closing all contexts")
cancel()
}()

// Start a new query for each event.
action_obj := &VQLClientAction{}
table.wg.Add(len(table.Events))
table.service_wg.Add(len(table.Events))
service_wg.Add(len(table.Events))

for _, event := range table.Events {
query_responder := responder.Copy()

go func(event *actions_proto.VQLCollectorArgs) {
defer table.wg.Done()
defer table.service_wg.Done()
defer service_wg.Done()

// Name of the query we are running.
name := ""
@@ -243,8 +242,10 @@
event.Heartbeat = 300 // 5 minutes
}

// Start the query - if it is an event query this will
// never complete until it is cancelled.
action_obj.StartQuery(
config_obj, new_ctx, query_responder, event)
config_obj, table.Ctx, query_responder, event)
if name != "" {
logger.Info("Finished monitoring query %s", name)
}
@@ -253,12 +254,12 @@

err = update_writeback(config_obj, arg)
if err != nil {
responder.RaiseError(new_ctx, fmt.Sprintf(
responder.RaiseError(ctx, fmt.Sprintf(
"Unable to write events to writeback: %v", err))
return
}

responder.Return(new_ctx)
responder.Return(ctx)
}
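
Run() uses the standard fan-out shape: the WaitGroup counts are added for the whole batch before any goroutine starts, so nothing calling Wait() can observe a momentarily-zero counter while queries are still being launched, and each goroutine receives the loop variable as a parameter. A compact sketch of the same shape (the artifact names are made up for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	events := []string{"Example.Artifact.One", "Example.Artifact.Two"}

	var wg sync.WaitGroup
	wg.Add(len(events)) // count the whole batch before launching anything

	for _, event := range events {
		go func(event string) { // pass by value: pre-Go 1.22 loop
			defer wg.Done() // variables are reused across iterations
			fmt.Println("running event query:", event)
		}(event)
	}
	wg.Wait() // cannot return early: all Adds happened up front
}
```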

func update_writeback(
@@ -274,45 +275,61 @@ func update_writeback(
}

func NewEventTable(
ctx context.Context,
config_obj *config_proto.Config,
responder *responder.Responder,
table *actions_proto.VQLEventTable) *EventTable {

sub_ctx, cancel := context.WithCancel(service_ctx)

result := &EventTable{
Events: table.Event,
version: table.Version,
Done: make(chan bool),
Ctx: sub_ctx,
cancel: cancel,
config_obj: config_obj,
ctx: ctx,
}

return result
}

// Called by the service manager to initialize the global event table.
func InitializeEventTable(
// This is the context of the service - its lifetime represents
// the lifetime of the entire application.
ctx context.Context,
config_obj *config_proto.Config,
output_chan chan *crypto_proto.VeloMessage,
service_wg *sync.WaitGroup) {
wg *sync.WaitGroup) {

mu.Lock()
service_ctx = ctx
service_wg = wg

// Remove any old tables if needed.
if GlobalEventTable != nil {
GlobalEventTable.Close()
}

// Create an empty table
GlobalEventTable = NewEventTable(
ctx, config_obj,
config_obj,
responder.NewResponder(
config_obj, &crypto_proto.VeloMessage{}, output_chan),
&actions_proto.VQLEventTable{})
GlobalEventTable.service_wg = service_wg
mu.Unlock()

// When the context is finished, tear down the event table.
go func() {
go func(table *EventTable, ctx context.Context) {
<-ctx.Done()
table.Close()
}(GlobalEventTable, service_ctx)

mu.Lock()
if GlobalEventTable.Done != nil {
close(GlobalEventTable.Done)
GlobalEventTable.Done = nil
}
mu.Unlock()
}()
mu.Unlock()
}

func init() {
ctx, cancel := context.WithCancel(context.Background())
GlobalEventTable = &EventTable{
Ctx: ctx,
cancel: cancel,
}
}
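
The net effect of this file's changes is a two-level context hierarchy: every event table's Ctx is derived from service_ctx, so replacing a table cancels only that table's queries, while service shutdown cancels every table ever created from it. A small sketch of the lifetimes, under those assumptions:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// Lifetime of the whole client, as set in InitializeEventTable.
	service_ctx, service_cancel := context.WithCancel(context.Background())

	// Each table derives a child context, as in NewEventTable.
	old_table_ctx, old_table_cancel := context.WithCancel(service_ctx)

	// Replacing the table cancels only its own inflight queries...
	old_table_cancel()
	fmt.Println("old table cancelled:", old_table_ctx.Err() != nil)

	// ...while service shutdown reaches every table derived from it.
	new_table_ctx, new_table_cancel := context.WithCancel(service_ctx)
	defer new_table_cancel()

	service_cancel()
	<-new_table_ctx.Done()
	fmt.Println("service shutdown cancelled the current table too")
}
```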
1 change: 0 additions & 1 deletion actions/events_test.go
@@ -74,7 +74,6 @@ func (self *EventsTestSuite) SetupTest() {
self.responder = responder.TestResponder()

actions.GlobalEventTable = actions.NewEventTable(
context.Background(),
self.ConfigObj, self.responder,
&actions_proto.VQLEventTable{})
}
6 changes: 4 additions & 2 deletions actions/throttler.go
@@ -39,6 +39,8 @@ var (
// A Global stats collector is always running. When throttlers
// register with it they can read the data.
stats *statsCollector

throttle_mu sync.Mutex
)

type sample struct {
@@ -252,8 +254,8 @@ func NewThrottler(
return &DummyThrottler{}
}

mu.Lock()
defer mu.Unlock()
throttle_mu.Lock()
defer throttle_mu.Unlock()

if stats == nil {
var err error
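
One plausible reading of the throttle_mu change: NewThrottler previously shared the actions package's global mu, so a query building its throttler while the event table held mu during teardown could complete a hold-and-wait cycle. Giving the stats collector its own lock removes that interaction; a schematic sketch with hypothetical names:

```go
package main

import "sync"

var (
	table_mu    sync.Mutex // guards event-table state only
	throttle_mu sync.Mutex // guards the shared stats collector only
)

// newThrottler now takes its own dedicated lock, so it never blocks
// behind a goroutine holding the event-table lock.
func newThrottler() {
	throttle_mu.Lock()
	defer throttle_mu.Unlock()
	// ... lazily initialize the shared stats collector ...
}

func main() {
	// Even while event-table state is locked, building a throttler
	// cannot deadlock against it: the two locks are independent.
	table_mu.Lock()
	newThrottler()
	table_mu.Unlock()
}
```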
8 changes: 6 additions & 2 deletions executor/pool.go
@@ -182,7 +182,11 @@ func (self *PoolClientExecutor) ProcessRequest(
LogMessage: resp.LogMessage,
Status: resp.Status,
}
self.Outbound <- response
select {
case <-ctx.Done():
return
case self.Outbound <- response:
}
}
return
}
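
The ProcessRequest change is the canonical cancellable-send idiom: a bare `ch <- v` on an unbuffered channel blocks forever if the consumer has gone away, which is exactly how a restarting pool client could pin event-table goroutines. Selecting on ctx.Done() gives the send an exit path. A self-contained sketch:

```go
package main

import (
	"context"
	"fmt"
)

// send delivers msg unless ctx is cancelled first, instead of
// blocking forever on a consumer that will never read.
func send(ctx context.Context, out chan<- string, msg string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case out <- msg:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the consumer having shut down

	out := make(chan string) // unbuffered: a bare send would hang here
	fmt.Println(send(ctx, out, "response")) // prints: context canceled
}
```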
@@ -236,7 +240,7 @@ func NewPoolClientExecutor(
}

// Register the new executor with the global pool responder.
g_responder := responder.GlobalPoolEventResponder
g_responder := responder.GetPoolEventResponder(ctx)
g_responder.RegisterPoolClientResponder(id, exe.Outbound)

output := make(chan *crypto_proto.VeloMessage, 10)
2 changes: 1 addition & 1 deletion go.mod
@@ -98,7 +98,7 @@ require (
howett.net/plist v1.0.0
www.velocidex.com/golang/evtx v0.2.1-0.20220404133451-1fdf8be7325e
www.velocidex.com/golang/go-ese v0.1.1-0.20220107095505-c38622559671
www.velocidex.com/golang/go-ntfs v0.1.2-0.20221111030807-38ad1c5b571b
www.velocidex.com/golang/go-ntfs v0.1.2-0.20221117122413-b97c856cb140
www.velocidex.com/golang/go-pe v0.1.1-0.20220506020923-9fac492a9b0d
www.velocidex.com/golang/go-prefetch v0.0.0-20220801101854-338dbe61982a
www.velocidex.com/golang/oleparse v0.0.0-20220617011920-94df2342d0b7
4 changes: 2 additions & 2 deletions go.sum
@@ -1187,8 +1187,8 @@ www.velocidex.com/golang/evtx v0.2.1-0.20220404133451-1fdf8be7325e h1:AhcXPgNKhJ
www.velocidex.com/golang/evtx v0.2.1-0.20220404133451-1fdf8be7325e/go.mod h1:ykEQ7AUF9AL+mfCefDmLwmZOnU2So6wM3qKM8xdsHhU=
www.velocidex.com/golang/go-ese v0.1.1-0.20220107095505-c38622559671 h1:pfvo7NFo0eJj6Zr7d+4vMx/Zr2JriMMPEWRHUf1YjUw=
www.velocidex.com/golang/go-ese v0.1.1-0.20220107095505-c38622559671/go.mod h1:qnzHyB9yD2khtYO+wf3ck9FQxX3wFhXeJHFBnuUIZcc=
www.velocidex.com/golang/go-ntfs v0.1.2-0.20221111030807-38ad1c5b571b h1:O9VAiPNf9DLAfzsH9IUduDQSIsPfexMjt9OO94Q5cSk=
www.velocidex.com/golang/go-ntfs v0.1.2-0.20221111030807-38ad1c5b571b/go.mod h1:itvbHQcnLdTVIDY6fI3lR0zeBwXwBYBdUFtswE0x1vc=
www.velocidex.com/golang/go-ntfs v0.1.2-0.20221117122413-b97c856cb140 h1:cRKPEhkLDHT98D3lS7SvRGgLkvhpiHJFGgQXRlCOJ9w=
www.velocidex.com/golang/go-ntfs v0.1.2-0.20221117122413-b97c856cb140/go.mod h1:itvbHQcnLdTVIDY6fI3lR0zeBwXwBYBdUFtswE0x1vc=
www.velocidex.com/golang/go-pe v0.1.1-0.20220107093716-e91743c801de/go.mod h1:j9Xy8Z9wxzY2SCB8CqDkkoSzy+eUwevnOrRm/XM2q/A=
www.velocidex.com/golang/go-pe v0.1.1-0.20220506020923-9fac492a9b0d h1:OQKwxK0O4a/8YTmfkQNzUspyrvlpRbLi318L08DC0oY=
www.velocidex.com/golang/go-pe v0.1.1-0.20220506020923-9fac492a9b0d/go.mod h1:TPJ3phbAuZIu7XuPyNqgoP2k3P+eNHfHHGcivhcsxaA=