Skip to content

Commit

Permalink
Made gRPC pool parameters tunable. (Velocidex#309)
Browse files Browse the repository at this point in the history
Also removed many of the internal calls to reduce internal gRPC call load.
  • Loading branch information
scudette authored Apr 14, 2020
1 parent 1bcaff5 commit 33e07c8
Show file tree
Hide file tree
Showing 31 changed files with 819 additions and 669 deletions.
2 changes: 1 addition & 1 deletion .appveyor_cache_clear
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3
4
17 changes: 6 additions & 11 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
"www.velocidex.com/golang/velociraptor/artifacts"
artifacts_proto "www.velocidex.com/golang/velociraptor/artifacts/proto"
"www.velocidex.com/golang/velociraptor/clients"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/constants"
"www.velocidex.com/golang/velociraptor/datastore"
Expand All @@ -50,6 +51,7 @@ import (
"www.velocidex.com/golang/velociraptor/grpc_client"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/server"
"www.velocidex.com/golang/velociraptor/services"
users "www.velocidex.com/golang/velociraptor/users"
)

Expand Down Expand Up @@ -171,13 +173,7 @@ func (self *ApiServer) CollectArtifact(

result.FlowId = flow_id

// Notify the client if it is listenning.
client, cancel := self.api_client_factory.GetAPIClient(ctx, self.config)
defer cancel()

_, err = client.NotifyClients(ctx, &api_proto.NotificationRequest{
ClientId: in.ClientId,
})
err = services.NotifyClient(in.ClientId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -376,7 +372,6 @@ func (self *ApiServer) ListClients(
func (self *ApiServer) NotifyClients(
ctx context.Context,
in *api_proto.NotificationRequest) (*empty.Empty, error) {

user_name := GetGRPCUserInfo(self.config, ctx).Name
permissions := acls.COLLECT_CLIENT
perm, err := acls.CheckAccess(self.config, user_name, permissions)
Expand All @@ -386,10 +381,10 @@ func (self *ApiServer) NotifyClients(

if in.NotifyAll {
self.server_obj.Info("sending notification to everyone")
self.server_obj.NotificationPool.NotifyAll()
services.NotifyAll()
} else if in.ClientId != "" {
self.server_obj.Info("sending notification to %s", in.ClientId)
self.server_obj.NotificationPool.Notify(in.ClientId)
services.NotifyClient(in.ClientId)
} else {
return nil, errors.New("client id should be specified")
}
Expand All @@ -406,7 +401,7 @@ func (self *ApiServer) LabelClients(
if !perm || err != nil {
return nil, errors.New("User is not allowed to label clients.")
}
result, err := LabelClients(self.config, in)
result, err := clients.LabelClients(self.config, in)
if err != nil {
return nil, err
}
Expand Down
46 changes: 0 additions & 46 deletions api/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,52 +114,6 @@ func GetApiClient(
return result, nil
}

func LabelClients(
config_obj *config_proto.Config,
in *api_proto.LabelClientsRequest) (*api_proto.APIResponse, error) {
db, err := datastore.GetDB(config_obj)
if err != nil {
return nil, err
}

index_func := db.SetIndex
switch in.Operation {
case "remove":
index_func = db.UnsetIndex
case "check":
index_func = db.CheckIndex
case "set":
// default.
default:
return nil, errors.New(
"unknown label operation. Must be set, check or remove")
}

for _, label := range in.Labels {
for _, client_id := range in.ClientIds {
if !strings.HasPrefix(label, "label:") {
label = "label:" + label
}
err = index_func(
config_obj,
constants.CLIENT_INDEX_URN,
client_id, []string{label})
if err != nil {
return nil, err
}
err = index_func(
config_obj,
constants.CLIENT_INDEX_URN,
label, []string{client_id})
if err != nil {
return nil, err
}
}
}

return &api_proto.APIResponse{}, nil
}

func _is_ip_in_ranges(remote string, ranges []string) bool {
for _, ip_range := range ranges {
_, ipNet, err := net.ParseCIDR(ip_range)
Expand Down
27 changes: 21 additions & 6 deletions bin/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ func (self *VFSFs) fetchDir(
ctx context.Context,
vfs_name string) ([]*api.FileInfoRow, error) {
self.logger.Info(fmt.Sprintf("Fetching dir %v from %v", vfs_name, self.client_id))
client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
return nil, err
}
defer closer()

response, err := client.VFSRefreshDirectory(ctx,
Expand Down Expand Up @@ -92,7 +95,10 @@ func (self *VFSFs) fetchFile(
vfs_name string) error {
self.logger.Info("Fetching file %v", vfs_name)

client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
return err
}
defer closer()

client_path, accessor := api.GetClientPath(vfs_name)
Expand Down Expand Up @@ -183,7 +189,10 @@ func (self *VFSFs) getDir(
return rows, nil
}

client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
return nil, err
}
defer closer()

request := &flows_proto.VFSListRequest{
Expand Down Expand Up @@ -239,10 +248,13 @@ func (self *VFSFs) Open(fs_name string, flags uint32, fcontext *fuse.Context) (

vfs_name := fsPathToVFS(fs_name)

client, closer := grpc_client.Factory.GetAPIClient(fcontext, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(fcontext, self.config_obj)
if err != nil {
return nil, fuse.EIO
}
defer closer()

_, err := client.VFSGetBuffer(context.Background(),
_, err = client.VFSGetBuffer(context.Background(),
&api_proto.VFSFileBuffer{
ClientId: self.client_id,
VfsPath: vfs_name,
Expand Down Expand Up @@ -295,8 +307,11 @@ func (self *VFSFileReader) GetAttr(out *fuse.Attr) fuse.Status {
func (self *VFSFileReader) Read(dest []byte, off int64) (
fuse.ReadResult, fuse.Status) {

client, closer := grpc_client.Factory.GetAPIClient(
client, closer, err := grpc_client.Factory.GetAPIClient(
context.Background(), self.config_obj)
if err != nil {
return nil, fuse.EIO
}
defer closer()

response, err := client.VFSGetBuffer(context.Background(),
Expand Down
39 changes: 33 additions & 6 deletions bin/fuse_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/Velocidex/cgofuse/fuse"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"www.velocidex.com/golang/velociraptor/api"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
Expand Down Expand Up @@ -140,7 +141,11 @@ func (self *VFSFs) Read(file_path string, buff []byte, off int64, fd uint64) (n
// We need to fetch the file from the client.
ctx := context.Background()
self.logger.Info("Fetching file %v", vfs_name)
client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
self.logger.Err("Fetching Error %s: %v", vfs_name, err)
return -fuse.EIO
}
defer closer()

client_path, accessor := api.GetClientPath(vfs_name)
Expand Down Expand Up @@ -173,7 +178,12 @@ func (self *VFSFs) Read(file_path string, buff []byte, off int64, fd uint64) (n

func (self *VFSFs) read_buffer(vfs_name string, buff []byte, off int64, fh uint64) (n int) {
ctx := context.Background()
client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
self.logger.Err("Fetching Error %s: %v", vfs_name, err)
return -fuse.EIO
}

defer closer()

response, err := client.VFSGetBuffer(context.Background(),
Expand Down Expand Up @@ -275,7 +285,12 @@ func (self *VFSFs) GetDir(vfs_name string) ([]*api.FileInfoRow, int) {

// Not there - initiate a new client flow.
ctx := context.Background()
client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
self.logger.Err("Fetching Error %s: %v", vfs_name, err)
return nil, -fuse.EIO
}

defer closer()

response, err := client.VFSRefreshDirectory(context.Background(),
Expand Down Expand Up @@ -318,7 +333,12 @@ func (self *VFSFs) GetDir(vfs_name string) ([]*api.FileInfoRow, int) {
func (self *VFSFs) isFlowComplete(flow_id, vfs_name string) (bool, int) {
// Check if the flow is completed yet.
ctx := context.Background()
client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
self.logger.Err("Fetching Error %s: %v", vfs_name, err)
return false, -fuse.EIO
}

defer closer()

req := &api_proto.ApiFlowRequest{
Expand All @@ -340,7 +360,12 @@ func (self *VFSFs) isFlowComplete(flow_id, vfs_name string) (bool, int) {

func (self *VFSFs) getDir(vfs_name string) ([]*api.FileInfoRow, error) {
ctx := context.Background()
client, closer := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, self.config_obj)
if err != nil {
self.logger.Err("Fetching Error %s: %v", vfs_name, err)
return nil, err
}

defer closer()

request := &flows_proto.VFSListRequest{
Expand Down Expand Up @@ -403,7 +428,9 @@ func doFuse() {

// Connect one time to make sure we can.
ctx := context.Background()
_, closer := grpc_client.Factory.GetAPIClient(ctx, config_obj)
_, closer, err := grpc_client.Factory.GetAPIClient(ctx, config_obj)
kingpin.FatalIfError(err, "Unable to get grpc client")

closer()

args := []string{*fuse_command_mnt_point,
Expand Down
11 changes: 9 additions & 2 deletions bin/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ func shell_executor(config_obj *config_proto.Config,
}

fmt.Printf("Running %v on %v\n", t, client_id)
client, closer := grpc_client.Factory.GetAPIClient(ctx, config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, config_obj)
if err != nil {
fmt.Printf("ERROR: %v\n", err)
return
}
defer closer()

response, err := client.CollectArtifact(ctx,
Expand Down Expand Up @@ -130,7 +134,10 @@ func completer(t prompt.Document) []prompt.Suggest {
}

func getClientInfo(config_obj *config_proto.Config, ctx context.Context) (*api_proto.ApiClient, error) {
client, closer := grpc_client.Factory.GetAPIClient(ctx, config_obj)
client, closer, err := grpc_client.Factory.GetAPIClient(ctx, config_obj)
if err != nil {
return nil, err
}
defer closer()

return client.GetClient(ctx, &api_proto.GetClientRequest{
Expand Down
57 changes: 57 additions & 0 deletions clients/clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package clients

import (
"strings"

errors "github.com/pkg/errors"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
constants "www.velocidex.com/golang/velociraptor/constants"
"www.velocidex.com/golang/velociraptor/datastore"
)

func LabelClients(
config_obj *config_proto.Config,
in *api_proto.LabelClientsRequest) (*api_proto.APIResponse, error) {
db, err := datastore.GetDB(config_obj)
if err != nil {
return nil, err
}

index_func := db.SetIndex
switch in.Operation {
case "remove":
index_func = db.UnsetIndex
case "check":
index_func = db.CheckIndex
case "set":
// default.
default:
return nil, errors.New(
"unknown label operation. Must be set, check or remove")
}

for _, label := range in.Labels {
for _, client_id := range in.ClientIds {
if !strings.HasPrefix(label, "label:") {
label = "label:" + label
}
err = index_func(
config_obj,
constants.CLIENT_INDEX_URN,
client_id, []string{label})
if err != nil {
return nil, err
}
err = index_func(
config_obj,
constants.CLIENT_INDEX_URN,
label, []string{client_id})
if err != nil {
return nil, err
}
}
}

return &api_proto.APIResponse{}, nil
}
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func GetDefaultConfig() *config_proto.Config {
"Generic.Client.Stats",
},
ExpectedClients: 10000,
GRPCPoolMaxSize: 100,
GRPCPoolMaxWait: 60,
PublicPath: "/var/tmp/velociraptor/public",
},
Datastore: &config_proto.DatastoreConfig{
Expand Down
Loading

0 comments on commit 33e07c8

Please sign in to comment.