Skip to content

Commit

Permalink
Deduplicate glob hits (Velocidex#2490)
Browse files Browse the repository at this point in the history
Where multiple glob expressions were used and the same file was found
by multiple expressions the path was not properly deduplicated.

Also improved locking on the upload deduplication. When one thread is
uploading, other threads should wait on it to complete.

Raw Registry accessor repeated the names of keys that have values of
the same name (i.e. default value for a key name). These got deduplicated,
leading to loss of the value name. This fix returns a RawRegValueInfo
object which also behaves as a key (looks like a directory) so it can
be recursed into but also can be read as a Value (file).

Also this PR refactors the server artifact runner so it does not
require scheduling a task; instead it gets launched immediately.
  • Loading branch information
scudette authored Mar 3, 2023
1 parent b28dfd2 commit 408cc1d
Show file tree
Hide file tree
Showing 20 changed files with 352 additions and 227 deletions.
32 changes: 31 additions & 1 deletion accessors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"os"
"strings"
"sync"
"time"

"github.com/Velocidex/ordereddict"
Expand Down Expand Up @@ -39,6 +40,8 @@ type PathManipulator interface {
}

type OSPath struct {
mu sync.Mutex

Components []string

// Some paths need more information. They store an additional path
Expand All @@ -49,7 +52,10 @@ type OSPath struct {
}

// Make a copy of the OSPath
func (self OSPath) Copy() *OSPath {
func (self *OSPath) Copy() *OSPath {
self.mu.Lock()
defer self.mu.Unlock()

pathspec := self.pathspec
if pathspec != nil {
pathspec = pathspec.Copy()
Expand All @@ -62,15 +68,24 @@ func (self OSPath) Copy() *OSPath {
}

// SetPathSpec hands pathspec.Path and this OSPath to
// Manipulator.PathParse and then records pathspec on the OSPath. Both
// steps run while self.mu is held.
//
// pathspec must not be nil: pathspec.Path is dereferenced
// unconditionally.
func (self *OSPath) SetPathSpec(pathspec *PathSpec) {
self.mu.Lock()
defer self.mu.Unlock()

// NOTE(review): the manipulator receives self while self.mu is held.
// sync.Mutex is not reentrant, so PathParse must not call back into
// any OSPath method that takes the lock - confirm against the
// Manipulator implementations.
self.Manipulator.PathParse(pathspec.Path, self)
// Store the pathspec only after the parse call above.
self.pathspec = pathspec
}

// PathSpec returns whatever Manipulator.AsPathSpec produces for this
// OSPath. The call is made while self.mu is held.
func (self *OSPath) PathSpec() *PathSpec {
self.mu.Lock()
defer self.mu.Unlock()

// NOTE(review): AsPathSpec is invoked with the lock held; it must not
// call lock-taking OSPath methods on self (sync.Mutex is not
// reentrant) - confirm against the Manipulator implementations.
return self.Manipulator.AsPathSpec(self)
}

func (self *OSPath) DelegatePath() string {
self.mu.Lock()
defer self.mu.Unlock()

pathspec := self.Manipulator.AsPathSpec(self)
if pathspec.DelegatePath == "" && pathspec.Delegate != nil {
pathspec.DelegatePath = json.MustMarshalString(pathspec.Delegate)
Expand All @@ -79,14 +94,23 @@ func (self *OSPath) DelegatePath() string {
}

// DelegateAccessor reports the DelegateAccessor field of the pathspec
// that the Manipulator derives from this OSPath. The lock is held for
// the duration of the call.
func (self *OSPath) DelegateAccessor() string {
	self.mu.Lock()
	defer self.mu.Unlock()

	spec := self.Manipulator.AsPathSpec(self)
	return spec.DelegateAccessor
}

// Path reports the Path field of the pathspec that the Manipulator
// derives from this OSPath. The lock is held for the duration of the
// call.
func (self *OSPath) Path() string {
	self.mu.Lock()
	defer self.mu.Unlock()

	spec := self.Manipulator.AsPathSpec(self)
	return spec.Path
}

func (self *OSPath) String() string {
self.mu.Lock()
defer self.mu.Unlock()

// Cache it if we need to.
if self.serialized != nil {
return *self.serialized
Expand All @@ -99,6 +123,9 @@ func (self *OSPath) String() string {
}

func (self *OSPath) Parse(path string) (*OSPath, error) {
self.mu.Lock()
defer self.mu.Unlock()

result := &OSPath{
Manipulator: self.Manipulator,
}
Expand All @@ -108,6 +135,9 @@ func (self *OSPath) Parse(path string) (*OSPath, error) {
}

func (self *OSPath) Basename() string {
self.mu.Lock()
defer self.mu.Unlock()

if len(self.Components) > 0 {
return self.Components[len(self.Components)-1]
}
Expand Down
60 changes: 44 additions & 16 deletions accessors/raw_registry/raw_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,28 @@ type RawRegValueInfo struct {
// Containing key
*RawRegKeyInfo
value *regparser.CM_KEY_VALUE

// The windows registry can store a value inside a reg key. This
// makes the key act both as a directory and as a file
// (i.e. ReadDir() will list the key) but Open() will read the
// value.
is_default_value bool
}

// Name returns the value's name as reported by the underlying
// regparser.CM_KEY_VALUE (self.value), not the name of the containing
// key.
func (self *RawRegValueInfo) Name() string {
return self.value.ValueName()
}

func (self *RawRegValueInfo) IsDir() bool {
return false
// We are also a key so act as a directory.
return self.is_default_value
}

func (self *RawRegValueInfo) Mode() os.FileMode {
return 0755
if self.is_default_value {
return 0755
}
return 0644
}

func (self *RawRegValueInfo) Size() int64 {
Expand All @@ -141,8 +151,12 @@ func (self *RawRegValueInfo) Size() int64 {

func (self *RawRegValueInfo) Data() *ordereddict.Dict {
value_data := self.value.ValueData()
value_type := self.value.TypeString()
if self.is_default_value {
value_type += "/Key"
}
result := ordereddict.NewDict().
Set("type", self.value.TypeString()).
Set("type", value_type).
Set("data_len", len(value_data.Data))

switch value_data.Type {
Expand Down Expand Up @@ -320,22 +334,36 @@ func (self *RawRegFileSystemAccessor) ReadDirWithOSPath(
return nil, errors.New("Key not found")
}

for _, subkey := range key.Subkeys() {
result = append(result,
&RawRegKeyInfo{
key: subkey,
_full_path: full_path.Append(subkey.Name()),
})
seen := make(map[string]int)
for idx, subkey := range key.Subkeys() {
basename := subkey.Name()
subkey := &RawRegKeyInfo{
key: subkey,
_full_path: full_path.Append(basename),
}
seen[basename] = idx
result = append(result, subkey)
}

for _, value := range key.Values() {
result = append(result,
&RawRegValueInfo{
&RawRegKeyInfo{
key: key,
_full_path: full_path.Append(value.ValueName()),
}, value,
})
basename := value.ValueName()
value_obj := &RawRegValueInfo{
RawRegKeyInfo: &RawRegKeyInfo{
key: key,
_full_path: full_path.Append(basename),
},
value: value,
}

// Does this value have the same name as one of the keys?
idx, pres := seen[basename]
if pres {
// Replace the old object with the value object
value_obj.is_default_value = true
result[idx] = value_obj
} else {
result = append(result, value_obj)
}
}

return result, nil
Expand Down
6 changes: 4 additions & 2 deletions artifacts/testdata/manual/Sleep.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
name: Test.Sleep
description: |
Used to test cancellation and artifact timeout on client
collections.
collections. Change to SERVER to test server cancellation.
type: CLIENT

sources:
- query: |
SELECT sleep(time=60) FROM scope()
SELECT sleep(time=10)
FROM range(end=100)
WHERE log(message=format(format="Sent data %v", args=now()))
3 changes: 3 additions & 0 deletions glob/fixtures/TestGlobWithContext.golden
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,8 @@
"011 Match masked by two matches /usr/bin , /usr/*/diff": [
"/usr/bin",
"/usr/bin/diff"
],
"012 Multiple globs matching same file /bin/bash , /bin/ba*": [
"/bin/bash"
]
}
14 changes: 11 additions & 3 deletions glob/glob.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ func (self *Globber) ExpandWithContext(
// For each file that matched, we check which component
// would match it.
for _, f := range files {
basename := f.Name()

for filterer, next := range self.filters {
if !filterer.Match(f) {
continue
Expand All @@ -282,13 +284,12 @@ func (self *Globber) ExpandWithContext(

// Only recurse into directories.
if self.is_dir_or_link(f, accessor, 0) {
name := f.Name()
item := []*Globber{next}
prev_item, pres := children[name]
prev_item, pres := children[basename]
if pres {
item = append(prev_item, next)
}
children[name] = item
children[basename] = item
}
}
}
Expand All @@ -299,12 +300,19 @@ func (self *Globber) ExpandWithContext(
result[i].OSPath().Basename(),
result[j].OSPath().Basename())
})
var last string
for _, f := range result {
basename := f.OSPath().Basename()
if last == basename {
continue
}

select {
case <-ctx.Done():
return

case output_chan <- f:
last = basename
}
}

Expand Down
1 change: 1 addition & 0 deletions glob/glob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ var _GlobFixture = []struct {
{"Recursive matches zero or more", []string{"/usr/bin/X11/**/diff"}},
{"Recursive matches none at end", []string{"/bin/bash/**"}},
{"Match masked by two matches", []string{"/usr/bin", "/usr/*/diff"}},
{"Multiple globs matching same file", []string{"/bin/bash", "/bin/ba*"}},
}

func GetMockFileSystemAccessor() accessors.FileSystemAccessor {
Expand Down
3 changes: 2 additions & 1 deletion reporting/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ func (self *Container) Upload(
store_as_name = filename
}

cached, pres := uploads.DeduplicateUploads(scope, store_as_name)
cached, pres, closer := uploads.DeduplicateUploads(scope, store_as_name)
defer closer()
if pres {
return cached, nil
}
Expand Down
15 changes: 15 additions & 0 deletions services/launcher/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,21 @@ func (self *Launcher) CancelFlow(
return &api_proto.StartFlowResponse{}, nil
}

// Handle server collections specially, via the server artifact
// runner.
if client_id == "server" {
server_artifacts_service, err := services.GetServerArtifactRunner(
config_obj)
if err != nil {
return nil, err
}

server_artifacts_service.Cancel(ctx, flow_id, username)
return &api_proto.StartFlowResponse{
FlowId: flow_id,
}, nil
}

collection_context, err := LoadCollectionContext(
config_obj, client_id, flow_id)
if err == nil {
Expand Down
40 changes: 30 additions & 10 deletions services/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,36 @@ func (self *Launcher) ScheduleArtifactCollectionFromCollectorArgs(
OutstandingRequests: int64(len(vql_collector_args)),
}

// Record the tasks for provenance of what we actually did.
err = db.SetSubjectWithCompletion(config_obj,
flow_path_manager.Task(),
&api_proto.ApiFlowRequestDetails{
Items: []*crypto_proto.VeloMessage{task},
}, nil)
if err != nil {
return "", err
}

// Run server artifacts inline.
if client_id == "server" {
server_artifacts_service, err := services.GetServerArtifactRunner(
config_obj)
if err != nil {
return "", err
}

err = db.SetSubjectWithCompletion(config_obj,
flow_path_manager.Path(),
collection_context, func() {})
if err != nil {
return "", err
}

err = server_artifacts_service.LaunchServerArtifact(
config_obj, session_id, task.FlowRequest, collection_context)
return collection_context.SessionId, err
}

// Store the collection_context first, then queue all the tasks.
err = db.SetSubjectWithCompletion(config_obj,
flow_path_manager.Path(),
Expand All @@ -635,16 +665,6 @@ func (self *Launcher) ScheduleArtifactCollectionFromCollectorArgs(
return "", err
}

// Record the tasks for provenance of what we actually did.
err = db.SetSubjectWithCompletion(config_obj,
flow_path_manager.Task(),
&api_proto.ApiFlowRequestDetails{
Items: []*crypto_proto.VeloMessage{task},
}, nil)
if err != nil {
return "", err
}

return collection_context.SessionId, nil
}

Expand Down
3 changes: 3 additions & 0 deletions services/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ func (self *Notifier) NotifyListenerAsync(
}

func (self *Notifier) IsClientDirectlyConnected(client_id string) bool {
self.mu.Lock()
defer self.mu.Unlock()

if self.notification_pool == nil {
return false
}
Expand Down
1 change: 1 addition & 0 deletions services/orgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ServiceContainer interface {
NotebookManager() (NotebookManager, error)
ClientEventManager() (ClientEventTable, error)
ServerEventManager() (ServerEventManager, error)
ServerArtifactRunner() (ServerArtifactRunner, error)
Notifier() (Notifier, error)
ACLManager() (ACLManager, error)
}
Expand Down
Loading

0 comments on commit 408cc1d

Please sign in to comment.