Skip to content

Commit

Permalink
Make an enrollment service to rate limit enrollment requests. (Veloci…
Browse files Browse the repository at this point in the history
…dex#379)

 This prevents multiple interrogations from running on each host.
  • Loading branch information
scudette authored May 23, 2020
1 parent 2705dc7 commit 9072141
Show file tree
Hide file tree
Showing 36 changed files with 1,125 additions and 511 deletions.
210 changes: 109 additions & 101 deletions actions/proto/vql.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions actions/proto/vql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ message VQLEventTable {
}

message ClientInfo {
string client_id = 1;
string hostname = 3;
string fqdn = 4;
string system = 5;
Expand Down
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (self *ApiServer) CollectArtifact(
}
}

flow_id, err := flows.ScheduleArtifactCollection(
flow_id, err := artifacts.ScheduleArtifactCollection(
self.config, in.Creator, in)
if err != nil {
return nil, err
Expand Down
1 change: 0 additions & 1 deletion api/reflect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/golang/protobuf/jsonpb"
assert "github.com/stretchr/testify/assert"
artifacts_proto "www.velocidex.com/golang/velociraptor/artifacts/proto"
// utils "www.velocidex.com/golang/velociraptor/testing"
)

func TestDescriptor(t *testing.T) {
Expand Down
14 changes: 14 additions & 0 deletions artifacts/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,20 @@ func (self *Repository) LoadProto(artifact *artifacts_proto.Artifact, validate b
}
}

if source.Query != "" {
multi_vql, err := vfilter.MultiParse(source.Query)
if err != nil {
return nil, err
}

scope := vql_subsystem.MakeScope()

// Append the queries to the query list.
for _, vql := range multi_vql {
source.Queries = append(source.Queries, vql.ToString(scope))
}
}

if len(source.Queries) == 0 {
return nil, errors.New(fmt.Sprintf(
"Source %s in artifact %s contains no queries!",
Expand Down
39 changes: 38 additions & 1 deletion artifacts/assets/ab0x.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions artifacts/definitions/Generic/Client/Info.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ sources:
description: |
This source is used internally to populate agent info. Do not
remove this query.
queries:
- |
query: |
SELECT config.Version.Name AS Name,
config.Version.BuildTime as BuildTime,
config.Labels AS Labels,
Expand Down
9 changes: 9 additions & 0 deletions artifacts/definitions/Server/Internal/Enrollment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: Server.Internal.Enrollment
description: |
This event artifact is an internal event stream over which client
enrollments are sent. You can watch this event queues to be notified
on any new clients enrolling for the first time.
Note: This is an automated system artifact. You do not need to start it.
type: SERVER_EVENT
182 changes: 182 additions & 0 deletions artifacts/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package artifacts

import (
"crypto/rand"
"encoding/base32"
"encoding/binary"
"time"

errors "github.com/pkg/errors"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
artifacts_proto "www.velocidex.com/golang/velociraptor/artifacts/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/constants"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/datastore"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/paths"
)

func CompileCollectorArgs(
config_obj *config_proto.Config,
principal string,
collector_request *flows_proto.ArtifactCollectorArgs) (
*actions_proto.VQLCollectorArgs, error) {
repository, err := GetGlobalRepository(config_obj)
if err != nil {
return nil, err
}

// Update the flow's artifacts list.
vql_collector_args := &actions_proto.VQLCollectorArgs{
OpsPerSecond: collector_request.OpsPerSecond,
Timeout: collector_request.Timeout,
MaxRow: 1000,
}
for _, name := range collector_request.Artifacts {
var artifact *artifacts_proto.Artifact = nil
if collector_request.AllowCustomOverrides {
artifact, _ = repository.Get("Custom." + name)
}

if artifact == nil {
artifact, _ = repository.Get(name)
}

if artifact == nil {
return nil, errors.New("Unknown artifact " + name)
}

err := repository.CheckAccess(config_obj, artifact, principal)
if err != nil {
return nil, err
}

err = repository.Compile(artifact, vql_collector_args)
if err != nil {
return nil, err
}
}

// Add any artifact dependencies.
err = repository.PopulateArtifactsVQLCollectorArgs(vql_collector_args)
if err != nil {
return nil, err
}

err = AddArtifactCollectorArgs(
config_obj, vql_collector_args, collector_request)
if err != nil {
return nil, err
}

err = Obfuscate(config_obj, vql_collector_args)
return vql_collector_args, err
}

func ScheduleArtifactCollection(
config_obj *config_proto.Config,
principal string,
collector_request *flows_proto.ArtifactCollectorArgs) (string, error) {

args := collector_request.CompiledCollectorArgs
if args == nil {
var err error
args, err = CompileCollectorArgs(
config_obj, principal, collector_request)
if err != nil {
return "", err
}
}

return ScheduleArtifactCollectionFromCollectorArgs(
config_obj, collector_request, args)
}

func ScheduleArtifactCollectionFromCollectorArgs(
config_obj *config_proto.Config,
collector_request *flows_proto.ArtifactCollectorArgs,
vql_collector_args *actions_proto.VQLCollectorArgs) (string, error) {

client_id := collector_request.ClientId
if client_id == "" {
return "", errors.New("Client id not provided.")
}

// Generate a new collection context.
collection_context := &flows_proto.ArtifactCollectorContext{
SessionId: NewFlowId(client_id),
CreateTime: uint64(time.Now().UnixNano() / 1000),
State: flows_proto.ArtifactCollectorContext_RUNNING,
Request: collector_request,
ClientId: client_id,
}
db, err := datastore.GetDB(config_obj)
if err != nil {
return "", err
}

// Save the collection context.
flow_path_manager := paths.NewFlowPathManager(client_id,
collection_context.SessionId)
err = db.SetSubject(config_obj,
flow_path_manager.Path(),
collection_context)
if err != nil {
return "", err
}

// The task we will schedule for the client.
task := &crypto_proto.GrrMessage{
SessionId: collection_context.SessionId,
RequestId: constants.ProcessVQLResponses,
VQLClientAction: vql_collector_args}

// Send an urgent request to the client.
if collector_request.Urgent {
task.Urgent = true
}

// Record the tasks for provenance of what we actually did.
err = db.SetSubject(config_obj,
flow_path_manager.Task().Path(),
&api_proto.ApiFlowRequestDetails{
Items: []*crypto_proto.GrrMessage{task}})
if err != nil {
return "", err
}

return collection_context.SessionId, db.QueueMessageForClient(
config_obj, client_id, task)
}

// Adds any parameters set in the ArtifactCollectorArgs into the
// VQLCollectorArgs.
func AddArtifactCollectorArgs(
config_obj *config_proto.Config,
vql_collector_args *actions_proto.VQLCollectorArgs,
collector_args *flows_proto.ArtifactCollectorArgs) error {

// Add any Environment Parameters from the request.
if collector_args.Parameters == nil {
return nil
}

for _, item := range collector_args.Parameters.Env {
vql_collector_args.Env = append(vql_collector_args.Env,
&actions_proto.VQLEnv{Key: item.Key, Value: item.Value})
}

return nil
}

func NewFlowId(client_id string) string {
buf := make([]byte, 8)
rand.Read(buf)

binary.BigEndian.PutUint32(buf, uint32(time.Now().Unix()))
result := base32.HexEncoding.EncodeToString(buf)[:13]

return constants.FLOW_PREFIX + result
}
2 changes: 2 additions & 0 deletions bin/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ func valid_parameter(param_name string, repository *artifacts.Repository) bool {

func doArtifactCollect() {
config_obj := load_config_or_default()
load_config_artifacts(config_obj)

repository, err := getRepository(config_obj)
kingpin.FatalIfError(err, "Loading extra artifacts")

Expand Down
3 changes: 1 addition & 2 deletions bin/golden.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
artifacts "www.velocidex.com/golang/velociraptor/artifacts"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/constants"
"www.velocidex.com/golang/velociraptor/flows"
flows_proto "www.velocidex.com/golang/velociraptor/flows/proto"
"www.velocidex.com/golang/velociraptor/reporting"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
Expand Down Expand Up @@ -74,7 +73,7 @@ func vqlCollectorArgsFromFixture(
}

vql_collector_args := &actions_proto.VQLCollectorArgs{}
err := flows.AddArtifactCollectorArgs(
err := artifacts.AddArtifactCollectorArgs(
config_obj,
vql_collector_args,
artifact_collector_args)
Expand Down
6 changes: 3 additions & 3 deletions datastore/filebased.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ import (
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/paths"
"www.velocidex.com/golang/velociraptor/testing"
"www.velocidex.com/golang/velociraptor/utils"
"www.velocidex.com/golang/velociraptor/vtesting"
)

var (
file_based_imp = &FileBaseDataStore{
clock: testing.RealClock{},
clock: vtesting.RealClock{},
}
)

type FileBaseDataStore struct {
clock testing.Clock
clock vtesting.Clock
}

func (self *FileBaseDataStore) GetClientTasks(
Expand Down
4 changes: 3 additions & 1 deletion datastore/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/golang/protobuf/ptypes/empty"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/utils"
)

var (
Expand Down Expand Up @@ -116,7 +117,8 @@ func (self *TestDataStore) ListChildren(
if strings.HasPrefix(k, urn) {
k = strings.TrimLeft(strings.TrimPrefix(k, urn), "/")
components := strings.Split(k, "/")
if len(components) > 0 {
if len(components) > 0 &&
!utils.InString(result, components[0]) {
result = append(result, components[0])
}
}
Expand Down
4 changes: 2 additions & 2 deletions datastore/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
errors "github.com/pkg/errors"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
crypto_proto "www.velocidex.com/golang/velociraptor/crypto/proto"
"www.velocidex.com/golang/velociraptor/testing"
"www.velocidex.com/golang/velociraptor/third_party/cache"
"www.velocidex.com/golang/velociraptor/utils"
"www.velocidex.com/golang/velociraptor/vtesting"
)

var (
Expand Down Expand Up @@ -306,7 +306,7 @@ func NewMySQLDataStore(config_obj *config_proto.Config) (DataStore, error) {
}
}

return &MySQLDataStore{FileBaseDataStore{clock: testing.RealClock{}}}, nil
return &MySQLDataStore{FileBaseDataStore{clock: vtesting.RealClock{}}}, nil
}

func initializeDatabase(
Expand Down
2 changes: 1 addition & 1 deletion file_store/memory/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (self *QueuePool) Register(vfs_path string) (<-chan *ordereddict.Dict, func

registrations, _ := self.registrations[vfs_path]
new_registration := &Listener{
Channel: make(chan *ordereddict.Dict, 1000),
Channel: make(chan *ordereddict.Dict, 100000),
id: time.Now().UnixNano(),
}
registrations = append(registrations, new_registration)
Expand Down
19 changes: 19 additions & 0 deletions file_store/test_utils/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package test_utils

import (
"testing"

"github.com/stretchr/testify/require"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/file_store"
"www.velocidex.com/golang/velociraptor/file_store/memory"
)

func GetMemoryFileStore(
t *testing.T,
config_obj *config_proto.Config) *memory.MemoryFileStore {
file_store_factory, ok := file_store.GetFileStore(config_obj).(*memory.MemoryFileStore)
require.True(t, ok)

return file_store_factory
}
Loading

0 comments on commit 9072141

Please sign in to comment.