Skip to content

Commit

Permalink
Initial implementation of a replication service. (Velocidex#999)
Browse files Browse the repository at this point in the history
This allows Velociraptor to start multiple frontends. There are two types of frontends - a master and slave.

By default the master will start up - giving the same experience for single frontend deployments as in previous versions.

Start the master:
```
velociraptor -v frontend
```

To start slave frontends you can use the --slave option:
```
velociraptor frontend -v --slave --config.frontend-bind-port 8001 --config.monitoring-bind-port 8004
```
Note that you can override arbitrary config options using the command line now. You will need this in order to specify dynamic ports for the master and slave (e.g. in kubernetes configs).

Notes:
*  Slaves connect to the master over the API port so if they are going to run on a different machine you will need to change the API.bind_address to "0.0.0.0"
* If master and slave are running on different machines you need to share the filesystem using EFS or NFS.
  • Loading branch information
scudette authored Apr 3, 2021
1 parent fda79a0 commit 4a5b2fb
Show file tree
Hide file tree
Showing 69 changed files with 3,228 additions and 1,984 deletions.
2 changes: 1 addition & 1 deletion actions/proto/vql.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion actions/proto/vql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ message VQLTypeMap {
}

message VQLResponse {
// Response is encoded in a json array of rows.
// DEPRECATED: Response is encoded in a json array of rows.
string Response = 1 [(sem_type) = {
description: "JSON encoded response.",
}];
Expand Down
72 changes: 3 additions & 69 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"www.velocidex.com/golang/velociraptor/server"
"www.velocidex.com/golang/velociraptor/services"
users "www.velocidex.com/golang/velociraptor/users"
"www.velocidex.com/golang/velociraptor/utils"
vql_subsystem "www.velocidex.com/golang/velociraptor/vql"
"www.velocidex.com/golang/vfilter"
)
Expand Down Expand Up @@ -445,7 +444,9 @@ func (self *ApiServer) ListClients(
result.Names = append(result.Names, client_id)
} else {
api_client, err := GetApiClient(
self.config, self.server_obj, client_id, false)
ctx, self.config,
self.server_obj, client_id,
false /* detailed */)
if err != nil {
continue
}
Expand Down Expand Up @@ -890,73 +891,6 @@ func (self *ApiServer) SetArtifactFile(
return &api_proto.APIResponse{}, nil
}

func (self *ApiServer) WriteEvent(
ctx context.Context,
in *actions_proto.VQLResponse) (*empty.Empty, error) {

// Get the TLS context from the peer and verify its
// certificate.
peer, ok := peer.FromContext(ctx)
if !ok {
return nil, status.Error(codes.InvalidArgument, "cant get peer info")
}

tlsInfo, ok := peer.AuthInfo.(credentials.TLSInfo)
if !ok {
return nil, status.Error(codes.InvalidArgument, "unable to get credentials")
}

// Authenticate API clients using certificates.
for _, peer_cert := range tlsInfo.State.PeerCertificates {
chains, err := peer_cert.Verify(
x509.VerifyOptions{Roots: self.ca_pool})
if err != nil {
return nil, err
}

if len(chains) == 0 {
return nil, status.Error(codes.InvalidArgument, "no chains verified")
}

peer_name := crypto.GetSubjectName(peer_cert)

token, err := acls.GetEffectivePolicy(self.config, peer_name)
if err != nil {
return nil, err
}

// Check that the principal is allowed to push to the queue.
ok, err := acls.CheckAccessWithToken(token, acls.PUBLISH, in.Query.Name)
if err != nil {
return nil, err
}

if !ok {
return nil, status.Error(codes.PermissionDenied,
"Permission denied: PUBLISH "+peer_name+" to "+in.Query.Name)
}

rows, err := utils.ParseJsonToDicts([]byte(in.Response))
if err != nil {
return nil, err
}

// Only return the first row
if true {
journal, err := services.GetJournal()
if err != nil {
return nil, err
}

err = journal.PushRowsToArtifact(self.config,
rows, in.Query.Name, peer_name, "")
return &empty.Empty{}, err
}
}

return nil, status.Error(codes.InvalidArgument, "no peer certs?")
}

func (self *ApiServer) Query(
in *actions_proto.VQLCollectorArgs,
stream api_proto.API_QueryServer) error {
Expand Down
10 changes: 5 additions & 5 deletions api/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func StartFrontendHttps(

err := server.ListenAndServeTLS("", "")
if err != nil && err != http.ErrServerClosed {
server_obj.Error("Frontend server error", err)
server_obj.Error("Frontend server error %v", err)
}
}()

Expand All @@ -353,7 +353,7 @@ func StartFrontendHttps(

err := server.Shutdown(time_ctx)
if err != nil {
server_obj.Error("Frontend server error", err)
server_obj.Error("Frontend server error %v", err)
}
}()

Expand Down Expand Up @@ -398,7 +398,7 @@ func StartFrontendPlainHttp(

err := server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
server_obj.Error("Frontend server error", err)
server_obj.Error("Frontend server error %v", err)
}
}()

Expand All @@ -419,12 +419,12 @@ func StartFrontendPlainHttp(
if notifier != nil {
err := notifier.NotifyAllListeners(config_obj)
if err != nil {
server_obj.Error("Frontend server error", err)
server_obj.Error("Frontend server error %v", err)
}
}
err := server.Shutdown(time_ctx)
if err != nil {
server_obj.Error("Frontend server error", err)
server_obj.Error("Frontend server error %v", err)
}
}()

Expand Down
39 changes: 7 additions & 32 deletions api/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
package api

import (
"context"
"errors"
"net"
"os"
"regexp"
"strings"
"time"

"github.com/golang/protobuf/ptypes/empty"
context "golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"www.velocidex.com/golang/velociraptor/acls"
Expand All @@ -43,6 +41,7 @@ import (
)

func GetApiClient(
ctx context.Context,
config_obj *config_proto.Config,
server_obj *server.Server,
client_id string, detailed bool) (
Expand Down Expand Up @@ -110,40 +109,16 @@ func GetApiClient(
result.LastSeenAt = client_info.Ping
result.LastIp = client_info.IpAddress

// Update the time to now if the client is currently actually
// connected.
if server_obj != nil &&
services.GetNotifier().IsClientConnected(client_id) {
if server_obj != nil && detailed &&
// Wait up to 2 seconds to find out if clients are connected.
services.GetNotifier().IsClientConnected(ctx,
config_obj, client_id, 2) {
result.LastSeenAt = uint64(time.Now().UnixNano() / 1000)
}

remote_address := strings.Split(result.LastIp, ":")[0]
if _is_ip_in_ranges(remote_address, config_obj.GUI.InternalCidr) {
result.LastIpClass = api_proto.ApiClient_INTERNAL
} else if _is_ip_in_ranges(remote_address, config_obj.GUI.InternalCidr) {
result.LastIpClass = api_proto.ApiClient_VPN
} else {
result.LastIpClass = api_proto.ApiClient_EXTERNAL
}

return result, nil
}

func _is_ip_in_ranges(remote string, ranges []string) bool {
for _, ip_range := range ranges {
_, ipNet, err := net.ParseCIDR(ip_range)
if err != nil {
return false
}

if ipNet.Contains(net.ParseIP(remote)) {
return true
}
}

return false
}

func (self *ApiServer) GetClientMetadata(
ctx context.Context,
in *api_proto.GetClientRequest) (*api_proto.ClientMetadata, error) {
Expand Down Expand Up @@ -209,7 +184,7 @@ func (self *ApiServer) GetClient(
"User is not allowed to view clients.")
}

api_client, err := GetApiClient(
api_client, err := GetApiClient(ctx,
self.config,
self.server_obj,
in.ClientId,
Expand Down
155 changes: 155 additions & 0 deletions api/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package api

import (
"crypto/x509"

"github.com/golang/protobuf/ptypes/empty"
context "golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"www.velocidex.com/golang/velociraptor/acls"
actions_proto "www.velocidex.com/golang/velociraptor/actions/proto"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
"www.velocidex.com/golang/velociraptor/crypto"
"www.velocidex.com/golang/velociraptor/services"
"www.velocidex.com/golang/velociraptor/utils"
)

func (self *ApiServer) PushEvents(
ctx context.Context,
in *api_proto.PushEventRequest) (*empty.Empty, error) {

// Get the TLS context from the peer and verify its
// certificate.
peer, ok := peer.FromContext(ctx)
if !ok {
return nil, status.Error(codes.InvalidArgument, "cant get peer info")
}

tlsInfo, ok := peer.AuthInfo.(credentials.TLSInfo)
if !ok {
return nil, status.Error(codes.InvalidArgument, "unable to get credentials")
}

// Authenticate API clients using certificates.
for _, peer_cert := range tlsInfo.State.PeerCertificates {
chains, err := peer_cert.Verify(
x509.VerifyOptions{Roots: self.ca_pool})
if err != nil {
return nil, err
}

if len(chains) == 0 {
return nil, status.Error(codes.InvalidArgument, "no chains verified")
}

peer_name := crypto.GetSubjectName(peer_cert)
if peer_name != self.config.Client.PinnedServerName {
token, err := acls.GetEffectivePolicy(self.config, peer_name)
if err != nil {
return nil, err
}

// Check that the principal is allowed to push to the queue.
ok, err := acls.CheckAccessWithToken(token, acls.PUBLISH, in.Artifact)
if err != nil {
return nil, err
}

if !ok {
return nil, status.Error(codes.PermissionDenied,
"Permission denied: PUBLISH "+peer_name+" to "+in.Artifact)
}
}

rows, err := utils.ParseJsonToDicts([]byte(in.Jsonl))
if err != nil {
return nil, err
}

// Only return the first row
if true {
journal, err := services.GetJournal()
if err != nil {
return nil, err
}

err = journal.PushRowsToArtifact(self.config,
rows, in.Artifact, in.ClientId, in.FlowId)
return &empty.Empty{}, err
}
}

return nil, status.Error(codes.InvalidArgument, "no peer certs?")
}

func (self *ApiServer) WriteEvent(
ctx context.Context,
in *actions_proto.VQLResponse) (*empty.Empty, error) {

// Get the TLS context from the peer and verify its
// certificate.
peer, ok := peer.FromContext(ctx)
if !ok {
return nil, status.Error(codes.InvalidArgument, "cant get peer info")
}

tlsInfo, ok := peer.AuthInfo.(credentials.TLSInfo)
if !ok {
return nil, status.Error(codes.InvalidArgument, "unable to get credentials")
}

// Authenticate API clients using certificates.
for _, peer_cert := range tlsInfo.State.PeerCertificates {
chains, err := peer_cert.Verify(
x509.VerifyOptions{Roots: self.ca_pool})
if err != nil {
return nil, err
}

if len(chains) == 0 {
return nil, status.Error(codes.InvalidArgument, "no chains verified")
}

peer_name := crypto.GetSubjectName(peer_cert)

token, err := acls.GetEffectivePolicy(self.config, peer_name)
if err != nil {
return nil, err
}

// Check that the principal is allowed to push to the queue.
ok, err := acls.CheckAccessWithToken(token,
acls.MACHINE_STATE, in.Query.Name)
if err != nil {
return nil, err
}

if !ok {
return nil, status.Error(codes.PermissionDenied,
"Permission denied: MACHINE_STATE "+
peer_name+" to "+in.Query.Name)
}

rows, err := utils.ParseJsonToDicts([]byte(in.Response))
if err != nil {
return nil, err
}

// Only return the first row
if true {
journal, err := services.GetJournal()
if err != nil {
return nil, err
}

err = journal.PushRowsToArtifact(self.config,
rows, in.Query.Name, peer_name, "")
return &empty.Empty{}, err
}
}

return nil, status.Error(codes.InvalidArgument, "no peer certs?")
}
Loading

0 comments on commit 4a5b2fb

Please sign in to comment.