From 3bffca89d373e5a6710e63724fdcd81984cbc31e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Elek?= Date: Thu, 15 Feb 2024 14:50:38 +0100 Subject: [PATCH] support sending eventkit events in batches... We have a specific Destination implementation which directly saves the Events to BQ (instead of sending via UDP to a daemon, which saves them to BQ). But this endpoint doesn't have any queue. In this patch, I copy pasted the queue logic from the UDP client and made a generic BatchQueue destination. Change-Id: I25ad4c3c2c9a2a261985b9efab1cd12f9acec9cc --- client.go | 16 +-- eventkitd-bigquery/bigquery/batch.go | 107 +++++++++++++++++++++ eventkitd-bigquery/bigquery/batch_test.go | 44 +++++++++ eventkitd-bigquery/bigquery/destination.go | 50 +++++----- registry.go | 6 +- 5 files changed, 193 insertions(+), 30 deletions(-) create mode 100644 eventkitd-bigquery/bigquery/batch.go create mode 100644 eventkitd-bigquery/bigquery/batch_test.go diff --git a/client.go b/client.go index c747922..89f3905 100644 --- a/client.go +++ b/client.go @@ -45,6 +45,8 @@ type UDPClient struct { droppedEvents atomic.Int64 } +var _ Destination = &UDPClient{} + func NewUDPClient(application, version, instance, addr string) *UDPClient { c := &UDPClient{ Application: application, @@ -248,13 +250,15 @@ func (c *UDPClient) send(packet *outgoingPacket, addr string) (err error) { return err } -func (c *UDPClient) Submit(event *Event) { +func (c *UDPClient) Submit(events ...*Event) { c.init() - select { - case c.submitQueue <- event: - return - default: - c.droppedEvents.Add(1) + for _, event := range events { + select { + case c.submitQueue <- event: + return + default: + c.droppedEvents.Add(1) + } } } diff --git a/eventkitd-bigquery/bigquery/batch.go b/eventkitd-bigquery/bigquery/batch.go new file mode 100644 index 0000000..0b72f8e --- /dev/null +++ b/eventkitd-bigquery/bigquery/batch.go @@ -0,0 +1,107 @@ +package bigquery + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "golang.org/x/sync/errgroup" + + "storj.io/eventkit" + "storj.io/eventkit/utils" +) + +// BatchQueue collects events and send them in batches. +type BatchQueue struct { + batchThreshold int + flushInterval time.Duration + submitQueue chan *eventkit.Event + target eventkit.Destination + mu sync.Mutex + events []*eventkit.Event + droppedEvents atomic.Int64 +} + +// NewBatchQueue creates a new batchQueue. It sends out the received events in batch. Either after the flushInterval is +// expired or when there are more than batchSize element in the queue. +func NewBatchQueue(target eventkit.Destination, queueSize int, batchSize int, flushInterval time.Duration) *BatchQueue { + c := &BatchQueue{ + submitQueue: make(chan *eventkit.Event, queueSize), + batchThreshold: batchSize, + events: make([]*eventkit.Event, 0), + flushInterval: flushInterval, + target: target, + } + return c +} + +// Run implements Destination. +func (c *BatchQueue) Run(ctx context.Context) { + ticker := utils.NewJitteredTicker(c.flushInterval) + var background errgroup.Group + defer func() { _ = background.Wait() }() + background.Go(func() error { + c.target.Run(ctx) + return nil + }) + background.Go(func() error { + ticker.Run(ctx) + return nil + }) + + sendAndReset := func() { + c.mu.Lock() + eventsToSend := c.events + c.events = make([]*eventkit.Event, 0) + c.mu.Unlock() + + c.target.Submit(eventsToSend...) + } + + for { + if drops := c.droppedEvents.Load(); drops > 0 { + mon.Counter("dropped_events").Inc(drops) + c.droppedEvents.Add(-drops) + } + + select { + case em := <-c.submitQueue: + if c.addEvent(em) { + sendAndReset() + } + case <-ticker.C: + if len(c.events) > 0 { + sendAndReset() + } + case <-ctx.Done(): + left := len(c.submitQueue) + for i := 0; i < left; i++ { + if c.addEvent(<-c.submitQueue) { + sendAndReset() + } + } + if len(c.events) > 0 { + c.target.Submit(c.events...) + } + return + } + } +} + +func (c *BatchQueue) addEvent(ev *eventkit.Event) (full bool) { + c.mu.Lock() + defer c.mu.Unlock() + c.events = append(c.events, ev) + return len(c.events) >= c.batchThreshold +} + +// Submit implements Destination. +func (c *BatchQueue) Submit(event *eventkit.Event) { + select { + case c.submitQueue <- event: + return + default: + c.droppedEvents.Add(1) + } +} diff --git a/eventkitd-bigquery/bigquery/batch_test.go b/eventkitd-bigquery/bigquery/batch_test.go new file mode 100644 index 0000000..0468889 --- /dev/null +++ b/eventkitd-bigquery/bigquery/batch_test.go @@ -0,0 +1,44 @@ +package bigquery + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "storj.io/eventkit" +) + +func TestBatchQueue(t *testing.T) { + m := &mockDestination{} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + queue := NewBatchQueue(m, 1000, 10, 1*time.Hour) + go func() { + queue.Run(ctx) + }() + for i := 0; i < 25; i++ { + queue.Submit(&eventkit.Event{ + Name: "foobar", + }) + } + require.Eventually(t, func() bool { + return len(m.events) == 2 + }, 5*time.Second, 10*time.Millisecond) + require.Len(t, m.events[0], 10) + require.Len(t, m.events[1], 10) +} + +type mockDestination struct { + events [][]*eventkit.Event +} + +func (m *mockDestination) Submit(event ...*eventkit.Event) { + m.events = append(m.events, event) +} + +func (m *mockDestination) Run(ctx context.Context) { +} + +var _ eventkit.Destination = &mockDestination{} diff --git a/eventkitd-bigquery/bigquery/destination.go b/eventkitd-bigquery/bigquery/destination.go index 1254b2f..a724bd0 100644 --- a/eventkitd-bigquery/bigquery/destination.go +++ b/eventkitd-bigquery/bigquery/destination.go @@ -36,31 +36,35 @@ func NewBigQueryDestination(ctx context.Context, appName string, project string, return res, nil } -func (b *BigQueryDestination) Submit(event *eventkit.Event) { - var tags []*pb.Tag - for _, t := range event.Tags { - tags = append(tags, &pb.Tag{ - Key: t.Key, - Value: t.Value, - }) - } - records := map[string][]*Record{ - event.Name: { - { - Application: Application{ - Name: b.appName, - Version: "0.0.1", - }, - Source: Source{ - Instance: b.SourceInstance, - Address: "0.0.0.0", - }, - ReceivedAt: time.Now(), - Timestamp: event.Timestamp, - Tags: tags, +// Submit implements Destination. +func (b *BigQueryDestination) Submit(events ...*eventkit.Event) { + records := map[string][]*Record{} + for _, event := range events { + var tags []*pb.Tag + for _, t := range event.Tags { + tags = append(tags, &pb.Tag{ + Key: t.Key, + Value: t.Value, + }) + } + if _, found := records[event.Name]; !found { + records[event.Name] = make([]*Record, 0) + } + records[event.Name] = append(records[event.Name], &Record{ + Application: Application{ + Name: b.appName, + Version: "0.0.1", + }, + Source: Source{ + Instance: b.SourceInstance, + Address: "0.0.0.0", }, - }, + ReceivedAt: time.Now(), + Timestamp: event.Timestamp, + Tags: tags, + }) } + err := b.client.SaveRecord(context.Background(), records) if err != nil { fmt.Println("WARN: Couldn't save eventkit record to BQ: ", err) diff --git a/registry.go b/registry.go index 925818e..8876b09 100644 --- a/registry.go +++ b/registry.go @@ -13,10 +13,14 @@ type Event struct { } type Destination interface { - Submit(*Event) + Submit(...*Event) Run(ctx context.Context) } +type BatchDestination interface { + SubmitBatch(*[]Event) +} + type Registry struct { dests []Destination }