Skip to content

Commit

Permalink
Implemented active client side cancellation. (Velocidex#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
scudette authored Oct 18, 2019
1 parent a755f02 commit 6dcfda9
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 12 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@ build_release: build_docker
debug:
dlv debug --build-flags="-tags 'release server_vql extras'" \
./bin/ -- frontend -v

debug_client:
dlv debug --build-flags="-tags 'release server_vql extras'" \
./bin/ -- client -v
4 changes: 2 additions & 2 deletions artifacts/assets/ab0x.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions config/ab0x.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

103 changes: 95 additions & 8 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"fmt"
"log"
"runtime/debug"
"sync"

"www.velocidex.com/golang/velociraptor/actions"

config_proto "www.velocidex.com/golang/velociraptor/config/proto"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/logging"
Expand All @@ -46,9 +48,82 @@ type Executor interface {

// A concerete implementation of a client executor.

// _FlowContext keeps track of all the queries running as part of a
// given flow. When the flow is cancelled we cancel all these queries.
type _FlowContext struct {
cancel func()
id int
flow_id string
}

type ClientExecutor struct {
Inbound chan *crypto_proto.GrrMessage
Outbound chan *crypto_proto.GrrMessage

// Map all the contexts with the flow id.
mu sync.Mutex
config_obj *config_proto.Config
in_flight map[string][]*_FlowContext
next_id int
}

func (self *ClientExecutor) Cancel(flow_id string, responder *responder.Responder) {
self.mu.Lock()
defer self.mu.Unlock()

contexts, ok := self.in_flight[flow_id]
if ok {
responder.Log("Cancelling %v in flight queries", len(contexts))
for _, flow_ctx := range contexts {
flow_ctx.cancel()
}
}
}

func (self *ClientExecutor) _FlowContext(flow_id string) (context.Context, *_FlowContext) {
self.mu.Lock()
defer self.mu.Unlock()

ctx, cancel := context.WithCancel(context.Background())

result := &_FlowContext{
flow_id: flow_id,
cancel: cancel,
id: self.next_id,
}
self.next_id++

contexts, ok := self.in_flight[flow_id]
if ok {
contexts = append(contexts, result)
} else {
contexts = []*_FlowContext{result}
}
self.in_flight[flow_id] = contexts

return ctx, result
}

// _CloseContext removes the flow_context from the in_flight map.
func (self *ClientExecutor) _CloseContext(flow_context *_FlowContext) {
self.mu.Lock()
defer self.mu.Unlock()

contexts, ok := self.in_flight[flow_context.flow_id]
if ok {
new_context := make([]*_FlowContext, 0, len(contexts))
for i := 0; i < len(contexts); i++ {
if contexts[i].id != flow_context.id {
new_context = append(new_context, contexts[i])
}
}

if len(new_context) == 0 {
delete(self.in_flight, flow_context.flow_id)
} else {
self.in_flight[flow_context.flow_id] = new_context
}
}
}

// Blocks until a request is received from the server. Called by the
Expand Down Expand Up @@ -112,31 +187,41 @@ func (self *ClientExecutor) processRequestPlugin(
responder := responder.NewResponder(config_obj, req, self.Outbound)

if req.VQLClientAction != nil {
go actions.VQLClientAction{}.StartQuery(
actions.VQLClientAction{}.StartQuery(
config_obj, ctx, responder, req.VQLClientAction)
return
}

if req.UpdateEventTable != nil {
go actions.UpdateEventTable{}.Run(
actions.UpdateEventTable{}.Run(
config_obj, ctx, responder, req.UpdateEventTable)
return
}

if req.UpdateForeman != nil {
go actions.UpdateForeman{}.Run(
actions.UpdateForeman{}.Run(
config_obj, ctx, responder, req.UpdateForeman)
return
}

if req.Cancel != nil {
self.Cancel(req.SessionId, responder)
self.Outbound <- makeErrorResponse(
req, fmt.Sprintf("Cancelled all inflight queries: %v",
req.SessionId))
return
}

self.Outbound <- makeErrorResponse(
req, fmt.Sprintf("Unsupported payload for message: %v", req))
}

func NewClientExecutor(config_obj *config_proto.Config) (*ClientExecutor, error) {
result := &ClientExecutor{
Inbound: make(chan *crypto_proto.GrrMessage),
Outbound: make(chan *crypto_proto.GrrMessage),
Inbound: make(chan *crypto_proto.GrrMessage),
Outbound: make(chan *crypto_proto.GrrMessage),
in_flight: make(map[string][]*_FlowContext),
config_obj: config_obj,
}

go func() {
Expand All @@ -151,11 +236,13 @@ func NewClientExecutor(config_obj *config_proto.Config) (*ClientExecutor, error)
// server should never send us those.
if req.AuthState == crypto_proto.GrrMessage_AUTHENTICATED {
// Each request has its own context.
ctx := context.Background()
ctx, flow_context := result._FlowContext(req.SessionId)
logger.Info("Received request: %v", req)

// Process the request asynchronously.
go result.processRequestPlugin(config_obj, ctx, req)
go func() {
result.processRequestPlugin(config_obj, ctx, req)
result._CloseContext(flow_context)
}()
}
}
}()
Expand Down

0 comments on commit 6dcfda9

Please sign in to comment.