Skip to content

Commit

Permalink
Refactor client info manager (Velocidex#1700)
Browse files Browse the repository at this point in the history
* Refactor client info manager

* Uses larger mutations in order to minimize number of updates.
* Flushes client stats less frequently to minimize IO

* Added test
  • Loading branch information
scudette authored Apr 5, 2022
1 parent fef8e91 commit 9eed302
Show file tree
Hide file tree
Showing 37 changed files with 1,456 additions and 1,033 deletions.
9 changes: 5 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ import (

type ApiServer struct {
proto.UnimplementedAPIServer
config *config_proto.Config
server_obj *server.Server
ca_pool *x509.CertPool

config *config_proto.Config
server_obj *server.Server
ca_pool *x509.CertPool
wg *sync.WaitGroup
api_client_factory grpc_client.APIClientFactory
}

Expand Down Expand Up @@ -984,6 +984,7 @@ func startAPIServer(
server_obj: server_obj,
ca_pool: CA_Pool,
api_client_factory: grpc_client.GRPCAPIClient{},
wg: wg,
},
)
// Register reflection service.
Expand Down
792 changes: 401 additions & 391 deletions api/proto/api.pb.go

Large diffs are not rendered by default.

28 changes: 14 additions & 14 deletions api/proto/api.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions api/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ message EventRequest {

// The node who is requesting the event stream.
string node = 2;

string watcher_name = 3;
}

message EventResponse {
Expand Down
54 changes: 33 additions & 21 deletions api/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,35 @@ func streamEvents(

// The API service is running on the master only! This means
// the journal service is local.
output_chan, cancel := journal.Watch(ctx, in.Queue)
output_chan, cancel := journal.Watch(
ctx, in.Queue, "replication-"+in.WatcherName)
defer cancel()

for event := range output_chan {
serialized, err := json.Marshal(event)
if err != nil {
continue
}
response := &api_proto.EventResponse{
Jsonl: serialized,
}

timer := prometheus.NewTimer(
prometheus.ObserverFunc(func(v float64) {
replicationReceiveHistorgram.WithLabelValues("").Observe(v)
}))

err = stream.Send(response)
timer.ObserveDuration()

if err != nil {
continue
for {
select {
case <-ctx.Done():
return

case event := <-output_chan:
serialized, err := json.Marshal(event)
if err != nil {
continue
}
response := &api_proto.EventResponse{
Jsonl: serialized,
}

timer := prometheus.NewTimer(
prometheus.ObserverFunc(func(v float64) {
replicationReceiveHistorgram.WithLabelValues("").Observe(v)
}))

err = stream.Send(response)
timer.ObserveDuration()

if err != nil {
continue
}
}
}

Expand Down Expand Up @@ -140,8 +147,13 @@ func (self *ApiServer) WatchEvent(

// return the first good match
if true {
// Wait here for orderly shutdown of event streams.
self.wg.Add(1)
defer self.wg.Done()

// Cert is good enough for us, run the query.
return streamEvents(ctx, self.config, in, stream, peer_name)
return streamEvents(
ctx, self.config, in, stream, peer_name)
}
}

Expand Down
1 change: 1 addition & 0 deletions artifacts/definitions/Server/Import/PreviousReleases.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ parameters:
description: |
The Velociraptor Release to import.
type: choices
default: v0.6.3
choices:
- v0.6.0
- v0.6.1
Expand Down
Loading

0 comments on commit 9eed302

Please sign in to comment.