Skip to content

Commit 0c5dc78

Browse files
authored
Merge pull request #7468 from dolthub/aaron/emit-metrics-while-running
go/libraries/events: Restructure events collection so that we can send events while the dolt process is running.
2 parents cbe6b0c + 82a20f2 commit 0c5dc78

File tree

7 files changed

+240
-46
lines changed

7 files changed

+240
-46
lines changed

go/cmd/dolt/cli/command.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func (hc SubCommandHandler) handleCommand(ctx context.Context, commandStr string
245245
ret := cmd.Exec(ctx, commandStr, args, dEnv, cliCtx)
246246

247247
if evt != nil {
248-
events.GlobalCollector.CloseEventAndAdd(evt)
248+
events.GlobalCollector().CloseEventAndAdd(evt)
249249
}
250250

251251
return ret

go/cmd/dolt/commands/sqlserver/server.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,8 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
622622
return &heartbeatService{} // will be defunct on Run()
623623
}
624624

625+
events.SetGlobalCollector(events.NewCollector(version, emitter))
626+
625627
return &heartbeatService{
626628
version: version,
627629
eventEmitter: emitter,
@@ -647,7 +649,7 @@ func (h *heartbeatService) Run(ctx context.Context) {
647649
return
648650
case <-ticker.C:
649651
t := events.NowTimestamp()
650-
err := h.eventEmitter.LogEvents(h.version, []*eventsapi.ClientEvent{
652+
err := h.eventEmitter.LogEvents(ctx, h.version, []*eventsapi.ClientEvent{
651653
{
652654
Id: uuid.New().String(),
653655
StartTime: t,

go/cmd/dolt/dolt.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,16 @@ func runMain() int {
477477
return 1
478478
}
479479

480+
strMetricsDisabled := dEnv.Config.GetStringOrDefault(config.MetricsDisabled, "false")
481+
var metricsEmitter events.Emitter
482+
metricsEmitter = events.NewFileEmitter(homeDir, dbfactory.DoltDir)
483+
metricsDisabled, err := strconv.ParseBool(strMetricsDisabled)
484+
if err != nil || metricsDisabled {
485+
metricsEmitter = events.NullEmitter{}
486+
}
487+
488+
events.SetGlobalCollector(events.NewCollector(Version, metricsEmitter))
489+
480490
if dEnv.CfgLoadErr != nil {
481491
cli.PrintErrln(color.RedString("Failed to load the global config. %v", dEnv.CfgLoadErr))
482492
return 1
@@ -519,7 +529,7 @@ func runMain() int {
519529
return 1
520530
}
521531

522-
defer emitUsageEvents(dEnv, homeDir, args)
532+
defer emitUsageEvents(metricsEmitter, args)
523533

524534
if needsWriteAccess(subcommandName) {
525535
err = reconfigIfTempFileMoveFails(dEnv)
@@ -794,16 +804,11 @@ func seedGlobalRand() {
794804
// 1. The config key |metrics.disabled|, when set to |true|, disables all metrics emission
795805
// 2. The environment key |DOLT_DISABLE_EVENT_FLUSH| allows writing events to disk but not sending them to the server.
796806
// This is mostly used for testing.
797-
func emitUsageEvents(dEnv *env.DoltEnv, homeDir string, args []string) {
798-
metricsDisabled := dEnv.Config.GetStringOrDefault(config.MetricsDisabled, "false")
799-
disabled, err := strconv.ParseBool(metricsDisabled)
800-
if err != nil || disabled {
801-
return
802-
}
803-
807+
func emitUsageEvents(emitter events.Emitter, args []string) {
804808
// write events
805-
emitter := events.NewFileEmitter(homeDir, dbfactory.DoltDir)
806-
_ = emitter.LogEvents(Version, events.GlobalCollector.Close())
809+
collector := events.GlobalCollector()
810+
ctx := context.Background()
811+
_ = emitter.LogEvents(ctx, Version, collector.Close())
807812

808813
// flush events
809814
if !eventFlushDisabled && len(args) > 0 && shouldFlushEvents(args[0]) {

go/libraries/doltcore/dbfactory/grpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.Noms
117117
return nil, err
118118
}
119119

120-
opts := append(cfg.DialOptions, grpc.WithChainUnaryInterceptor(remotestorage.EventsUnaryClientInterceptor(events.GlobalCollector)))
120+
opts := append(cfg.DialOptions, grpc.WithChainUnaryInterceptor(remotestorage.EventsUnaryClientInterceptor(events.GlobalCollector())))
121121
opts = append(opts, grpc.WithChainUnaryInterceptor(remotestorage.RetryingUnaryClientInterceptor))
122122

123123
conn, err := grpc.Dial(cfg.Endpoint, opts...)

go/libraries/events/collector.go

Lines changed: 130 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package events
1616

1717
import (
18+
"context"
1819
"sync"
19-
"sync/atomic"
20+
"time"
2021

22+
"github.com/cenkalti/backoff/v4"
2123
"github.com/denisbrodbeck/machineid"
2224

2325
eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
@@ -40,34 +42,62 @@ func getMachineID() string {
4042
}
4143

4244
// GlobalCollector is an instance of a collector where all events should be sent via the CloseEventAndAdd function
43-
var GlobalCollector = NewCollector()
45+
var globalCollector = NewCollector("invalid", nil)
46+
var globalMu *sync.Mutex = &sync.Mutex{}
47+
48+
func GlobalCollector() *Collector {
49+
globalMu.Lock()
50+
defer globalMu.Unlock()
51+
return globalCollector
52+
}
53+
54+
func SetGlobalCollector(c *Collector) {
55+
globalMu.Lock()
56+
defer globalMu.Unlock()
57+
cur := globalCollector
58+
globalCollector = c
59+
toTransfer := cur.Close()
60+
for _, e := range toTransfer {
61+
globalCollector.evtCh <- e
62+
}
63+
}
4464

4565
const collChanBufferSize = 32
66+
const maxBatchedEvents = 64
4667

4768
// Collector collects and stores Events later to be sent to an Emitter.
4869
type Collector struct {
49-
val *atomic.Value
50-
wg *sync.WaitGroup
51-
evtCh chan *eventsapi.ClientEvent
70+
events []*eventsapi.ClientEvent
71+
wg sync.WaitGroup
72+
evtCh chan *eventsapi.ClientEvent
73+
st *sendingThread
5274
}
5375

5476
// NewCollector creates a new instance of a collector
55-
func NewCollector() *Collector {
56-
wg := &sync.WaitGroup{}
77+
func NewCollector(version string, emitter Emitter) *Collector {
5778
evtCh := make(chan *eventsapi.ClientEvent, collChanBufferSize)
5879

59-
c := &Collector{&atomic.Value{}, wg, evtCh}
80+
c := &Collector{
81+
evtCh: evtCh,
82+
st: newSendingThread(version, emitter),
83+
}
6084

61-
wg.Add(1)
85+
c.st.start()
86+
c.wg.Add(1)
6287
go func() {
63-
defer wg.Done()
64-
65-
var events []*eventsapi.ClientEvent
66-
for evt := range evtCh {
67-
events = append(events, evt)
88+
defer c.wg.Done()
89+
for evt := range c.evtCh {
90+
c.events = append(c.events, evt)
91+
if len(c.events) >= maxBatchedEvents {
92+
c.st.batchCh <- c.events
93+
c.events = nil
94+
}
6895
}
69-
70-
c.val.Store(events)
96+
if len(c.events) > 0 {
97+
c.st.batchCh <- c.events
98+
c.events = nil
99+
}
100+
c.events = c.st.stop()
71101
}()
72102

73103
return c
@@ -85,7 +115,89 @@ func (c *Collector) Close() []*eventsapi.ClientEvent {
85115

86116
c.wg.Wait()
87117

88-
interf := c.val.Load()
118+
return c.events
119+
}
120+
121+
type sendingThread struct {
122+
logCtx context.Context
123+
cancelF func()
124+
125+
batchCh chan []*eventsapi.ClientEvent
126+
unsent []*eventsapi.ClientEvent
127+
version string
128+
129+
emitter Emitter
130+
131+
wg sync.WaitGroup
132+
}
133+
134+
func newSendingThread(version string, emitter Emitter) *sendingThread {
135+
ctx, cancel := context.WithCancel(context.Background())
136+
return &sendingThread{
137+
logCtx: ctx,
138+
cancelF: cancel,
139+
batchCh: make(chan []*eventsapi.ClientEvent, 8),
140+
version: version,
141+
emitter: emitter,
142+
}
143+
}
144+
145+
func (s *sendingThread) start() {
146+
s.wg.Add(1)
147+
go s.run()
148+
}
149+
150+
func (s *sendingThread) stop() []*eventsapi.ClientEvent {
151+
s.cancelF()
152+
close(s.batchCh)
153+
s.wg.Wait()
154+
return s.unsent
155+
}
156+
157+
func (s *sendingThread) run() {
158+
defer s.wg.Done()
159+
160+
var timer *time.Timer
89161

90-
return interf.([]*eventsapi.ClientEvent)
162+
bo := backoff.NewExponentialBackOff()
163+
bo.InitialInterval = time.Second
164+
bo.MaxInterval = time.Minute
165+
bo.MaxElapsedTime = 0
166+
167+
for {
168+
var timerCh <-chan time.Time
169+
if timer != nil {
170+
timerCh = timer.C
171+
}
172+
select {
173+
case batch, ok := <-s.batchCh:
174+
if !ok {
175+
if s.emitter != nil && len(s.unsent) > 0 {
176+
err := s.emitter.LogEvents(s.logCtx, s.version, s.unsent)
177+
if err == nil {
178+
s.unsent = nil
179+
}
180+
}
181+
return
182+
}
183+
s.unsent = append(s.unsent, batch...)
184+
if s.emitter != nil {
185+
if timer != nil && !timer.Stop() {
186+
<-timer.C
187+
timer.Reset(0)
188+
} else {
189+
timer = time.NewTimer(0)
190+
}
191+
}
192+
case <-timerCh:
193+
err := s.emitter.LogEvents(s.logCtx, s.version, s.unsent)
194+
if err == nil {
195+
s.unsent = nil
196+
bo.Reset()
197+
timer = nil
198+
} else {
199+
timer.Reset(bo.NextBackOff())
200+
}
201+
}
202+
}
91203
}

go/libraries/events/emitter.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ const DefaultMetricsPort = "443"
5555
// Emitter is an interface used for processing a batch of events
5656
type Emitter interface {
5757
// LogEvents emits a batch of events
58-
LogEvents(version string, evts []*eventsapi.ClientEvent) error
58+
LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error
5959
// LogEventsRequest emits a batch of events wrapped in a request object, with other metadata
6060
LogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error
6161
}
@@ -64,7 +64,7 @@ type Emitter interface {
6464
type NullEmitter struct{}
6565

6666
// LogEvents takes a batch of events and processes them. In this case it just drops them
67-
func (ne NullEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
67+
func (ne NullEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
6868
return nil
6969
}
7070

@@ -80,7 +80,7 @@ type WriterEmitter struct {
8080

8181
// LogEvents takes a batch of events and processes them. In this case the text encoding of the events is written to
8282
// the writer
83-
func (we WriterEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
83+
func (we WriterEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
8484
for i, evt := range evts {
8585
header := fmt.Sprintf("event%03d: <\n", i)
8686

@@ -124,8 +124,8 @@ func NewGrpcEmitter(conn *grpc.ClientConn) *GrpcEmitter {
124124
return &GrpcEmitter{client}
125125
}
126126

127-
func (em *GrpcEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
128-
ctx, cnclFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second+500*time.Millisecond))
127+
func (em *GrpcEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
128+
ctx, cnclFn := context.WithDeadline(ctx, time.Now().Add(time.Second+500*time.Millisecond))
129129
defer cnclFn()
130130

131131
var plat eventsapi.Platform
@@ -138,25 +138,23 @@ func (em *GrpcEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent)
138138
plat = eventsapi.Platform_WINDOWS
139139
}
140140

141-
req := eventsapi.LogEventsRequest{
141+
req := &eventsapi.LogEventsRequest{
142142
MachineId: getMachineID(),
143143
Version: version,
144144
Platform: plat,
145145
Events: evts,
146146
App: Application,
147147
}
148148

149-
_, err := em.client.LogEvents(ctx, &req)
150-
151-
return err
149+
return em.sendLogEventsRequest(ctx, req)
152150
}
153151

154152
func (em *GrpcEmitter) LogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
155-
return em.SendLogEventsRequest(ctx, req)
153+
return em.sendLogEventsRequest(ctx, req)
156154
}
157155

158156
// SendLogEventsRequest sends a request using the grpc client
159-
func (em *GrpcEmitter) SendLogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
157+
func (em *GrpcEmitter) sendLogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
160158
_, err := em.client.LogEvents(ctx, req)
161159
if err != nil {
162160
return err
@@ -177,7 +175,7 @@ func NewFileEmitter(userHomeDir string, doltDir string) *FileEmitter {
177175
}
178176

179177
// LogEvents implements the Emitter interface and writes events requests to files
180-
func (fe *FileEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
178+
func (fe *FileEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
181179
if err := fe.fbp.WriteEvents(version, evts); err != nil {
182180
return err
183181
}
@@ -198,10 +196,10 @@ type LoggerEmitter struct {
198196
logLevel logrus.Level
199197
}
200198

201-
func (l LoggerEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
199+
func (l LoggerEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
202200
sb := &strings.Builder{}
203201
wr := WriterEmitter{Wr: sb}
204-
err := wr.LogEvents(version, evts)
202+
err := wr.LogEvents(ctx, version, evts)
205203
if err != nil {
206204
return err
207205
}

0 commit comments

Comments
 (0)