Skip to content

Commit

Permalink
Memory Ring buffer implementation deadlocks (Velocidex#796)
Browse files Browse the repository at this point in the history
The Memory Ring Buffer implementation suffers from a potential
deadlock that may occur when the buffer is full, causing calls to
AddResponse() to block. Additionally, the memory ring buffer was not
properly filtering out cancelled messages (as the file ring buffer does).

This creates problems when cancelling large hunts and could potentially
lead to client deadlocks.
  • Loading branch information
scudette authored Dec 6, 2020
1 parent 757eac0 commit 4ecd422
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 74 deletions.
6 changes: 3 additions & 3 deletions actions/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (self UpdateEventTable) Run(
// Make a new table.
table, err := update(config_obj, responder, arg)
if err != nil {
responder.Log("Error updating global event table: %v", err)
responder.Log(ctx, "Error updating global event table: %v", err)
}

logger := logging.GetLogger(config_obj, &logging.ClientComponent)
Expand Down Expand Up @@ -172,9 +172,9 @@ func (self UpdateEventTable) Run(
config_copy.Writeback.EventQueries = event_copy
err = config.UpdateWriteback(config_copy)
if err != nil {
responder.RaiseError(fmt.Sprintf(
responder.RaiseError(ctx, fmt.Sprintf(
"Unable to write events to writeback: %v", err))
}

responder.Return()
responder.Return(ctx)
}
4 changes: 2 additions & 2 deletions actions/foreman.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func (self UpdateForeman) Run(
config_obj.Writeback.HuntLastTimestamp = arg.LastHuntTimestamp
err := config.UpdateWriteback(config_obj)
if err != nil {
responder.RaiseError(err.Error())
responder.RaiseError(ctx, err.Error())
return
}
}
responder.Return()
responder.Return(ctx)
}
30 changes: 18 additions & 12 deletions actions/vql.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ import (
type LogWriter struct {
config_obj *config_proto.Config
responder *responder.Responder
ctx context.Context
}

func (self *LogWriter) Write(b []byte) (int, error) {
logging.GetLogger(self.config_obj, &logging.ClientComponent).Info("%v", string(b))
self.responder.Log("%s", string(b))
self.responder.Log(self.ctx, "%s", string(b))
return len(b), nil
}

Expand Down Expand Up @@ -91,23 +92,23 @@ func (self VQLClientAction) StartQuery(
defer cancel()

if arg.Query == nil {
responder.RaiseError("Query should be specified.")
responder.RaiseError(ctx, "Query should be specified.")
return
}

// Clients do not have a copy of artifacts so they need to be
// sent all artifacts from the server.
manager, err := services.GetRepositoryManager()
if err != nil {
responder.RaiseError(fmt.Sprintf("%v", err))
responder.RaiseError(ctx, fmt.Sprintf("%v", err))
return
}

repository := manager.NewRepository()
for _, artifact := range arg.Artifacts {
_, err := repository.LoadProto(artifact, false /* validate */)
if err != nil {
responder.RaiseError(fmt.Sprintf(
responder.RaiseError(ctx, fmt.Sprintf(
"Failed to compile artifact %v.", artifact.Name))
return
}
Expand All @@ -124,7 +125,7 @@ func (self VQLClientAction) StartQuery(
Env: ordereddict.NewDict(),
Uploader: uploader,
Repository: repository,
Logger: log.New(&LogWriter{config_obj, responder}, "vql: ", 0),
Logger: log.New(&LogWriter{config_obj, responder, ctx}, "vql: ", 0),
}

for _, env_spec := range arg.Env {
Expand All @@ -145,15 +146,20 @@ func (self VQLClientAction) StartQuery(

vfilter.InstallThrottler(scope, vfilter.NewTimeThrottler(float64(rate)))

start := time.Now()

// If we panic we need to recover and report this to the
// server.
defer func() {

r := recover()
if r != nil {
msg := string(debug.Stack())
scope.Log(msg)
responder.RaiseError(msg)
responder.RaiseError(ctx, msg)
}

scope.Log("Collection is done after %v", time.Since(start))
}()

// All the queries will use the same scope. This allows one
Expand All @@ -162,7 +168,7 @@ func (self VQLClientAction) StartQuery(
query_start := uint64(time.Now().UTC().UnixNano() / 1000)
vql, err := vfilter.Parse(query.VQL)
if err != nil {
responder.RaiseError(err.Error())
responder.RaiseError(ctx, err.Error())
return
}

Expand All @@ -180,7 +186,7 @@ func (self VQLClientAction) StartQuery(
scope.Log(msg)

// Queries that time out are an error on the server.
responder.RaiseError(msg)
responder.RaiseError(ctx, msg)

// Cancel the sub ctx but do not exit
// - we need to wait for the sub query
Expand Down Expand Up @@ -211,7 +217,7 @@ func (self VQLClientAction) StartQuery(
}
// Don't log empty VQL statements.
if query.Name != "" {
responder.Log(
responder.Log(ctx,
"Time %v: %s: Sending response part %d %s (%d rows).",
(response.Timestamp-query_start)/1000000,
query.Name,
Expand All @@ -221,15 +227,15 @@ func (self VQLClientAction) StartQuery(
)
}
response.Columns = result.Columns
responder.AddResponse(&crypto_proto.GrrMessage{
responder.AddResponse(ctx, &crypto_proto.GrrMessage{
VQLResponse: response})
}
}
}

if uploader.Count > 0 {
responder.Log("Uploaded %v files.", uploader.Count)
responder.Log(ctx, "Uploaded %v files.", uploader.Count)
}

responder.Return()
responder.Return(ctx)
}
16 changes: 4 additions & 12 deletions api/notebooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,23 +444,15 @@ func (self *ApiServer) GetNotebookCell(

// Cell does not exist, make it a default cell.
if err == io.EOF {
notebook = &api_proto.NotebookCell{
return &api_proto.NotebookCell{
Input: "",
Output: "",
Data: "{}",
CellId: notebook.CellId,
Type: "Markdown",
}

// And store it for next time.
err = db.SetSubject(self.config,
notebook_path_manager.Cell(in.CellId).Path(),
notebook)
if err != nil {
return nil, err
}

} else if err != nil {
}, nil
}
if err != nil {
return nil, err
}

Expand Down
18 changes: 9 additions & 9 deletions artifacts/definitions/Windows/KapeFiles/Targets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -775,8 +775,8 @@ parameters:
153,SDB Files x64,Executables,Windows\apppatch\Custom\Custom64/*.sdb,lazy_ntfs,
154,SDB Files x64,Executables,Windows.old\Windows\apppatch\Custom\Custom64/*.sdb,lazy_ntfs,
155,WindowsIndexSearch,FileKnowledge,programdata\microsoft\search\data\applications\windows/Windows.edb,lazy_ntfs,
156,$LogFile,FileSystem,$LogFile,lazy_ntfs,
157,$Boot,FileSystem,$Boot,lazy_ntfs,
156,$LogFile,FileSystem,$LogFile,ntfs,
157,$Boot,FileSystem,$Boot,ntfs,
158,NTUSER.DAT registry hive XP,Registry,Documents and Settings\*/NTUSER.DAT,lazy_ntfs,
159,NTUSER.DAT registry hive,Registry,Users\*/NTUSER.DAT,lazy_ntfs,
160,NTUSER.DAT registry transaction files,Registry,Users\*/NTUSER.DAT.LOG*,lazy_ntfs,
Expand All @@ -786,7 +786,7 @@ parameters:
164,NTUSER.DAT DEFAULT transaction files,Registry,Windows.old\Windows\System32\config/DEFAULT.LOG*,lazy_ntfs,
165,UsrClass.dat registry hive,Registry,Users\*\AppData\Local\Microsoft\Windows/UsrClass.dat,lazy_ntfs,
166,UsrClass.dat registry transaction files,Registry,Users\*\AppData\Local\Microsoft\Windows/UsrClass.dat.LOG*,lazy_ntfs,
167,$MFTMirr,FileSystem,$MFTMirr,lazy_ntfs,$MFTMirr is a redundant copy of the first four (4) records of the MFT.
167,$MFTMirr,FileSystem,$MFTMirr,ntfs,$MFTMirr is a redundant copy of the first four (4) records of the MFT.
168,Word Autosave Location,FileKnowledge,Users\*\AppData\Roaming\Microsoft\Word/**10,lazy_ntfs,
169,Excel Autosave Location,ApplicationCompatibility,Users\*\AppData\Roaming\Microsoft\Excel/**10,lazy_ntfs,
170,Powerpoint Autosave Location,FileKnowledge,Users\*\AppData\Roaming\Microsoft\Powerpoint/**10,lazy_ntfs,
Expand Down Expand Up @@ -824,8 +824,8 @@ parameters:
202,Event logs Win7+,EventLogs,Windows\System32\winevt\Logs/Microsoft-Windows-TerminalServices-RemoteConnectionManager%4Operational.evtx,lazy_ntfs,
203,RecentFileCache,ApplicationCompatability,Windows\AppCompat\Programs/RecentFileCache.bcf,lazy_ntfs,
204,RecentFileCache,ApplicationCompatability,Windows.old\Windows\AppCompat\Programs/RecentFileCache.bcf,lazy_ntfs,
205,$MFT,FileSystem,$MFT,lazy_ntfs,
206,$Recycle.Bin,Deleted Files,$Recycle.Bin/**10,lazy_ntfs,
205,$MFT,FileSystem,$MFT,ntfs,
206,$Recycle.Bin,Deleted Files,$Recycle.Bin/**10,ntfs,
207,RECYCLER WinXP,Deleted Files,RECYCLER/**10,lazy_ntfs,
208,hiberfil.sys,Memory,hiberfil.sys,lazy_ntfs,
209,pagefile.sys,Memory,pagefile.sys,lazy_ntfs,
Expand Down Expand Up @@ -917,20 +917,20 @@ parameters:
295,Syscache,Program Execution,System Volume Information/Syscache.hve,lazy_ntfs,
296,Syscache transaction files,Program Execution,System Volume Information/Syscache.hve.LOG*,lazy_ntfs,
297,PowerShell Console Log,PowerShellConsleLog,Users\*\AppData\Roaming\Microsoft\Windows\PowerShell\PSReadline/ConsoleHost_history.txt,lazy_ntfs,
298,$MFT,FileSystem,$MFT,lazy_ntfs,
299,$LogFile,FileSystem,$LogFile,lazy_ntfs,
298,$MFT,FileSystem,$MFT,ntfs,
299,$LogFile,FileSystem,$LogFile,ntfs,
300,$J,FileSystem,$Extend/$UsnJrnl:$J,ntfs,
301,$Max,FileSystem,$Extend/$UsnJrnl:$Max,ntfs,
302,$SDS,FileSystem,$Secure:$SDS,ntfs,
303,$Boot,FileSystem,$Boot,lazy_ntfs,
303,$Boot,FileSystem,$Boot,ntfs,
304,$T,FileSystem,$Extend\$RmMetadata\$TxfLog/$Tops:$T,ntfs,
305,LNK files from Recent,LNKFiles,Users\*\AppData\Roaming\Microsoft\Windows\Recent/**10,lazy_ntfs,Also includes automatic and custom jumplist directories
306,LNK files from Microsoft Office Recent,LNKFiles,Users\*\AppData\Roaming\Microsoft\Office\Recent/**10,lazy_ntfs,
307,LNK files from Recent (XP),LNKFiles,Documents and Settings\*\Recent/**10,lazy_ntfs,
308,Desktop LNK files XP,LNKFiles,Documents and Settings\*\Desktop/*.LNK,lazy_ntfs,
309,Desktop LNK files,LNKFiles,Users\*\Desktop/*.LNK,lazy_ntfs,
310,Restore point LNK files XP,LNKFiles,System Volume Information\_restore*\RP*/*.LNK,lazy_ntfs,
311,$Recycle.Bin,Deleted Files,$Recycle.Bin/**10,lazy_ntfs,
311,$Recycle.Bin,Deleted Files,$Recycle.Bin/**10,ntfs,
312,RECYCLER WinXP,Deleted Files,RECYCLER/**10,lazy_ntfs,
313,SAM registry transaction files,Registry,Windows\System32\config/SAM.LOG*,lazy_ntfs,
314,SAM registry transaction files,Registry,Windows.old\Windows\System32\config/SAM.LOG*,lazy_ntfs,
Expand Down
2 changes: 2 additions & 0 deletions artifacts/definitions/Windows/Sysinternals/Autoruns.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ description: |
tools:
- name: Autorun_x86
url: https://live.sysinternals.com/tools/autorunsc.exe
serve_locally: true

- name: Autorun_amd64
url: https://live.sysinternals.com/tools/autorunsc64.exe
serve_locally: true

precondition: SELECT OS From info() where OS = 'windows'

Expand Down
1 change: 1 addition & 0 deletions bin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func RunClient(
WithEmbedded().
WithEnvLoader("VELOCIRAPTOR_CONFIG").
WithCustomValidator(initFilestoreAccessor).
WithCustomValidator(initDebugServer).
WithLogFile(*logging_flag).
WithRequiredClient().
WithRequiredLogging().
Expand Down
44 changes: 30 additions & 14 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,26 @@ type ClientExecutor struct {
concurrency *utils.Concurrency
}

func (self *ClientExecutor) Cancel(flow_id string, responder *responder.Responder) bool {
self.mu.Lock()
defer self.mu.Unlock()

func (self *ClientExecutor) Cancel(
ctx context.Context, flow_id string, responder *responder.Responder) bool {
if Canceller.IsCancelled(flow_id) {
return false
}

self.mu.Lock()
contexts, ok := self.in_flight[flow_id]
if ok {
responder.Log("Cancelling %v in flight queries", len(contexts))
contexts = contexts[:]
}
self.mu.Unlock()

if ok {
// Cancel all existing queries.
Canceller.Cancel(flow_id)
for _, flow_ctx := range contexts {
flow_ctx.cancel()
}

Canceller.Cancel(flow_id)
return true
}

Expand Down Expand Up @@ -154,6 +158,8 @@ func (self *ClientExecutor) _FlowContext(flow_id string) (context.Context, *_Flo
}

// _CloseContext removes the flow_context from the in_flight map.
// Note: There are multiple queries tied to the same flow id but all
// of them need to be cancelled when the flow is cancelled.
func (self *ClientExecutor) _CloseContext(flow_context *_FlowContext) {
self.mu.Lock()
defer self.mu.Unlock()
Expand Down Expand Up @@ -196,8 +202,18 @@ func (self *ClientExecutor) ReadResponse() <-chan *crypto_proto.GrrMessage {
return self.Outbound
}

func makeErrorResponse(req *crypto_proto.GrrMessage, message string) *crypto_proto.GrrMessage {
return &crypto_proto.GrrMessage{
func makeErrorResponse(output chan *crypto_proto.GrrMessage,
req *crypto_proto.GrrMessage, message string) {
output <- &crypto_proto.GrrMessage{
SessionId: req.SessionId,
RequestId: constants.LOG_SINK,
LogMessage: &crypto_proto.LogMessage{
Message: message,
Timestamp: uint64(time.Now().UTC().UnixNano() / 1000),
},
}

output <- &crypto_proto.GrrMessage{
SessionId: req.SessionId,
RequestId: req.RequestId,
ResponseId: 1,
Expand Down Expand Up @@ -227,7 +243,7 @@ func (self *ClientExecutor) processRequestPlugin(
// Never serve unauthenticated requests.
if req.AuthState != crypto_proto.GrrMessage_AUTHENTICATED {
log.Printf("Unauthenticated")
self.Outbound <- makeErrorResponse(
makeErrorResponse(self.Outbound,
req, fmt.Sprintf("Unauthenticated message received: %v.", req))
return
}
Expand All @@ -242,7 +258,7 @@ func (self *ClientExecutor) processRequestPlugin(
if !req.Urgent {
err := self.concurrency.StartConcurrencyControl(ctx)
if err != nil {
responder.RaiseError(fmt.Sprintf("%v", err))
responder.RaiseError(ctx, fmt.Sprintf("%v", err))
return
}
defer self.concurrency.EndConcurrencyControl()
Expand All @@ -266,15 +282,15 @@ func (self *ClientExecutor) processRequestPlugin(

if req.Cancel != nil {
// Only log when the flow is not already cancelled.
if self.Cancel(req.SessionId, responder) {
self.Outbound <- makeErrorResponse(
req, fmt.Sprintf("Cancelled all inflight queries: %v",
if self.Cancel(ctx, req.SessionId, responder) {
makeErrorResponse(self.Outbound,
req, fmt.Sprintf("Cancelled all inflight queries for flow %v",
req.SessionId))
}
return
}

self.Outbound <- makeErrorResponse(
makeErrorResponse(self.Outbound,
req, fmt.Sprintf("Unsupported payload for message: %v", req))
}

Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (self *ExecutorTestSuite) TestCancellation() {

wg.Wait()

// The cancel message should generate a log + a status
// The cancel message should generate 1 log + a status
// message. This should only be done once, no matter how many
// cancellations are sent.
require.Equal(t, len(received_messages), 2)
Expand Down
2 changes: 1 addition & 1 deletion gui/velociraptor/src/components/forms/form.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ export default class VeloForm extends React.Component {
if (e.currentTarget.checked) {
this.props.setValue("Y");
} else {
this.props.setValue(undefined);
this.props.setValue("N");
}
}}
checked={this.props.value === "Y"}
Expand Down
Loading

0 comments on commit 4ecd422

Please sign in to comment.