Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/dolt/cli/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (hc SubCommandHandler) handleCommand(ctx context.Context, commandStr string
ret := cmd.Exec(ctx, commandStr, args, dEnv, cliCtx)

if evt != nil {
events.GlobalCollector.CloseEventAndAdd(evt)
events.GlobalCollector().CloseEventAndAdd(evt)
}

return ret
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/dolt/commands/sqlserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,8 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
return &heartbeatService{} // will be defunct on Run()
}

events.SetGlobalCollector(events.NewCollector(version, emitter))

return &heartbeatService{
version: version,
eventEmitter: emitter,
Expand All @@ -647,7 +649,7 @@ func (h *heartbeatService) Run(ctx context.Context) {
return
case <-ticker.C:
t := events.NowTimestamp()
err := h.eventEmitter.LogEvents(h.version, []*eventsapi.ClientEvent{
err := h.eventEmitter.LogEvents(ctx, h.version, []*eventsapi.ClientEvent{
{
Id: uuid.New().String(),
StartTime: t,
Expand Down
25 changes: 15 additions & 10 deletions go/cmd/dolt/dolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,16 @@ func runMain() int {
return 1
}

strMetricsDisabled := dEnv.Config.GetStringOrDefault(config.MetricsDisabled, "false")
var metricsEmitter events.Emitter
metricsEmitter = events.NewFileEmitter(homeDir, dbfactory.DoltDir)
metricsDisabled, err := strconv.ParseBool(strMetricsDisabled)
if err != nil || metricsDisabled {
metricsEmitter = events.NullEmitter{}
}

events.SetGlobalCollector(events.NewCollector(Version, metricsEmitter))

if dEnv.CfgLoadErr != nil {
cli.PrintErrln(color.RedString("Failed to load the global config. %v", dEnv.CfgLoadErr))
return 1
Expand Down Expand Up @@ -495,7 +505,7 @@ func runMain() int {
return 1
}

defer emitUsageEvents(dEnv, homeDir, args)
defer emitUsageEvents(metricsEmitter, args)

if needsWriteAccess(subcommandName) {
err = reconfigIfTempFileMoveFails(dEnv)
Expand Down Expand Up @@ -770,16 +780,11 @@ func seedGlobalRand() {
// 1. The config key |metrics.disabled|, when set to |true|, disables all metrics emission
// 2. The environment key |DOLT_DISABLE_EVENT_FLUSH| allows writing events to disk but not sending them to the server.
// This is mostly used for testing.
func emitUsageEvents(dEnv *env.DoltEnv, homeDir string, args []string) {
metricsDisabled := dEnv.Config.GetStringOrDefault(config.MetricsDisabled, "false")
disabled, err := strconv.ParseBool(metricsDisabled)
if err != nil || disabled {
return
}

func emitUsageEvents(emitter events.Emitter, args []string) {
// write events
emitter := events.NewFileEmitter(homeDir, dbfactory.DoltDir)
_ = emitter.LogEvents(Version, events.GlobalCollector.Close())
collector := events.GlobalCollector()
ctx := context.Background()
_ = emitter.LogEvents(ctx, Version, collector.Close())

// flush events
if !eventFlushDisabled && len(args) > 0 && shouldFlushEvents(args[0]) {
Expand Down
2 changes: 1 addition & 1 deletion go/libraries/doltcore/dbfactory/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.Noms
return nil, err
}

opts := append(cfg.DialOptions, grpc.WithChainUnaryInterceptor(remotestorage.EventsUnaryClientInterceptor(events.GlobalCollector)))
opts := append(cfg.DialOptions, grpc.WithChainUnaryInterceptor(remotestorage.EventsUnaryClientInterceptor(events.GlobalCollector())))
opts = append(opts, grpc.WithChainUnaryInterceptor(remotestorage.RetryingUnaryClientInterceptor))

conn, err := grpc.Dial(cfg.Endpoint, opts...)
Expand Down
148 changes: 130 additions & 18 deletions go/libraries/events/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package events

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/denisbrodbeck/machineid"

eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
Expand All @@ -40,34 +42,62 @@ func getMachineID() string {
}

// GlobalCollector is an instance of a collector where all events should be sent via the CloseEventAndAdd function
var GlobalCollector = NewCollector()
var globalCollector = NewCollector("invalid", nil)
var globalMu *sync.Mutex = &sync.Mutex{}

func GlobalCollector() *Collector {
globalMu.Lock()
defer globalMu.Unlock()
return globalCollector
}

func SetGlobalCollector(c *Collector) {
globalMu.Lock()
defer globalMu.Unlock()
cur := globalCollector
globalCollector = c
toTransfer := cur.Close()
for _, e := range toTransfer {
globalCollector.evtCh <- e
}
}

const collChanBufferSize = 32
const maxBatchedEvents = 64

// Collector collects and stores Events later to be sent to an Emitter.
type Collector struct {
val *atomic.Value
wg *sync.WaitGroup
evtCh chan *eventsapi.ClientEvent
events []*eventsapi.ClientEvent
wg sync.WaitGroup
evtCh chan *eventsapi.ClientEvent
st *sendingThread
}

// NewCollector creates a new instance of a collector
func NewCollector() *Collector {
wg := &sync.WaitGroup{}
func NewCollector(version string, emitter Emitter) *Collector {
evtCh := make(chan *eventsapi.ClientEvent, collChanBufferSize)

c := &Collector{&atomic.Value{}, wg, evtCh}
c := &Collector{
evtCh: evtCh,
st: newSendingThread(version, emitter),
}

wg.Add(1)
c.st.start()
c.wg.Add(1)
go func() {
defer wg.Done()

var events []*eventsapi.ClientEvent
for evt := range evtCh {
events = append(events, evt)
defer c.wg.Done()
for evt := range c.evtCh {
c.events = append(c.events, evt)
if len(c.events) >= maxBatchedEvents {
c.st.batchCh <- c.events
c.events = nil
}
}

c.val.Store(events)
if len(c.events) > 0 {
c.st.batchCh <- c.events
c.events = nil
}
c.events = c.st.stop()
}()

return c
Expand All @@ -85,7 +115,89 @@ func (c *Collector) Close() []*eventsapi.ClientEvent {

c.wg.Wait()

interf := c.val.Load()
return c.events
}

type sendingThread struct {
logCtx context.Context
cancelF func()

batchCh chan []*eventsapi.ClientEvent
unsent []*eventsapi.ClientEvent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is actually a local var?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost. Returned from stop() for any un-emitted things.

version string

emitter Emitter

wg sync.WaitGroup
}

func newSendingThread(version string, emitter Emitter) *sendingThread {
ctx, cancel := context.WithCancel(context.Background())
return &sendingThread{
logCtx: ctx,
cancelF: cancel,
batchCh: make(chan []*eventsapi.ClientEvent, 8),
version: version,
emitter: emitter,
}
}

func (s *sendingThread) start() {
s.wg.Add(1)
go s.run()
}

func (s *sendingThread) stop() []*eventsapi.ClientEvent {
s.cancelF()
close(s.batchCh)
s.wg.Wait()
return s.unsent
}

func (s *sendingThread) run() {
defer s.wg.Done()

var timer *time.Timer

return interf.([]*eventsapi.ClientEvent)
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = time.Second
bo.MaxInterval = time.Minute
bo.MaxElapsedTime = 0

for {
var timerCh <-chan time.Time
if timer != nil {
timerCh = timer.C
}
select {
case batch, ok := <-s.batchCh:
if !ok {
if s.emitter != nil && len(s.unsent) > 0 {
err := s.emitter.LogEvents(s.logCtx, s.version, s.unsent)
if err == nil {
s.unsent = nil
}
}
return
}
s.unsent = append(s.unsent, batch...)
if s.emitter != nil {
if timer != nil && !timer.Stop() {
<-timer.C
timer.Reset(0)
} else {
timer = time.NewTimer(0)
}
}
case <-timerCh:
err := s.emitter.LogEvents(s.logCtx, s.version, s.unsent)
if err == nil {
s.unsent = nil
bo.Reset()
timer = nil
} else {
timer.Reset(bo.NextBackOff())
}
}
}
}
26 changes: 12 additions & 14 deletions go/libraries/events/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ const DefaultMetricsPort = "443"
// Emitter is an interface used for processing a batch of events
type Emitter interface {
// LogEvents emits a batch of events
LogEvents(version string, evts []*eventsapi.ClientEvent) error
LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error
// LogEventsRequest emits a batch of events wrapped in a request object, with other metadata
LogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error
}
Expand All @@ -64,7 +64,7 @@ type Emitter interface {
type NullEmitter struct{}

// LogEvents takes a batch of events and processes them. In this case it just drops them
func (ne NullEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
func (ne NullEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
return nil
}

Expand All @@ -80,7 +80,7 @@ type WriterEmitter struct {

// LogEvents takes a batch of events and processes them. In this case the text encoding of the events is written to
// the writer
func (we WriterEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
func (we WriterEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
for i, evt := range evts {
header := fmt.Sprintf("event%03d: <\n", i)

Expand Down Expand Up @@ -124,8 +124,8 @@ func NewGrpcEmitter(conn *grpc.ClientConn) *GrpcEmitter {
return &GrpcEmitter{client}
}

func (em *GrpcEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
ctx, cnclFn := context.WithDeadline(context.Background(), time.Now().Add(time.Second+500*time.Millisecond))
func (em *GrpcEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
ctx, cnclFn := context.WithDeadline(ctx, time.Now().Add(time.Second+500*time.Millisecond))
defer cnclFn()

var plat eventsapi.Platform
Expand All @@ -138,25 +138,23 @@ func (em *GrpcEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent)
plat = eventsapi.Platform_WINDOWS
}

req := eventsapi.LogEventsRequest{
req := &eventsapi.LogEventsRequest{
MachineId: getMachineID(),
Version: version,
Platform: plat,
Events: evts,
App: Application,
}

_, err := em.client.LogEvents(ctx, &req)

return err
return em.sendLogEventsRequest(ctx, req)
}

func (em *GrpcEmitter) LogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
return em.SendLogEventsRequest(ctx, req)
return em.sendLogEventsRequest(ctx, req)
}

// SendLogEventsRequest sends a request using the grpc client
func (em *GrpcEmitter) SendLogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
func (em *GrpcEmitter) sendLogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error {
_, err := em.client.LogEvents(ctx, req)
if err != nil {
return err
Expand All @@ -177,7 +175,7 @@ func NewFileEmitter(userHomeDir string, doltDir string) *FileEmitter {
}

// LogEvents implements the Emitter interface and writes events requests to files
func (fe *FileEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
func (fe *FileEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
if err := fe.fbp.WriteEvents(version, evts); err != nil {
return err
}
Expand All @@ -198,10 +196,10 @@ type LoggerEmitter struct {
logLevel logrus.Level
}

func (l LoggerEmitter) LogEvents(version string, evts []*eventsapi.ClientEvent) error {
func (l LoggerEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {
sb := &strings.Builder{}
wr := WriterEmitter{Wr: sb}
err := wr.LogEvents(version, evts)
err := wr.LogEvents(ctx, version, evts)
if err != nil {
return err
}
Expand Down
Loading