Skip to content

Commit

Permalink
DeleteFlow: Do not always rebuild flow index from scratch (#4016)
Browse files Browse the repository at this point in the history
Filter out the deleted flow id, write index back to storage.

(If the index can't be opened, still rebuild from scratch.)

Close #4015

---------

Co-authored-by: Mike Cohen <mike@velocidex.com>
  • Loading branch information
hillu and scudette authored Jan 20, 2025
1 parent 3ce075b commit 24a9b57
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 27 deletions.
6 changes: 5 additions & 1 deletion artifacts/definitions/Server/Utils/DeleteFlow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ parameters:
- name: ReallyDoIt
description: If you really want to delete the collection, check this.
type: bool
- name: Sync
description: If specified we ensure delete happens immediately
type: bool

sources:
- query: |
SELECT Type, Data.VFSPath AS VFSPath, Error
FROM delete_flow(flow_id=FlowId, client_id=ClientId, really_do_it=ReallyDoIt)
FROM delete_flow(flow_id=FlowId,
client_id=ClientId, really_do_it=ReallyDoIt, sync=Sync)
1 change: 1 addition & 0 deletions gui/velociraptor/src/components/flows/flows-list.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export class DeleteFlowDialog extends React.PureComponent {
"Server.Utils.DeleteFlow",
{FlowId: flow_id,
ClientId: client_id,
Sync: "Y",
ReallyDoIt: "Y"}, ()=>{
this.props.onClose();
this.setState({loading: false});
Expand Down
21 changes: 18 additions & 3 deletions services/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ const (
// When the principal is set to this below we avoid audit logging
// the call.
NoAuditLogging = ""
DryRunOnly = false
)

var (
FlowNotFoundError = utils.Wrap(os.ErrNotExist, "Flow not found")
DryRunOnly = DeleteFlowOptions{
ReallyDoIt: false,
}
)

type DeleteFlowResponse struct {
Expand All @@ -93,6 +95,19 @@ type GetFlowOptions struct {
Downloads bool
}

type DeleteFlowOptions struct {
// If this is not set, we do a dry run to indicate which files
// will be deleted within the flow but do not actually delete the
// files.
ReallyDoIt bool

// If this is set the delete will be synchronous and index updated
// immediately. This is much slower but it is necessary when
// results need to be available immediately. When False, we delete
// asynchronously and update the index at a later time.
Sync bool
}

type CompilerOptions struct {
// Should names be obfuscated in the resulting VQL?
ObfuscateNames bool
Expand Down Expand Up @@ -136,7 +151,7 @@ type FlowStorer interface {
ctx context.Context,
config_obj *config_proto.Config,
client_id string, flow_id string, principal string,
really_do_it bool) ([]*DeleteFlowResponse, error)
options DeleteFlowOptions) ([]*DeleteFlowResponse, error)

LoadCollectionContext(
ctx context.Context,
Expand Down Expand Up @@ -246,5 +261,5 @@ type Launcher interface {
config_obj *config_proto.Config,
principal, artifact, client_id string,
start_time, end_time time.Time,
really_do_it bool) ([]*DeleteFlowResponse, error)
options DeleteFlowOptions) ([]*DeleteFlowResponse, error)
}
31 changes: 19 additions & 12 deletions services/launcher/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func (self *FlowStorageManager) DeleteFlow(
ctx context.Context,
config_obj *config_proto.Config,
client_id string, flow_id string, principal string,
really_do_it bool) ([]*services.DeleteFlowResponse, error) {
options services.DeleteFlowOptions) (
[]*services.DeleteFlowResponse, error) {

launcher, err := services.GetLauncher(config_obj)
if err != nil {
Expand All @@ -46,7 +47,7 @@ func (self *FlowStorageManager) DeleteFlow(
return nil, nil
}

if really_do_it && principal != "" {
if options.ReallyDoIt && principal != "" {
services.LogAudit(ctx,
config_obj, principal, "delete_flow",
ordereddict.NewDict().
Expand All @@ -60,7 +61,7 @@ func (self *FlowStorageManager) DeleteFlow(
upload_metadata_path := flow_path_manager.UploadMetadata()

r := &reporter{
really_do_it: really_do_it,
really_do_it: options.ReallyDoIt,
ctx: ctx,
config_obj: config_obj,
seen: make(map[string]bool),
Expand Down Expand Up @@ -151,11 +152,16 @@ func (self *FlowStorageManager) DeleteFlow(
r.emit_fs("NotebookItem", path)
return nil
})
// Rebuild the flow index to ensure GUI paging works
// properly. This is pretty slow but we do not expect to delete
// flows that often.
if really_do_it {
err = self.buildFlowIndexFromLegacy(ctx, config_obj, client_id)
if options.ReallyDoIt {
// User specified the flow must be removed immediately.
if options.Sync {
err = self.removeFlowFromIndex(ctx, config_obj, client_id, flow_id)
} else {
// Otherwise we just mark the index as pending a rebuild and move on.
self.mu.Lock()
self.pendingIndexes = append(self.pendingIndexes, client_id)
self.mu.Unlock()
}
}
r.pool.StopAndWait()

Expand Down Expand Up @@ -268,7 +274,8 @@ func (self *Launcher) DeleteEvents(
config_obj *config_proto.Config,
principal, artifact, client_id string,
start_time, end_time time.Time,
really_do_it bool) ([]*services.DeleteFlowResponse, error) {
options services.DeleteFlowOptions) (
[]*services.DeleteFlowResponse, error) {

path_manager, err := artifacts.NewArtifactPathManager(ctx,
config_obj, client_id, "", artifact)
Expand All @@ -287,7 +294,7 @@ func (self *Launcher) DeleteEvents(
f.StartTime.Before(end_time) {
var error_message string

if really_do_it {
if options.ReallyDoIt {
err := file_store_factory.Delete(f.Path)
if err != nil {
error_message = fmt.Sprintf(
Expand Down Expand Up @@ -323,7 +330,7 @@ func (self *Launcher) DeleteEvents(
f.StartTime.Before(end_time) {
var error_message string

if really_do_it {
if options.ReallyDoIt {
err := file_store_factory.Delete(f.Path)
if err != nil {
error_message = fmt.Sprintf(
Expand All @@ -350,7 +357,7 @@ func (self *Launcher) DeleteEvents(
}

// Log into the audit log
if really_do_it {
if options.ReallyDoIt {
return result, services.LogAudit(ctx, config_obj, principal, "DeleteEvents",
ordereddict.NewDict().
Set("artifact", artifact).
Expand Down
7 changes: 4 additions & 3 deletions services/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,8 @@ func NewLauncherService(
wg *sync.WaitGroup,
config_obj *config_proto.Config) (services.Launcher, error) {

return &Launcher{
Storage_: &FlowStorageManager{},
}, nil
res := &Launcher{
Storage_: NewFlowStorageManager(ctx, config_obj, wg),
}
return res, nil
}
109 changes: 109 additions & 0 deletions services/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (

"github.com/Velocidex/ordereddict"
"github.com/go-errors/errors"
"www.velocidex.com/golang/velociraptor/constants"
"www.velocidex.com/golang/velociraptor/file_store"
"www.velocidex.com/golang/velociraptor/result_sets"
"www.velocidex.com/golang/velociraptor/utils"
"www.velocidex.com/golang/velociraptor/vtesting/goldie"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -66,6 +70,13 @@ sources:
SELECT * FROM info()
`

func (self *LauncherTestSuite) SetupTest() {
self.ConfigObj = self.TestSuite.LoadConfig()
self.ConfigObj.Services.ServerArtifacts = true

self.TestSuite.SetupTest()
}

func (self *LauncherTestSuite) TestCompilingWithTools() {
// Our tool binary and its hash.
message := []byte("Hello world")
Expand Down Expand Up @@ -1318,6 +1329,104 @@ func getReqName(in *actions_proto.VQLCollectorArgs) string {
return ""
}

func (self *LauncherTestSuite) TestDelete() {
launcher, err := services.GetLauncher(self.ConfigObj)
assert.NoError(self.T(), err)

flow_id := "F.FlowId123"
user := "admin"

manager, _ := services.GetRepositoryManager(self.ConfigObj)
repository, _ := manager.GetGlobalRepository(self.ConfigObj)
acl_manager := acl_managers.NullACLManager{}

defer utils.SetFlowIdForTests(flow_id)()

// Schedule a job for the server runner.
flow_id, err = launcher.ScheduleArtifactCollection(
self.Ctx, self.ConfigObj, acl_manager,
repository, &flows_proto.ArtifactCollectorArgs{
Creator: user,
ClientId: "server",
Artifacts: []string{"Generic.Client.Info"},
}, utils.SyncCompleter)

assert.NoError(self.T(), err)

res, err := launcher.GetFlows(self.Ctx, self.ConfigObj, "server",
result_sets.ResultSetOptions{}, 0, 10)
assert.NoError(self.T(), err)
assert.Equal(self.T(), len(res.Items), 1)
assert.Equal(self.T(), res.Items[0].SessionId, flow_id)

// Now delete the flow asyncronously
_, err = launcher.Storage().DeleteFlow(
self.Ctx, self.ConfigObj, "server",
flow_id, constants.PinnedServerName,
services.DeleteFlowOptions{
ReallyDoIt: true,
Sync: false,
})
assert.NoError(self.T(), err)

// Index is not updated yet
idx := self.getIndex("server")
assert.Equal(self.T(), len(idx), 1)
idx_flow_id, _ := idx[0].GetString("FlowId")
assert.Equal(self.T(), flow_id, idx_flow_id)

// However GetFlows omits the deleted flow immediately because it
// can not find it (The actual flow object is removed but the
// index is out of step).
res, err = launcher.GetFlows(self.Ctx, self.ConfigObj, "server",
result_sets.ResultSetOptions{}, 0, 10)
assert.NoError(self.T(), err)
assert.Equal(self.T(), len(res.Items), 0)

// Create the flow again
new_flow_id, err := launcher.ScheduleArtifactCollection(
self.Ctx, self.ConfigObj, acl_manager,
repository, &flows_proto.ArtifactCollectorArgs{
Creator: user,
ClientId: "server",
Artifacts: []string{"Generic.Client.Info"},
}, utils.SyncCompleter)
assert.NoError(self.T(), err)
assert.Equal(self.T(), new_flow_id, flow_id)

// Now delete the flow syncronously
_, err = launcher.Storage().DeleteFlow(
self.Ctx, self.ConfigObj, "server",
flow_id, constants.PinnedServerName,
services.DeleteFlowOptions{
ReallyDoIt: true,
Sync: true,
})
assert.NoError(self.T(), err)

// This time the index is reset immediately.
idx = self.getIndex("server")
assert.Equal(self.T(), len(idx), 0)
}

func (self *LauncherTestSuite) getIndex(client_id string) (
res []*ordereddict.Dict) {

client_path_manager := paths.NewClientPathManager(client_id)
file_store_factory := file_store.GetFileStore(self.ConfigObj)
rs_reader, err := result_sets.NewResultSetReader(file_store_factory,
client_path_manager.FlowIndex())
if err != nil {
return nil
}
defer rs_reader.Close()

for r := range rs_reader.Rows(self.Ctx) {
res = append(res, r)
}
return res
}

func TestLauncher(t *testing.T) {
suite.Run(t, &LauncherTestSuite{})
}
Loading

0 comments on commit 24a9b57

Please sign in to comment.