Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions grpc/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package grpc

import (
"time"

"github.com/netobserv/loki-client-go/pkg/logproto"
"github.com/prometheus/common/model"
)

// entry represents a log entry with tenant and label information
type entry struct {
tenantID string
labels model.LabelSet
logproto.Entry
}

// batch holds pending log streams waiting to be sent to Loki via GRPC.
// Similar to HTTP batch but optimized for GRPC operations.
type batch struct {
streams map[string]*logproto.Stream
bytes int
createdAt time.Time
tenantID string // GRPC batches are per-tenant for connection management
}

// newBatch creates a new batch for a specific tenant
func newBatch(tenantID string, entries ...entry) *batch {
b := &batch{
streams: map[string]*logproto.Stream{},
bytes: 0,
createdAt: time.Now(),
tenantID: tenantID,
}

// Add entries to the batch
for _, entry := range entries {
b.add(entry)
}

return b
}

// add an entry to the batch
func (b *batch) add(entry entry) {
b.bytes += len(entry.Line)

// Append the entry to an already existing stream (if any)
labels := entry.labels.String()
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry.Entry)
return
}

// Add the entry as a new stream
b.streams[labels] = &logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{entry.Entry},
}
}

// sizeBytes returns the current batch size in bytes
func (b *batch) sizeBytes() int {
return b.bytes
}

// sizeBytesAfter returns the size of the batch after the input entry
// will be added to the batch itself
func (b *batch) sizeBytesAfter(entry entry) int {
return b.bytes + len(entry.Line)
}

// age of the batch since its creation
func (b *batch) age() time.Duration {
return time.Since(b.createdAt)
}

// createPushRequest creates a push request from the batch
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
req := &logproto.PushRequest{
Streams: make([]logproto.Stream, 0, len(b.streams)),
}

entriesCount := 0
for _, stream := range b.streams {
req.Streams = append(req.Streams, *stream)
entriesCount += len(stream.Entries)
}

return req, entriesCount
}

// isEmpty returns true if the batch has no entries
func (b *batch) isEmpty() bool {
return len(b.streams) == 0
}

// streamCount returns the number of streams in the batch
func (b *batch) streamCount() int {
return len(b.streams)
}

// entryCount returns the total number of entries across all streams
func (b *batch) entryCount() int {
count := 0
for _, stream := range b.streams {
count += len(stream.Entries)
}
return count
}
198 changes: 198 additions & 0 deletions grpc/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package grpc

import (
"testing"
"time"

"github.com/netobserv/loki-client-go/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewBatch(t *testing.T) {
tenantID := "test-tenant"

// Test empty batch
b := newBatch(tenantID)
assert.Equal(t, tenantID, b.tenantID)
assert.Equal(t, 0, b.bytes)
assert.True(t, b.isEmpty())
assert.Equal(t, 0, b.streamCount())
assert.Equal(t, 0, b.entryCount())

// Test batch with initial entries
entry1 := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test"},
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: "test log line 1",
},
}

entry2 := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test"},
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: "test log line 2",
},
}

b2 := newBatch(tenantID, entry1, entry2)
assert.Equal(t, tenantID, b2.tenantID)
assert.Equal(t, len("test log line 1")+len("test log line 2"), b2.bytes)
assert.False(t, b2.isEmpty())
assert.Equal(t, 1, b2.streamCount()) // Same labels, so same stream
assert.Equal(t, 2, b2.entryCount())
}

func TestBatchAdd(t *testing.T) {
tenantID := "test-tenant"
b := newBatch(tenantID)

entry := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test", "instance": "localhost"},
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: "test log line",
},
}

// Add first entry
b.add(entry)
assert.Equal(t, len("test log line"), b.bytes)
assert.Equal(t, 1, b.streamCount())
assert.Equal(t, 1, b.entryCount())

// Add entry with same labels (should go to same stream)
entry2 := entry
entry2.Line = "another line"
b.add(entry2)
assert.Equal(t, len("test log line")+len("another line"), b.bytes)
assert.Equal(t, 1, b.streamCount())
assert.Equal(t, 2, b.entryCount())

// Add entry with different labels (should create new stream)
entry3 := entry
entry3.labels = model.LabelSet{"job": "different"}
entry3.Line = "different stream"
b.add(entry3)
assert.Equal(t, len("test log line")+len("another line")+len("different stream"), b.bytes)
assert.Equal(t, 2, b.streamCount())
assert.Equal(t, 3, b.entryCount())
}

func TestBatchSizeBytes(t *testing.T) {
tenantID := "test-tenant"
b := newBatch(tenantID)

entry := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test"},
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: "test",
},
}

assert.Equal(t, 0, b.sizeBytes())

expectedSize := len("test")
assert.Equal(t, expectedSize, b.sizeBytesAfter(entry))

b.add(entry)
assert.Equal(t, expectedSize, b.sizeBytes())
}

func TestBatchAge(t *testing.T) {
tenantID := "test-tenant"
b := newBatch(tenantID)

// Should be very recent
age := b.age()
assert.True(t, age < 100*time.Millisecond)

// Wait a bit and check again
time.Sleep(10 * time.Millisecond)
age2 := b.age()
assert.True(t, age2 > age)
}

func TestCreatePushRequest(t *testing.T) {
tenantID := "test-tenant"
timestamp := time.Now()

entry1 := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test1"},
Entry: logproto.Entry{
Timestamp: timestamp,
Line: "line 1",
},
}

entry2 := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test1"},
Entry: logproto.Entry{
Timestamp: timestamp.Add(time.Second),
Line: "line 2",
},
}

entry3 := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test2"},
Entry: logproto.Entry{
Timestamp: timestamp.Add(2 * time.Second),
Line: "line 3",
},
}

b := newBatch(tenantID, entry1, entry2, entry3)

req, entriesCount := b.createPushRequest()
require.NotNil(t, req)
assert.Equal(t, 3, entriesCount)
assert.Equal(t, 2, len(req.Streams)) // Two different label sets

// Check streams
streamsByLabel := make(map[string]logproto.Stream)
for _, stream := range req.Streams {
streamsByLabel[stream.Labels] = stream
}

// Check first stream (job=test1)
stream1, exists := streamsByLabel[`{job="test1"}`]
require.True(t, exists)
assert.Equal(t, 2, len(stream1.Entries))
assert.Equal(t, "line 1", stream1.Entries[0].Line)
assert.Equal(t, "line 2", stream1.Entries[1].Line)

// Check second stream (job=test2)
stream2, exists := streamsByLabel[`{job="test2"}`]
require.True(t, exists)
assert.Equal(t, 1, len(stream2.Entries))
assert.Equal(t, "line 3", stream2.Entries[0].Line)
}

func TestBatchIsEmpty(t *testing.T) {
tenantID := "test-tenant"
b := newBatch(tenantID)
assert.True(t, b.isEmpty())

entry := entry{
tenantID: tenantID,
labels: model.LabelSet{"job": "test"},
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: "test",
},
}

b.add(entry)
assert.False(t, b.isEmpty())
}
Loading