Skip to content

Commit

Permalink
Update flow active time when the result set is completed (#1468)
Browse files Browse the repository at this point in the history
With the MemcacheFileStore, writes are delayed so they may be
combined. Previously the collection context was written out before the
full result set was committed to storage causing a refresh issue in
the gui - the gui would not update when the collection completed and
show an empty result set.
  • Loading branch information
scudette committed Jan 10, 2022
1 parent f957cdd commit b7ef980
Show file tree
Hide file tree
Showing 15 changed files with 520 additions and 300 deletions.
18 changes: 9 additions & 9 deletions api/proto/api.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

501 changes: 257 additions & 244 deletions config/proto/config.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions config/proto/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,9 @@ message DatastoreConfig {
// filesystems (default 100).
int64 memcache_write_mutation_writers = 6;

// How long to delay writes so they can be combined (default 1 sec)
int64 memcache_write_mutation_max_age = 9;

// Experimental - do not set in configs yet!
string minion_implementation = 7;
string master_implementation = 8;
Expand Down
14 changes: 9 additions & 5 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,15 @@ func GetDB(config_obj *config_proto.Config) (DataStore, error) {
return nil, errors.New("no datastore configured")
}

return getImpl(config_obj.Datastore.Implementation, config_obj)
implementation, err := GetImplementationName(config_obj)
if err != nil {
return nil, err
}

return getImpl(implementation)
}

func getImpl(implementation string,
config_obj *config_proto.Config) (DataStore, error) {
func getImpl(implementation string) (DataStore, error) {
switch implementation {
case "FileBaseDataStore":
return file_based_imp, nil
Expand Down Expand Up @@ -154,7 +158,7 @@ func getImpl(implementation string,

default:
return nil, errors.New("no datastore implementation " +
config_obj.Datastore.Implementation)
implementation)
}
}

Expand All @@ -164,6 +168,6 @@ func SetGlobalDatastore(
ds_mu.Lock()
defer ds_mu.Unlock()

g_impl, err = getImpl(implementation, config_obj)
g_impl, err = getImpl(implementation)
return err
}
34 changes: 34 additions & 0 deletions datastore/memcache_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"www.velocidex.com/golang/velociraptor/file_store/path_specs"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/utils"
"www.velocidex.com/golang/velociraptor/vtesting"
)

Expand Down Expand Up @@ -133,6 +134,39 @@ func (self MemcacheFileTestSuite) TestListChildren() {
assert.Equal(self.T(), children[0].AsClientPath(), "/a/b")
}

func (self MemcacheFileTestSuite) TestSetSubjectAndListChildren() {
db, ok := self.datastore.(*MemcacheFileDataStore)
assert.True(self.T(), ok)

// Setting the data ends up on the filesystem
client_id := "C.1234"
client_record := &api_proto.ClientMetadata{
ClientId: client_id,
}

// Write the file to the filesystem
urn := path_specs.NewSafeDatastorePath("a", "b")
err := file_based_imp.SetSubject(self.config_obj, urn, client_record)
assert.NoError(self.T(), err)

urn2 := path_specs.NewSafeDatastorePath("a", "d")
err = file_based_imp.SetSubject(self.config_obj, urn2, client_record)
assert.NoError(self.T(), err)

// Now set a file in an existing directory.
intermediate := path_specs.NewSafeDatastorePath("a", "e")
new_record := &api_proto.ClientMetadata{}
err = db.SetSubject(self.config_obj, intermediate, new_record)
assert.NoError(self.T(), err)

// Now list the memcache
first_level := path_specs.NewSafeDatastorePath("a")
children, err := db.ListChildren(self.config_obj, first_level)
assert.NoError(self.T(), err)
assert.Equal(self.T(), len(children), 3)
utils.Debug(children)
}

func TestMemCacheFileDatastore(t *testing.T) {
suite.Run(t, &MemcacheFileTestSuite{BaseTestSuite: BaseTestSuite{
datastore: NewMemcacheFileDataStore(),
Expand Down
23 changes: 23 additions & 0 deletions datastore/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datastore

import (
"errors"
"sync"

"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -74,3 +75,25 @@ func Walk(config_obj *config_proto.Config,

return nil
}

func GetImplementationName(
config_obj *config_proto.Config) (string, error) {
if config_obj.Datastore == nil {
return "", errors.New("Invalid datastore config")
}

if config_obj.Frontend == nil {
return config_obj.Datastore.Implementation, nil
}

if config_obj.Frontend.IsMinion &&
config_obj.Datastore.MinionImplementation != "" {
return config_obj.Datastore.MinionImplementation, nil
}

if config_obj.Datastore.MasterImplementation != "" {
return config_obj.Datastore.MasterImplementation, nil
}

return config_obj.Datastore.Implementation, nil
}
15 changes: 13 additions & 2 deletions file_store/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"

config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/datastore"
"www.velocidex.com/golang/velociraptor/file_store/accessors"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/file_store/directory"
Expand Down Expand Up @@ -54,7 +55,12 @@ func GetFileStore(config_obj *config_proto.Config) api.FileStore {
return nil
}

res, _ := getImpl(config_obj.Datastore.Implementation, config_obj)
implementation, err := datastore.GetImplementationName(config_obj)
if err != nil {
panic(err)
}

res, _ := getImpl(implementation, config_obj)
return res
}

Expand Down Expand Up @@ -100,7 +106,12 @@ func GetFileStoreFileSystemAccessor(
return nil, errors.New("Datastore not configured")
}

switch config_obj.Datastore.Implementation {
implementation, err := datastore.GetImplementationName(config_obj)
if err != nil {
return nil, err
}

switch implementation {

case "MemcacheFileDataStore":
return accessors.NewFileStoreFileSystemAccessor(
Expand Down
6 changes: 5 additions & 1 deletion file_store/memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,11 @@ func NewMemcacheFileStore(config_obj *config_proto.Config) *MemcacheFileStore {
data_cache: ttlcache.NewCache(),
}

result.data_cache.SetTTL(5 * time.Second)
ttl := config_obj.Datastore.MemcacheWriteMutationMaxAge
if ttl == 0 {
ttl = 1
}
result.data_cache.SetTTL(time.Duration(ttl) * time.Second)

result.data_cache.SetNewItemCallback(func(key string, value interface{}) {
metricDataLRU.Inc()
Expand Down
7 changes: 6 additions & 1 deletion file_store/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/datastore"
"www.velocidex.com/golang/velociraptor/file_store/api"
"www.velocidex.com/golang/velociraptor/file_store/directory"
"www.velocidex.com/golang/velociraptor/file_store/memory"
Expand All @@ -18,8 +19,12 @@ func GetQueueManager(config_obj *config_proto.Config) (api.QueueManager, error)
}

file_store := GetFileStore(config_obj)
implementation, err := datastore.GetImplementationName(config_obj)
if err != nil {
return nil, err
}

switch config_obj.Datastore.Implementation {
switch implementation {

// For now everyone uses an in-memory queue manager.
case "Test":
Expand Down
6 changes: 6 additions & 0 deletions flows/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ func GetFlowDetails(
return nil, err
}

ping := &flows_proto.PingContext{}
err = db.GetSubject(config_obj, flow_path_manager.Ping(), ping)
if err == nil && ping.ActiveTime > collection_context.ActiveTime {
collection_context.ActiveTime = ping.ActiveTime
}

availableDownloads, _ := availableDownloadFiles(config_obj, client_id, flow_id)
return &api_proto.FlowDetails{
Context: collection_context,
Expand Down
32 changes: 31 additions & 1 deletion flows/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ func NewCollectionContext(config_obj *config_proto.Config) *CollectionContext {
self.mu.Lock()
defer self.mu.Unlock()

// Mark the collection as updated.
updateContext(config_obj, self.ClientId, self.SessionId)

if !self.send_update {
return
}
Expand All @@ -120,12 +123,34 @@ func NewCollectionContext(config_obj *config_proto.Config) *CollectionContext {
journal.PushRowsToArtifactAsync(
config_obj, row, "System.Flow.Completion")
}

})

return self
}

// Flush the context object to disk. This must happen AFTER all data
// is written
func updateContext(
config_obj *config_proto.Config,
client_id, flow_id string) error {

db, err := datastore.GetDB(config_obj)
if err != nil {
return err
}

ping_record := &flows_proto.PingContext{
ActiveTime: uint64(time.Now().UnixNano() / 1000),
}

flow_path_manager := paths.NewFlowPathManager(client_id, flow_id)

// Just a blind write.
return db.SetSubjectWithCompletion(
config_obj, flow_path_manager.Ping(),
ping_record, nil)
}

// closeContext is called after all messages from the clients are
// processed in this group. Client messages are sent in groups inside
// the same POST request. Most of the time they belong to the same
Expand Down Expand Up @@ -164,6 +189,7 @@ func closeContext(
collection_context.StartTime = uint64(time.Now().UnixNano() / 1000)
}

// Mark the flow as last active now.
collection_context.ActiveTime = uint64(time.Now().UnixNano() / 1000)

// Figure out if we will send a System.Flow.Completion after
Expand All @@ -183,6 +209,7 @@ func closeContext(

// Instruct the completion function to send the message.
collection_context.send_update = true
collection_context.Dirty = true
}

if len(collection_context.Logs) > 0 {
Expand All @@ -191,6 +218,7 @@ func closeContext(
if err != nil {
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
collection_context.Status = err.Error()
collection_context.Dirty = true
}
}

Expand All @@ -200,6 +228,7 @@ func closeContext(
if err != nil {
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
collection_context.Status = err.Error()
collection_context.Dirty = true
}
}

Expand All @@ -208,6 +237,7 @@ func closeContext(
if err != nil {
collection_context.State = flows_proto.ArtifactCollectorContext_ERROR
collection_context.Status = err.Error()
collection_context.Dirty = true
}
}

Expand Down
Loading

0 comments on commit b7ef980

Please sign in to comment.