forked from Velocidex/velociraptor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
398 lines (326 loc) · 11.1 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
/*
The pool client pretends to be a large number of clients in order
to exert a large load on the server. In reality each client is
running in a go routine in parallel.
Therefore when we do a hunt, each pool client goroutine will
receive the same VQL query and run the same code. This reduces the
load the pool client can impart since it is busy running the same
query multiple times.
This pool executor memoizes the results from each query in memory
so each query is run only once but the results are returned from
each goroutine fake client as if it was unique. This increases the
total number of pool clients we can support since most of the work
is pushed out to the comms.
*/
package executor
import (
"context"
"fmt"
"sync"
"google.golang.org/protobuf/proto"
"www.velocidex.com/golang/velociraptor/actions"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/json"
"www.velocidex.com/golang/velociraptor/responder"
"www.velocidex.com/golang/velociraptor/utils"
)
var (
	// Guards lazy creation of the singleton rootClientExecutor in
	// NewPoolClientExecutor().
	pool_mu sync.Mutex

	// The global client executor which is wrapped by the various pool
	// clients. Created once on first pool client construction.
	rootClientExecutor *poolClientMux
)
// transaction memoizes one flow's request and all of its responses so
// that subsequent pool clients can replay the results from memory
// instead of re-running the query.
type transaction struct {
	Request   *crypto_proto.VeloMessage
	Responses []*crypto_proto.VeloMessage

	// While the transaction is running we need to make other
	// threads wait until it is done. Closed (never sent on) when the
	// flow completes, releasing all waiters at once.
	IsDone chan bool

	// Set together with closing IsDone; checked to ensure the channel
	// is only closed once.
	Done bool
}
// A wrapper around the standard client executor for use of pool
// clients. When multiple requests come in for the same query and
// parameters, we cache the results when the first request comes in
// and then feed the results to all other requests from memory. This
// allows us to increase the load on the server simulating a large
// fleet of independent clients.
type poolClientMux struct {
	*ClientExecutor

	// Guards the two transaction maps and the clients slice below.
	mu sync.Mutex

	// Get transactions by session id (used to route incoming
	// responses into their transaction).
	transaction_by_session_id map[string]*transaction

	// Get transactions by a unique key for the FlowRequest message
	// (used to deduplicate identical queries across pool clients).
	transaction_by_flow_key map[string]*transaction

	// A list of all pool clients that feed off us; monitoring events
	// are fanned out to each of these.
	clients []*PoolClientExecutor
}
// newPoolClientMux builds the single real client executor shared by
// all pool clients. It starts a goroutine (bounded by ctx) that drains
// the real executor's responses, records them into any matching
// transaction, and fans monitoring events out to every registered
// pool client.
func newPoolClientMux(ctx context.Context, config_obj *config_proto.Config) (*poolClientMux, error) {
	delegate, err := NewClientExecutor(ctx, "C.Root", config_obj)
	if err != nil {
		return nil, err
	}

	self := &poolClientMux{
		ClientExecutor:            delegate,
		transaction_by_session_id: make(map[string]*transaction),
		transaction_by_flow_key:   make(map[string]*transaction),
	}

	go func() {
		for message := range self.ClientExecutor.ReadResponse() {
			// Record the message into its transaction, if one is
			// tracked for this session.
			self.maybeCacheResult(message)

			// Only monitoring events are broadcast below; regular
			// collection results are replayed from transactions.
			if message.SessionId != "F.Monitoring" {
				continue
			}

			// Snapshot the client list under lock so we do not hold
			// the mutex while blocking on channel sends.
			self.mu.Lock()
			snapshot := make([]*PoolClientExecutor, len(self.clients))
			copy(snapshot, self.clients)
			self.mu.Unlock()

			// Forward the event message to every pool client.
			for _, client := range snapshot {
				select {
				case <-ctx.Done():
					return
				case client.Outbound <- message:
				}
			}
		}
	}()

	return self, nil
}
// maybeTransformResponse inspects the response and transforms it if
// needed. Currently the only transformation is making the Hostname
// (and Fqdn) unique per pool client id, so each virtual client appears
// to be a different host. This specifically targets
// Generic.Client.Info interrogation. On any parse or marshal failure
// the original response is returned unchanged.
func maybeTransformResponse(response *actions_proto.VQLResponse, id int) *actions_proto.VQLResponse {
	if response == nil || !utils.InString(response.Columns, "Hostname") {
		return response
	}

	rows, err := utils.ParseJsonToDicts([]byte(response.JSONLResponse))
	if err != nil || len(rows) == 0 {
		return response
	}

	// Only the first row's Hostname is rewritten.
	hostname, pres := rows[0].Get("Hostname")
	if !pres {
		return response
	}

	unique_name := fmt.Sprintf("%s-%d", hostname, id)
	rows[0].Set("Fqdn", unique_name)
	rows[0].Set("Hostname", unique_name)

	serialized, err := json.MarshalJsonl(rows)
	if err != nil {
		return response
	}

	// Clone so the cached original is not mutated for other clients.
	transformed := proto.Clone(response).(*actions_proto.VQLResponse)
	transformed.JSONLResponse = string(serialized)
	return transformed
}
// getRequestKey derives a unique cache key for a FlowRequest from the
// names of its queries plus the serialized query parameters. Including
// the parameters means any parameter change yields a fresh key and
// therefore forces the query to be recalculated.
func (self *poolClientMux) getRequestKey(req *crypto_proto.FlowRequest) string {
	var key string
	for _, action := range req.VQLClientActions {
		// Concatenate every named query in this action...
		for _, q := range action.Query {
			key += q.Name
		}
		// ...followed by the serialized environment.
		key += json.MustMarshalString(action.Env)
	}
	return key
}
// Compare messages from the real client executor against the cached
// transactions and add them to the transactions. If we detect the
// flow is complete we mark the transaction as done and other clients
// may replay it.
//
// Fix: removed a leftover debug fmt.Printf that dumped every response
// message to stdout - under pool-client load this flooded the output
// with the full contents of each VeloMessage.
func (self *poolClientMux) maybeCacheResult(response *crypto_proto.VeloMessage) {
	self.mu.Lock()
	defer self.mu.Unlock()

	session_id := response.SessionId

	// Responses for untracked sessions (e.g. monitoring) are ignored.
	tran, pres := self.transaction_by_session_id[session_id]
	if !pres {
		return
	}

	tran.Responses = append(tran.Responses, response)

	// Determine if the flow is completed by looking at the FlowStats
	// message. Guard on Done so IsDone is only closed once - a second
	// close would panic.
	if !tran.Done && isFlowComplete(response) {
		fmt.Printf("Completing transaction for session_id %v\n",
			session_id)

		// The transaction is now done.
		tran.Done = true
		close(tran.IsDone)
	}
}
// Gets the transaction for this request or creates a new transaction.
// Returns nil when the message is not a cacheable FlowRequest. Note
// that despite the name, the returned transaction may still be in
// flight - callers must wait on IsDone before replaying Responses.
func (self *poolClientMux) getCompletedTransaction(
	ctx context.Context, message *crypto_proto.VeloMessage) *transaction {
	self.mu.Lock()
	defer self.mu.Unlock()

	// We only cache FlowRequest messages.
	if message.FlowRequest == nil {
		return nil
	}

	key := self.getRequestKey(message.FlowRequest)

	// Do not cache empty queries.
	if key == "" {
		return nil
	}

	result, pres := self.transaction_by_flow_key[key]

	// A transaction already exists for this query - it may be complete
	// or still running; either way the caller just waits on IsDone.
	if pres {
		return result
	}

	// There is no transaction there yet so build one ready for
	// the results.
	trans := &transaction{
		Request: message,
		IsDone:  make(chan bool),
	}

	// Cache it under both keys: the flow key deduplicates identical
	// queries, the session id lets maybeCacheResult() route the
	// incoming responses into this transaction.
	self.transaction_by_flow_key[key] = trans
	self.transaction_by_session_id[message.SessionId] = trans

	fmt.Printf("Starting transaction for %v\n", message.SessionId)

	// Delegate the actual request for processing, the transaction
	// will be filled in by maybeCacheResult().
	// NOTE(review): this call is made while self.mu is held - assumes
	// ProcessRequest() only queues work and never synchronously calls
	// back into maybeCacheResult(), otherwise this would deadlock.
	// Confirm against ClientExecutor.ProcessRequest.
	self.ClientExecutor.ProcessRequest(ctx, message)
	return trans
}
// maybeUpdateEventTable applies an UpdateEventTable request to the
// shared event manager, but only if the request is newer than the
// currently installed table. Callers must ensure req.UpdateEventTable
// is non-nil (ProcessRequest checks this before dispatching here).
func (self *poolClientMux) maybeUpdateEventTable(
	ctx context.Context, req *crypto_proto.VeloMessage) {
	self.mu.Lock()
	defer self.mu.Unlock()

	// Only update newer tables.
	if req.UpdateEventTable.Version <= self.event_manager.Version() {
		return
	}

	// In practice each client receives its own event table
	// version which is the timestamp of the last table update. In
	// the pool client we do not want to refresh the table too
	// much so we set the version far into the future. This means
	// that it is impossible to update the pool client's event
	// table without a restart.
	// (6000 * 1e9 nanoseconds = 6000 seconds pushed into the future.)
	req.UpdateEventTable.Version += 6000 * 1000000000

	self.event_manager.UpdateEventTable(
		self.ctx, self.wg,
		self.config_obj,
		self.Outbound, req.UpdateEventTable)
}
// PoolClientExecutor is one virtualized client. It delegates all real
// work to the shared poolClientMux and only owns its outbound channel
// and fake identity (id / client_id).
type PoolClientExecutor struct {
	delegate *poolClientMux

	// Messages destined for the server from this fake client.
	Outbound chan *crypto_proto.VeloMessage

	// Numeric suffix used to make hostnames unique per pool client.
	id        int
	client_id string
}
// Nanny returns the global nanny service shared by all pool clients.
func (self *PoolClientExecutor) Nanny() *NannyService {
	return Nanny
}
// FlowManager delegates to the shared mux's flow manager.
func (self *PoolClientExecutor) FlowManager() *responder.FlowManager {
	return self.delegate.FlowManager()
}
// EventManager delegates to the shared mux's event table.
func (self *PoolClientExecutor) EventManager() *actions.EventTable {
	return self.delegate.EventManager()
}
// ClientId returns this pool client's unique fake client id.
func (self *PoolClientExecutor) ClientId() string {
	return self.client_id
}
// SendToServer queues a message on this client's outbound channel.
// Blocks when the channel buffer is full.
func (self *PoolClientExecutor) SendToServer(message *crypto_proto.VeloMessage) {
	self.Outbound <- message
}
// GetClientInfo returns the delegate's client info with Hostname and
// Fqdn suffixed by this pool client's id, so each virtual client
// appears as a distinct host.
// NOTE(review): assumes the delegate returns a fresh copy on each
// call - if it hands back a shared pointer the "-id" suffix would
// accumulate on repeated calls and leak across pool clients; confirm
// against ClientExecutor.GetClientInfo.
func (self *PoolClientExecutor) GetClientInfo() *actions_proto.ClientInfo {
	result := self.delegate.GetClientInfo()
	result.Hostname = fmt.Sprintf("%v-%d", result.Hostname, self.id)
	result.Fqdn = fmt.Sprintf("%v-%d", result.Fqdn, self.id)
	return result
}
// ReadResponse exposes this client's outbound messages as a receive
// channel for the comms layer to drain.
func (self *PoolClientExecutor) ReadResponse() <-chan *crypto_proto.VeloMessage {
	return self.Outbound
}
// Feed a server request to the executor for execution. Cacheable
// FlowRequests are served from the shared transaction cache (waiting
// for the first real run to finish if necessary); everything else is
// forwarded to the real executor.
func (self *PoolClientExecutor) ProcessRequest(
	ctx context.Context,
	message *crypto_proto.VeloMessage) {

	// Handle FlowStatsRequest specially - we just pretend this client
	// does not support this feature.
	if message.FlowStatsRequest != nil {
		responder.MakeErrorResponse(self.Outbound, message.SessionId,
			"Unsupported in Pool Client")
		return
	}

	if message.UpdateEventTable != nil {
		self.delegate.maybeUpdateEventTable(ctx, message)
		return
	}

	tran := self.delegate.getCompletedTransaction(ctx, message)
	if tran != nil {
		// Wait until the transaction is done (IsDone is closed when
		// the underlying flow completes).
		<-tran.IsDone

		fmt.Printf("Getting %v responses from cache\n", len(tran.Responses))

		// Replay the transaction into the output channel but swap the
		// session id to be from this request.
		for _, resp := range tran.Responses {
			// Copy the original response and change it to appear as
			// if it came from this session. Cloning prevents mutating
			// the cached message shared with other pool clients.
			response := proto.Clone(resp).(*crypto_proto.VeloMessage)
			response.SessionId = message.SessionId
			response.RequestId = message.RequestId
			// Rewrite hostnames so this client looks unique.
			response.VQLResponse = maybeTransformResponse(resp.VQLResponse, self.id)

			select {
			case <-ctx.Done():
				return
			case self.Outbound <- response:
			}
		}
		return
	}

	// If we get here there is no cached transaction - just forward to
	// the normal executor.
	self.delegate.ProcessRequest(ctx, message)
}
// A Pool Client is a virtualized client running in a goroutine which
// emulates a full blown client. Flow Requests are cached globally in
// a transaction so they can be replayed back for all clients. This
// allows us to calculate any query once but return all the results at
// once from other clients immediately, therefore increasing the load
// on the server.
func NewPoolClientExecutor(
	ctx context.Context,
	client_id string,
	config_obj *config_proto.Config, id int) (result *PoolClientExecutor, err error) {
	pool_mu.Lock()
	defer pool_mu.Unlock()

	// Lazily create the single shared mux on first use.
	if rootClientExecutor == nil {
		mux, err := newPoolClientMux(ctx, config_obj)
		if err != nil {
			return nil, err
		}
		rootClientExecutor = mux
	}

	result = &PoolClientExecutor{
		delegate:  rootClientExecutor,
		id:        id,
		Outbound:  make(chan *crypto_proto.VeloMessage, 10),
		client_id: client_id,
	}

	// Register with the mux so monitoring events are fanned out to
	// this client too.
	rootClientExecutor.mu.Lock()
	rootClientExecutor.clients = append(rootClientExecutor.clients, result)
	rootClientExecutor.mu.Unlock()

	return result, nil
}
// isFlowComplete reports whether this message is the final FlowStats
// message indicating that the flow is complete.
func isFlowComplete(message *crypto_proto.VeloMessage) bool {
	stats := message.FlowStats
	return stats != nil && stats.FlowComplete
}