forked from Velocidex/velociraptor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplication.go
147 lines (124 loc) · 3.67 KB
/
replication.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package api
import (
"fmt"
"github.com/Velocidex/ordereddict"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
context "golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"www.velocidex.com/golang/velociraptor/acls"
api_proto "www.velocidex.com/golang/velociraptor/api/proto"
config_proto "www.velocidex.com/golang/velociraptor/config/proto"
"www.velocidex.com/golang/velociraptor/json"
"www.velocidex.com/golang/velociraptor/logging"
"www.velocidex.com/golang/velociraptor/services"
)
var (
replicationReceiveHistorgram = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "replication_master_send_latency",
Help: "Latency for the master to send replication messages to the minion.",
Buckets: prometheus.LinearBuckets(0.1, 1, 10),
},
[]string{"status"},
)
)
func streamEvents(
ctx context.Context,
config_obj *config_proto.Config,
in *api_proto.EventRequest,
stream api_proto.API_WatchEventServer,
peer_name string) (err error) {
logger := logging.GetLogger(config_obj, &logging.APICmponent)
logger.WithFields(logrus.Fields{
"arg": in,
"user": peer_name,
}).Info("Replicating Events")
journal, err := services.GetJournal(config_obj)
if err != nil {
return err
}
// Special case this so the caller can immediately initialize the
// watchers.
if in.Queue == "Server.Internal.MasterRegistrations" {
result := ordereddict.NewDict().Set("Events", journal.GetWatchers())
serialized, _ := result.MarshalJSON()
stream.Send(&api_proto.EventResponse{
Jsonl: serialized,
})
}
// The API service is running on the master only! This means
// the journal service is local.
output_chan, cancel := journal.Watch(
ctx, in.Queue, "replication-"+in.WatcherName)
defer cancel()
for {
select {
case <-ctx.Done():
return
case event := <-output_chan:
serialized, err := json.Marshal(event)
if err != nil {
continue
}
response := &api_proto.EventResponse{
Jsonl: serialized,
}
timer := prometheus.NewTimer(
prometheus.ObserverFunc(func(v float64) {
replicationReceiveHistorgram.WithLabelValues("").Observe(v)
}))
err = stream.Send(response)
timer.ObserveDuration()
if err != nil {
continue
}
}
}
return nil
}
// NOTE: The API server is only running on the master node.
func (self *ApiServer) WatchEvent(
in *api_proto.EventRequest,
stream api_proto.API_WatchEventServer) error {
// Get the TLS context from the peer and verify its
// certificate.
ctx := stream.Context()
users := services.GetUserManager()
user_record, config_obj, err := users.GetUserFromContext(ctx)
if err != nil {
return err
}
peer_name := user_record.Name
// Check that the principal is allowed to issue queries.
permissions := acls.ANY_QUERY
ok, err := services.CheckAccess(config_obj, peer_name, permissions)
if err != nil {
return status.Error(codes.PermissionDenied,
fmt.Sprintf("User %v is not allowed to run queries.",
peer_name))
}
if !ok {
return status.Error(codes.PermissionDenied, fmt.Sprintf(
"Permission denied: User %v requires permission %v to run queries",
peer_name, permissions))
}
// Wait here for orderly shutdown of event streams.
self.wg.Add(1)
defer self.wg.Done()
// The call can access the datastore from any org becuase it is a
// server->server call.
org_manager, err := services.GetOrgManager()
if err != nil {
return err
}
org_config_obj, err := org_manager.GetOrgConfig(in.OrgId)
if err != nil {
return err
}
// Cert is good enough for us, run the query.
return streamEvents(
ctx, org_config_obj, in, stream, peer_name)
}