Skip to content

Commit

Permalink
Added more uploader tests (Velocidex#2500)
Browse files Browse the repository at this point in the history
  • Loading branch information
scudette authored Mar 6, 2023
1 parent a206510 commit 46ffb9a
Show file tree
Hide file tree
Showing 7 changed files with 477 additions and 170 deletions.
2 changes: 1 addition & 1 deletion responder/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (self *FlowResponder) updateStats(message *crypto_proto.VeloMessage) {
// number of files uploaded and set the expected length.
if message.FileBuffer.Offset == 0 {
self.status.UploadedFiles++
self.status.ExpectedUploadedBytes += int64(message.FileBuffer.StoredSize)
self.status.ExpectedUploadedBytes += int64(message.FileBuffer.Size)
}
}

Expand Down
2 changes: 1 addition & 1 deletion services/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (self *Launcher) CompileCollectorArgs(
vql_collector_args.Timeout = collector_request.Timeout
}

if config_obj.Defaults != nil {
if config_obj != nil && config_obj.Defaults != nil {
vql_collector_args.MaxRow = config_obj.Defaults.MaxRows
vql_collector_args.MaxRowBufferSize = config_obj.Defaults.MaxRowBufferSize
}
Expand Down
8 changes: 3 additions & 5 deletions uploads/client_uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ func (self *VelociraptorUploader) Upload(
md5_sum := md5.New()
sha_sum := sha256.New()

BUFF_SIZE = int64(1024 * 1024)

for {
// Ensure there is a fresh allocation for every
// iteration to prevent overwriting in flight buffers.
Expand Down Expand Up @@ -106,7 +104,7 @@ func (self *VelociraptorUploader) Upload(
},
Offset: offset,
Size: uint64(expected_size),
StoredSize: uint64(expected_size),
StoredSize: offset + uint64(len(data)),
Mtime: mtime.UnixNano(),
Atime: atime.UnixNano(),
Ctime: ctime.UnixNano(),
Expand Down Expand Up @@ -243,7 +241,7 @@ func (self *VelociraptorUploader) maybeUploadSparse(
Accessor: accessor,
},
Size: uint64(real_size),
StoredSize: 0,
StoredSize: uint64(expected_size),
IsSparse: is_sparse,
Index: index,
Mtime: mtime.UnixNano(),
Expand Down Expand Up @@ -356,7 +354,7 @@ func (self *VelociraptorUploader) maybeUploadSparse(
Accessor: accessor,
},
Size: uint64(real_size),
StoredSize: uint64(expected_size),
StoredSize: uint64(write_offset),
IsSparse: is_sparse,
Offset: uint64(write_offset),
Index: index,
Expand Down
124 changes: 100 additions & 24 deletions uploads/client_uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
"runtime"
"testing"
"time"
Expand Down Expand Up @@ -42,7 +40,8 @@ func CombineOutput(name string, responses []*crypto_proto.VeloMessage) string {
result := []byte{}

for _, item := range responses {
if item.FileBuffer.Pathspec.Path == name {
if item.FileBuffer != nil &&
item.FileBuffer.Pathspec.Path == name {
result = append(result, item.FileBuffer.Data...)
}
}
Expand Down Expand Up @@ -137,7 +136,10 @@ func TestClientUploaderSparseWithEOF(t *testing.T) {
assert.Equal(t, CombineOutput("/foo", responses), "Hello hi")
}

func TestClientUploader(t *testing.T) {
func TestClientUploaderMultipleBuffers(t *testing.T) {
cancel := utils.MockTime(&utils.MockClock{MockNow: time.Unix(10, 10)})
defer cancel()

responder_obj := responder.TestResponderWithFlowId(
nil, "TestClientUploader")
uploader := &VelociraptorUploader{
Expand All @@ -146,31 +148,86 @@ func TestClientUploader(t *testing.T) {

BUFF_SIZE = 10

tmpfile, err := ioutil.TempFile("", "tmp*")
assert.NoError(t, err)
defer os.Remove(tmpfile.Name())
ctx := context.Background()
scope := vql_subsystem.MakeScope()

_, err = tmpfile.Write([]byte("Hello world"))
resp, err := uploader.Upload(
ctx, scope, accessors.MustNewLinuxOSPath("test.txt"),
"file", nil,

// Expected_size
1000, nilTime, nilTime, nilTime, nilTime,
bytes.NewBufferString("Hello world Hello world"))
assert.NoError(t, err)

name := tmpfile.Name()
tmpfile.Close()
responder_obj.Close()

fd, err := os.Open(name)
assert.NoError(t, err)
var messages []*crypto_proto.VeloMessage
vtesting.WaitUntil(time.Second*5, t, func() bool {
messages = responder_obj.Drain.Messages()
for _, r := range messages {
if r.FlowStats != nil {
return true
}
}
return false
})

golden := ordereddict.NewDict().
Set("VQLResponse", resp).
Set("Messages", messages)
goldie.Assert(t, "TestClientUploaderMultipleBuffers",
json.MustMarshalIndent(golden))
}

func TestClientUploaderMultipleUploads(t *testing.T) {
cancel := utils.MockTime(&utils.MockClock{MockNow: time.Unix(10, 10)})
defer cancel()

responder_obj := responder.TestResponderWithFlowId(
nil, "TestClientUploader")
uploader := &VelociraptorUploader{
Responder: responder_obj,
}

BUFF_SIZE = 1000

ctx := context.Background()
scope := vql_subsystem.MakeScope()

resp, err := uploader.Upload(
ctx, scope, getOSPath(name),
"file", nil, 1000,
nilTime, nilTime, nilTime, nilTime, fd)
assert.NoError(t, err)
assert.Equal(t, resp.Path, name)
assert.Equal(t, resp.Size, uint64(11))
assert.Equal(t, resp.StoredSize, uint64(11))
assert.Equal(t, resp.Error, "")
var resp interface{}
var err error

for i := 0; i < 2; i++ {
resp, err = uploader.Upload(
ctx, scope, accessors.MustNewLinuxOSPath(
fmt.Sprintf("test23%v.txt", i)),
"file", nil,

// Expected_size
1000, nilTime, nilTime, nilTime, nilTime,
bytes.NewBufferString("Hello world"))
assert.NoError(t, err)
}

responder_obj.Close()

var messages []*crypto_proto.VeloMessage
vtesting.WaitUntil(time.Second*5, t, func() bool {
messages = responder_obj.Drain.Messages()
for _, r := range messages {
if r.FlowStats != nil {
return true
}
}
return false
})

golden := ordereddict.NewDict().
Set("VQLResponse", resp).
Set("Messages", messages)
goldie.Assert(t, "TestClientUploaderMultipleUploads",
json.MustMarshalIndent(golden))
}

// Trying to upload a completely sparse file with no data but real
Expand Down Expand Up @@ -209,6 +266,9 @@ func TestClientUploaderCompletelySparse(t *testing.T) {
}

func TestClientUploaderSparseMultiBuffer(t *testing.T) {
cancel := utils.MockTime(&utils.MockClock{MockNow: time.Unix(10, 10)})
defer cancel()

resp := responder.TestResponderWithFlowId(
nil, fmt.Sprintf("Test%d", utils.GetId()))
uploader := &VelociraptorUploader{
Expand All @@ -217,6 +277,10 @@ func TestClientUploaderSparseMultiBuffer(t *testing.T) {

// 2 bytes per message
BUFF_SIZE = 2
defer func() {
BUFF_SIZE = 1000
}()

reader := &TestRangeReader{
Reader: bytes.NewReader([]byte(
"Hello world hello world")),
Expand All @@ -231,15 +295,19 @@ func TestClientUploaderSparseMultiBuffer(t *testing.T) {
ctx := context.Background()
scope := vql_subsystem.MakeScope()

uploader.maybeUploadSparse(ctx, scope,
upload_resp, err := uploader.maybeUploadSparse(ctx, scope,
filename, "ntfs", nil, 1000, nilTime,
resp.NextUploadId(),
range_reader)
assert.NoError(t, err)

resp.Close()

// Wait for the status message
var responses []*crypto_proto.VeloMessage
vtesting.WaitUntil(time.Second*5, t, func() bool {
responses = resp.Drain.Messages()
return CombineOutput("/foo", responses) == "Hello hello "
return len(responses) > 0 && responses[len(responses)-1].FlowStats != nil
})

assert.Equal(t, CombineOutput("/foo", responses), "Hello hello ")
Expand All @@ -248,8 +316,12 @@ func TestClientUploaderSparseMultiBuffer(t *testing.T) {
response.SessionId = ""
}

golden := ordereddict.NewDict().
Set("VQLResponse", upload_resp).
Set("Messages", responses)

goldie.Assert(t, "ClientUploaderSparseMultiBuffer",
json.MustMarshalIndent(responses))
json.MustMarshalIndent(golden))
}

// Upload multiple files.
Expand Down Expand Up @@ -349,6 +421,10 @@ func TestClientUploaderNoIndexIfNotSparse(t *testing.T) {

// 2 bytes per message
BUFF_SIZE = 2
defer func() {
BUFF_SIZE = 1000
}()

reader := &TestRangeReader{
Reader: bytes.NewReader([]byte(
"Hello world hello world")),
Expand Down
Loading

0 comments on commit 46ffb9a

Please sign in to comment.