Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.6.7 sync3 #2256

Merged
merged 13 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions accessors/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func (self StatWrapper) OSPath() *accessors.OSPath {
// CollectorAccessor presents the contents of a collector zip file
// through the accessors.FileSystemAccessor interface by embedding the
// zip accessor. The scope is captured so queries run in the caller's
// context.
type CollectorAccessor struct {
	*zip.ZipFileSystemAccessor
	scope vfilter.Scope

	// If set we automatically pad out sparse files.
	expandSparse bool
}

func (self *CollectorAccessor) New(scope vfilter.Scope) (accessors.FileSystemAccessor, error) {
Expand Down Expand Up @@ -319,17 +322,18 @@ func (self *CollectorAccessor) OpenWithOSPath(
return nil, err
}

index, err := self.getIndex(updated_full_path)
if err == nil {
return &rangedReader{
delegate: &utils.RangedReader{
ReaderAt: utils.MakeReaderAtter(reader),
Index: index,
},
fd: reader,
}, nil
if self.expandSparse {
index, err := self.getIndex(updated_full_path)
if err == nil {
return &rangedReader{
delegate: &utils.RangedReader{
ReaderAt: utils.MakeReaderAtter(reader),
Index: index,
},
fd: reader,
}, nil
}
}

return reader, nil
}

Expand Down Expand Up @@ -394,6 +398,11 @@ func (self *CollectorAccessor) ReadDirWithOSPath(
}

func init() {
accessors.Register("collector", &CollectorAccessor{},
`Open a collector zip file as if it was a directory.`)
accessors.Register("collector", &CollectorAccessor{
expandSparse: true,
}, `Open a collector zip file as if it was a directory - automatically expand sparse files.`)

accessors.Register("collector_sparse", &CollectorAccessor{
expandSparse: false,
}, `Open a collector zip file as if it was a directory - does not expand sparse files.`)
}
182 changes: 102 additions & 80 deletions actions/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,36 @@ import (
)

var (
GlobalEventTable = &EventTable{
ctx: context.Background(),
}
mu sync.Mutex
// Keep track of inflight queries for shutdown. This wait group
// belongs to the client's service manager. As we issue queries we
// increment it and when queries are done we decrement it. The
// service manager will wait for all inflight queries to exit
// before exiting allowing the client to shut down in an orderly
// fashion.
mu sync.Mutex
service_wg *sync.WaitGroup
service_ctx context.Context = context.Background()

GlobalEventTable *EventTable
)

type EventTable struct {
Events []*actions_proto.VQLCollectorArgs
version uint64
mu sync.Mutex

ctx context.Context
config_obj *config_proto.Config
// Context for cancelling all inflight queries in this event
// table.
Ctx context.Context
cancel func()
wg sync.WaitGroup

// This will be closed to signal that we need to abort the
// current event queries.
Done chan bool
wg sync.WaitGroup
// The event table currently running
Events []*actions_proto.VQLCollectorArgs

// Keep track of inflight queries for shutdown. This wait
// group belongs to the client's service manager. As we issue
// queries we increment it and when queries are done we
// decrement it. The service manager will wait for this before
// exiting allowing the client to shut down in an orderly
// fashion.
service_wg *sync.WaitGroup
// The version of this event table - we only update from the
// server if the server's event table is newer.
version uint64

config_obj *config_proto.Config
}

// Determine if the current table is the same as the new set of
Expand Down Expand Up @@ -105,59 +110,73 @@ func (self *EventTable) equal(events []*actions_proto.VQLCollectorArgs) bool {

// Tear down all the current queries. Blocks until they all shut down.
func (self *EventTable) Close() {
logger := logging.GetLogger(self.config_obj, &logging.ClientComponent)
logger.Info("Closing EventTable\n")
self.mu.Lock()
defer self.mu.Unlock()

self.close()
}

// Actually close the table without lock
func (self *EventTable) close() {
if self.config_obj != nil {
logger := logging.GetLogger(self.config_obj, &logging.ClientComponent)
logger.Info("Closing EventTable\n")
}

self.cancel()

close(self.Done)
// Wait until the queries have completed.
self.wg.Wait()

// Clear the list of events we are tracking - we are an empty
// event table right now - so further updates will restart the
// queries again.
self.Events = nil
}

func GlobalEventTableVersion() uint64 {
mu.Lock()
defer mu.Unlock()
func (self *EventTable) Version() uint64 {
self.mu.Lock()
defer self.mu.Unlock()

return self.version
}

return GlobalEventTable.version
func GlobalEventTableVersion() uint64 {
return GlobalEventTable.Version()
}

func update(
func (self *EventTable) Update(
config_obj *config_proto.Config,
responder *responder.Responder,
table *actions_proto.VQLEventTable) (*EventTable, error, bool) {

mu.Lock()
defer mu.Unlock()
self.mu.Lock()
defer self.mu.Unlock()

// Only update the event table if we need to.
if table.Version <= GlobalEventTable.version {
return GlobalEventTable, nil, false
if table.Version <= self.version {
return self, nil, false
}

// If the new update is identical to the old queries we won't
// restart. This can happen e.g. if the server changes label
// groups and recalculates the table version but the actual
// queries don't end up changing.
if GlobalEventTable.equal(table.Event) {
if self.equal(table.Event) {
logger := logging.GetLogger(config_obj, &logging.ClientComponent)
logger.Info("Client event query update %v did not "+
"change queries, skipping", table.Version)

// Update the version only but keep queries the same.
GlobalEventTable.version = table.Version
return GlobalEventTable, nil, false
}

// Close the old table.
if GlobalEventTable.Done != nil {
GlobalEventTable.Close()
self.version = table.Version
return self, nil, false
}

// Reset the table.
GlobalEventTable.Events = table.Event
GlobalEventTable.version = table.Version
GlobalEventTable.Done = make(chan bool)
GlobalEventTable.config_obj = config_obj
GlobalEventTable.service_wg = &sync.WaitGroup{}
// Close the old table and wait for it to finish.
self.close()

// Reset the table with the new queries.
GlobalEventTable = NewEventTable(config_obj, responder, table)
return GlobalEventTable, nil, true /* changed */
}

Expand All @@ -169,8 +188,8 @@ func (self UpdateEventTable) Run(
responder *responder.Responder,
arg *actions_proto.VQLEventTable) {

// Make a new table.
table, err, changed := update(config_obj, responder, arg)
// Make a new table if needed.
table, err, changed := GlobalEventTable.Update(config_obj, responder, arg)
if err != nil {
responder.RaiseError(ctx, fmt.Sprintf(
"Error updating global event table: %v", err))
Expand All @@ -192,32 +211,17 @@ func (self UpdateEventTable) Run(

logger := logging.GetLogger(config_obj, &logging.ClientComponent)

// Make a context for the VQL query. It will be destroyed on shut
// down when the global event table is done.
new_ctx, cancel := context.WithCancel(GlobalEventTable.ctx)

// Cancel the context when the cancel channel is closed.
go func() {
mu.Lock()
done := table.Done
mu.Unlock()

<-done
logger.Info("UpdateEventTable: Closing all contexts")
cancel()
}()

// Start a new query for each event.
action_obj := &VQLClientAction{}
table.wg.Add(len(table.Events))
table.service_wg.Add(len(table.Events))
service_wg.Add(len(table.Events))

for _, event := range table.Events {
query_responder := responder.Copy()

go func(event *actions_proto.VQLCollectorArgs) {
defer table.wg.Done()
defer table.service_wg.Done()
defer service_wg.Done()

// Name of the query we are running.
name := ""
Expand All @@ -243,8 +247,10 @@ func (self UpdateEventTable) Run(
event.Heartbeat = 300 // 5 minutes
}

// Start the query - if it is an event query this will
// never complete until it is cancelled.
action_obj.StartQuery(
config_obj, new_ctx, query_responder, event)
config_obj, table.Ctx, query_responder, event)
if name != "" {
logger.Info("Finished monitoring query %s", name)
}
Expand Down Expand Up @@ -274,45 +280,61 @@ func update_writeback(
}

func NewEventTable(
ctx context.Context,
config_obj *config_proto.Config,
responder *responder.Responder,
table *actions_proto.VQLEventTable) *EventTable {

sub_ctx, cancel := context.WithCancel(service_ctx)

result := &EventTable{
Events: table.Event,
version: table.Version,
Done: make(chan bool),
Ctx: sub_ctx,
cancel: cancel,
config_obj: config_obj,
ctx: ctx,
}

return result
}

// Called by the service manager to initialize the global event table.
func InitializeEventTable(
// This is the context of the service - its lifetime represents
// the lifetime of the entire application.
ctx context.Context,
config_obj *config_proto.Config,
output_chan chan *crypto_proto.VeloMessage,
service_wg *sync.WaitGroup) {
wg *sync.WaitGroup) {

mu.Lock()
service_ctx = ctx
service_wg = wg

// Remove any old tables if needed.
if GlobalEventTable != nil {
GlobalEventTable.Close()
}

// Create an empty table
GlobalEventTable = NewEventTable(
ctx, config_obj,
config_obj,
responder.NewResponder(
config_obj, &crypto_proto.VeloMessage{}, output_chan),
&actions_proto.VQLEventTable{})
GlobalEventTable.service_wg = service_wg
mu.Unlock()

// When the context is finished, tear down the event table.
go func() {
go func(table *EventTable, ctx context.Context) {
<-ctx.Done()
table.Close()
}(GlobalEventTable, service_ctx)

mu.Lock()
if GlobalEventTable.Done != nil {
close(GlobalEventTable.Done)
GlobalEventTable.Done = nil
}
mu.Unlock()
}()
mu.Unlock()
}

func init() {
ctx, cancel := context.WithCancel(context.Background())
GlobalEventTable = &EventTable{
Ctx: ctx,
cancel: cancel,
}
}
1 change: 0 additions & 1 deletion actions/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func (self *EventsTestSuite) SetupTest() {
self.responder = responder.TestResponder()

actions.GlobalEventTable = actions.NewEventTable(
context.Background(),
self.ConfigObj, self.responder,
&actions_proto.VQLEventTable{})
}
Expand Down
6 changes: 4 additions & 2 deletions actions/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ var (
// A Global stats collector is always running. When throttlers
// register with it they can read the data.
stats *statsCollector

throttle_mu sync.Mutex
)

type sample struct {
Expand Down Expand Up @@ -252,8 +254,8 @@ func NewThrottler(
return &DummyThrottler{}
}

mu.Lock()
defer mu.Unlock()
throttle_mu.Lock()
defer throttle_mu.Unlock()

if stats == nil {
var err error
Expand Down
11 changes: 10 additions & 1 deletion api/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func getTable(

rows := uint64(0)
if in.Rows == 0 {
in.Rows = 500
in.Rows = 2000
}

result := &api_proto.GetTableResponse{
Expand Down Expand Up @@ -198,6 +198,15 @@ func getPathSpec(
return paths.NewNotebookPathManager(in.NotebookId).Cell(
in.CellId).Logs(), nil

// Handle dashboards specially. A dashboard is a kind of
// non-interactive notebook stored in a special notebook ID
// called "Dashboards". Cells within the dashboard correspond
// to different artifacts. Dashboard cells are recalculated
// each time they are viewed.
} else if in.NotebookId == "Dashboards" && in.CellId != "" {
return paths.NewDashboardPathManager(in.Type, in.CellId, in.ClientId).
QueryStorage(in.TableId).Path(), nil

} else if in.NotebookId != "" && in.CellId != "" {
return paths.NewNotebookPathManager(in.NotebookId).Cell(
in.CellId).QueryStorage(in.TableId).Path(), nil
Expand Down
Loading