forked from Velocidex/velociraptor
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactored in memory datastore to be more efficient. (Velocidex#1353)
- Loading branch information
Showing
54 changed files
with
1,133 additions
and
1,096 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
// Manage the client task queues | ||
|
||
package clients | ||
|
||
import ( | ||
"sync/atomic" | ||
|
||
config_proto "www.velocidex.com/golang/velociraptor/config/proto" | ||
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto" | ||
"www.velocidex.com/golang/velociraptor/datastore" | ||
"www.velocidex.com/golang/velociraptor/paths" | ||
"www.velocidex.com/golang/velociraptor/utils" | ||
) | ||
|
||
var ( | ||
Clock utils.Clock = &utils.RealClock{} | ||
g_id uint64 | ||
) | ||
|
||
func GetClientTasks( | ||
config_obj *config_proto.Config, | ||
client_id string, | ||
do_not_lease bool) ([]*crypto_proto.VeloMessage, error) { | ||
result := []*crypto_proto.VeloMessage{} | ||
|
||
db, err := datastore.GetDB(config_obj) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
client_path_manager := paths.NewClientPathManager(client_id) | ||
tasks, err := db.ListChildren( | ||
config_obj, client_path_manager.TasksDirectory()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
for _, task_urn := range tasks { | ||
// Here we read the task from the task_urn and remove | ||
// it from the queue. | ||
message := &crypto_proto.VeloMessage{} | ||
err = db.GetSubject(config_obj, task_urn, message) | ||
if err != nil { | ||
continue | ||
} | ||
|
||
if !do_not_lease { | ||
err = db.DeleteSubject(config_obj, task_urn) | ||
if err != nil { | ||
return nil, err | ||
} | ||
} | ||
result = append(result, message) | ||
} | ||
return result, nil | ||
} | ||
|
||
func UnQueueMessageForClient( | ||
config_obj *config_proto.Config, | ||
client_id string, | ||
message *crypto_proto.VeloMessage) error { | ||
db, err := datastore.GetDB(config_obj) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
client_path_manager := paths.NewClientPathManager(client_id) | ||
return db.DeleteSubject(config_obj, | ||
client_path_manager.Task(message.TaskId)) | ||
} | ||
|
||
func currentTaskId() uint64 { | ||
id := atomic.AddUint64(&g_id, 1) | ||
return uint64(Clock.Now().UTC().UnixNano()&0x7fffffffffff0000) | (id & 0xFFFF) | ||
} | ||
|
||
func QueueMessageForClient( | ||
config_obj *config_proto.Config, | ||
client_id string, | ||
req *crypto_proto.VeloMessage) error { | ||
|
||
// Task ID is related to time. | ||
req.TaskId = currentTaskId() | ||
|
||
db, err := datastore.GetDB(config_obj) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
client_path_manager := paths.NewClientPathManager(client_id) | ||
return db.SetSubject(config_obj, | ||
client_path_manager.Task(req.TaskId), req) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package clients_test | ||
|
||
import ( | ||
"fmt" | ||
"sort" | ||
"testing" | ||
|
||
"github.com/alecthomas/assert" | ||
"github.com/stretchr/testify/suite" | ||
"google.golang.org/protobuf/proto" | ||
"www.velocidex.com/golang/velociraptor/clients" | ||
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto" | ||
"www.velocidex.com/golang/velociraptor/file_store/test_utils" | ||
) | ||
|
||
type ClientTasksTestSuite struct { | ||
test_utils.TestSuite | ||
|
||
client_id string | ||
} | ||
|
||
func (self *ClientTasksTestSuite) TestQueueMessages() { | ||
client_id := "C.1236" | ||
|
||
message1 := &crypto_proto.VeloMessage{Source: "Server", SessionId: "1"} | ||
err := clients.QueueMessageForClient(self.ConfigObj, client_id, message1) | ||
assert.NoError(self.T(), err) | ||
|
||
// Now retrieve all messages. | ||
tasks, err := clients.GetClientTasks( | ||
self.ConfigObj, client_id, true /* do_not_lease */) | ||
assert.NoError(self.T(), err) | ||
assert.Equal(self.T(), 1, len(tasks)) | ||
assert.True(self.T(), proto.Equal(tasks[0], message1)) | ||
|
||
// We did not lease, so the tasks are not removed, but this | ||
// time we will lease. | ||
tasks, err = clients.GetClientTasks( | ||
self.ConfigObj, client_id, false /* do_not_lease */) | ||
assert.NoError(self.T(), err) | ||
assert.Equal(self.T(), len(tasks), 1) | ||
|
||
// No tasks available. | ||
tasks, err = clients.GetClientTasks( | ||
self.ConfigObj, client_id, false /* do_not_lease */) | ||
assert.NoError(self.T(), err) | ||
assert.Equal(self.T(), len(tasks), 0) | ||
} | ||
|
||
func (self *ClientTasksTestSuite) TestFastQueueMessages() { | ||
client_id := "C.1235" | ||
|
||
written := []*crypto_proto.VeloMessage{} | ||
|
||
for i := 0; i < 10; i++ { | ||
message := &crypto_proto.VeloMessage{Source: "Server", SessionId: fmt.Sprintf("%d", i)} | ||
err := clients.QueueMessageForClient(self.ConfigObj, client_id, message) | ||
assert.NoError(self.T(), err) | ||
|
||
written = append(written, message) | ||
} | ||
|
||
// Now retrieve all messages. | ||
tasks, err := clients.GetClientTasks( | ||
self.ConfigObj, client_id, true /* do_not_lease */) | ||
assert.NoError(self.T(), err) | ||
assert.Equal(self.T(), 10, len(tasks)) | ||
|
||
// Does not have to return in sorted form. | ||
sort.Slice(tasks, func(i, j int) bool { | ||
return tasks[i].SessionId < tasks[j].SessionId | ||
}) | ||
|
||
for i := 0; i < 10; i++ { | ||
assert.True(self.T(), proto.Equal(tasks[i], written[i])) | ||
} | ||
} | ||
|
||
func TestClientTasksService(t *testing.T) { | ||
suite.Run(t, &ClientTasksTestSuite{}) | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.