Skip to content

Commit 28e349b

Browse files
grpc client
1 parent 526b43e commit 28e349b

File tree

8 files changed

+1416
-80
lines changed

8 files changed

+1416
-80
lines changed

grpc/batch.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package grpc
2+
3+
import (
4+
"time"
5+
6+
"github.com/netobserv/loki-client-go/pkg/logproto"
7+
"github.com/prometheus/common/model"
8+
)
9+
10+
// entry represents a log entry with tenant and label information
11+
type entry struct {
12+
tenantID string
13+
labels model.LabelSet
14+
logproto.Entry
15+
}
16+
17+
// batch holds pending log streams waiting to be sent to Loki via GRPC.
18+
// Similar to HTTP batch but optimized for GRPC operations.
19+
type batch struct {
20+
streams map[string]*logproto.Stream
21+
bytes int
22+
createdAt time.Time
23+
tenantID string // GRPC batches are per-tenant for connection management
24+
}
25+
26+
// newBatch creates a new batch for a specific tenant
27+
func newBatch(tenantID string, entries ...entry) *batch {
28+
b := &batch{
29+
streams: map[string]*logproto.Stream{},
30+
bytes: 0,
31+
createdAt: time.Now(),
32+
tenantID: tenantID,
33+
}
34+
35+
// Add entries to the batch
36+
for _, entry := range entries {
37+
b.add(entry)
38+
}
39+
40+
return b
41+
}
42+
43+
// add an entry to the batch
44+
func (b *batch) add(entry entry) {
45+
b.bytes += len(entry.Line)
46+
47+
// Append the entry to an already existing stream (if any)
48+
labels := entry.labels.String()
49+
if stream, ok := b.streams[labels]; ok {
50+
stream.Entries = append(stream.Entries, entry.Entry)
51+
return
52+
}
53+
54+
// Add the entry as a new stream
55+
b.streams[labels] = &logproto.Stream{
56+
Labels: labels,
57+
Entries: []logproto.Entry{entry.Entry},
58+
}
59+
}
60+
61+
// sizeBytes returns the current batch size in bytes
62+
func (b *batch) sizeBytes() int {
63+
return b.bytes
64+
}
65+
66+
// sizeBytesAfter returns the size of the batch after the input entry
67+
// will be added to the batch itself
68+
func (b *batch) sizeBytesAfter(entry entry) int {
69+
return b.bytes + len(entry.Line)
70+
}
71+
72+
// age of the batch since its creation
73+
func (b *batch) age() time.Duration {
74+
return time.Since(b.createdAt)
75+
}
76+
77+
// createPushRequest creates a push request from the batch
78+
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
79+
req := &logproto.PushRequest{
80+
Streams: make([]logproto.Stream, 0, len(b.streams)),
81+
}
82+
83+
entriesCount := 0
84+
for _, stream := range b.streams {
85+
req.Streams = append(req.Streams, *stream)
86+
entriesCount += len(stream.Entries)
87+
}
88+
89+
return req, entriesCount
90+
}
91+
92+
// isEmpty returns true if the batch has no entries
93+
func (b *batch) isEmpty() bool {
94+
return len(b.streams) == 0
95+
}
96+
97+
// streamCount returns the number of streams in the batch
98+
func (b *batch) streamCount() int {
99+
return len(b.streams)
100+
}
101+
102+
// entryCount returns the total number of entries across all streams
103+
func (b *batch) entryCount() int {
104+
count := 0
105+
for _, stream := range b.streams {
106+
count += len(stream.Entries)
107+
}
108+
return count
109+
}

grpc/batch_test.go

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package grpc
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/netobserv/loki-client-go/pkg/logproto"
8+
"github.com/prometheus/common/model"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestNewBatch(t *testing.T) {
14+
tenantID := "test-tenant"
15+
16+
// Test empty batch
17+
b := newBatch(tenantID)
18+
assert.Equal(t, tenantID, b.tenantID)
19+
assert.Equal(t, 0, b.bytes)
20+
assert.True(t, b.isEmpty())
21+
assert.Equal(t, 0, b.streamCount())
22+
assert.Equal(t, 0, b.entryCount())
23+
24+
// Test batch with initial entries
25+
entry1 := entry{
26+
tenantID: tenantID,
27+
labels: model.LabelSet{"job": "test"},
28+
Entry: logproto.Entry{
29+
Timestamp: time.Now(),
30+
Line: "test log line 1",
31+
},
32+
}
33+
34+
entry2 := entry{
35+
tenantID: tenantID,
36+
labels: model.LabelSet{"job": "test"},
37+
Entry: logproto.Entry{
38+
Timestamp: time.Now(),
39+
Line: "test log line 2",
40+
},
41+
}
42+
43+
b2 := newBatch(tenantID, entry1, entry2)
44+
assert.Equal(t, tenantID, b2.tenantID)
45+
assert.Equal(t, len("test log line 1")+len("test log line 2"), b2.bytes)
46+
assert.False(t, b2.isEmpty())
47+
assert.Equal(t, 1, b2.streamCount()) // Same labels, so same stream
48+
assert.Equal(t, 2, b2.entryCount())
49+
}
50+
51+
func TestBatchAdd(t *testing.T) {
52+
tenantID := "test-tenant"
53+
b := newBatch(tenantID)
54+
55+
entry := entry{
56+
tenantID: tenantID,
57+
labels: model.LabelSet{"job": "test", "instance": "localhost"},
58+
Entry: logproto.Entry{
59+
Timestamp: time.Now(),
60+
Line: "test log line",
61+
},
62+
}
63+
64+
// Add first entry
65+
b.add(entry)
66+
assert.Equal(t, len("test log line"), b.bytes)
67+
assert.Equal(t, 1, b.streamCount())
68+
assert.Equal(t, 1, b.entryCount())
69+
70+
// Add entry with same labels (should go to same stream)
71+
entry2 := entry
72+
entry2.Line = "another line"
73+
b.add(entry2)
74+
assert.Equal(t, len("test log line")+len("another line"), b.bytes)
75+
assert.Equal(t, 1, b.streamCount())
76+
assert.Equal(t, 2, b.entryCount())
77+
78+
// Add entry with different labels (should create new stream)
79+
entry3 := entry
80+
entry3.labels = model.LabelSet{"job": "different"}
81+
entry3.Line = "different stream"
82+
b.add(entry3)
83+
assert.Equal(t, len("test log line")+len("another line")+len("different stream"), b.bytes)
84+
assert.Equal(t, 2, b.streamCount())
85+
assert.Equal(t, 3, b.entryCount())
86+
}
87+
88+
func TestBatchSizeBytes(t *testing.T) {
89+
tenantID := "test-tenant"
90+
b := newBatch(tenantID)
91+
92+
entry := entry{
93+
tenantID: tenantID,
94+
labels: model.LabelSet{"job": "test"},
95+
Entry: logproto.Entry{
96+
Timestamp: time.Now(),
97+
Line: "test",
98+
},
99+
}
100+
101+
assert.Equal(t, 0, b.sizeBytes())
102+
103+
expectedSize := len("test")
104+
assert.Equal(t, expectedSize, b.sizeBytesAfter(entry))
105+
106+
b.add(entry)
107+
assert.Equal(t, expectedSize, b.sizeBytes())
108+
}
109+
110+
func TestBatchAge(t *testing.T) {
111+
tenantID := "test-tenant"
112+
b := newBatch(tenantID)
113+
114+
// Should be very recent
115+
age := b.age()
116+
assert.True(t, age < 100*time.Millisecond)
117+
118+
// Wait a bit and check again
119+
time.Sleep(10 * time.Millisecond)
120+
age2 := b.age()
121+
assert.True(t, age2 > age)
122+
}
123+
124+
func TestCreatePushRequest(t *testing.T) {
125+
tenantID := "test-tenant"
126+
timestamp := time.Now()
127+
128+
entry1 := entry{
129+
tenantID: tenantID,
130+
labels: model.LabelSet{"job": "test1"},
131+
Entry: logproto.Entry{
132+
Timestamp: timestamp,
133+
Line: "line 1",
134+
},
135+
}
136+
137+
entry2 := entry{
138+
tenantID: tenantID,
139+
labels: model.LabelSet{"job": "test1"},
140+
Entry: logproto.Entry{
141+
Timestamp: timestamp.Add(time.Second),
142+
Line: "line 2",
143+
},
144+
}
145+
146+
entry3 := entry{
147+
tenantID: tenantID,
148+
labels: model.LabelSet{"job": "test2"},
149+
Entry: logproto.Entry{
150+
Timestamp: timestamp.Add(2 * time.Second),
151+
Line: "line 3",
152+
},
153+
}
154+
155+
b := newBatch(tenantID, entry1, entry2, entry3)
156+
157+
req, entriesCount := b.createPushRequest()
158+
require.NotNil(t, req)
159+
assert.Equal(t, 3, entriesCount)
160+
assert.Equal(t, 2, len(req.Streams)) // Two different label sets
161+
162+
// Check streams
163+
streamsByLabel := make(map[string]logproto.Stream)
164+
for _, stream := range req.Streams {
165+
streamsByLabel[stream.Labels] = stream
166+
}
167+
168+
// Check first stream (job=test1)
169+
stream1, exists := streamsByLabel[`{job="test1"}`]
170+
require.True(t, exists)
171+
assert.Equal(t, 2, len(stream1.Entries))
172+
assert.Equal(t, "line 1", stream1.Entries[0].Line)
173+
assert.Equal(t, "line 2", stream1.Entries[1].Line)
174+
175+
// Check second stream (job=test2)
176+
stream2, exists := streamsByLabel[`{job="test2"}`]
177+
require.True(t, exists)
178+
assert.Equal(t, 1, len(stream2.Entries))
179+
assert.Equal(t, "line 3", stream2.Entries[0].Line)
180+
}
181+
182+
func TestBatchIsEmpty(t *testing.T) {
183+
tenantID := "test-tenant"
184+
b := newBatch(tenantID)
185+
assert.True(t, b.isEmpty())
186+
187+
entry := entry{
188+
tenantID: tenantID,
189+
labels: model.LabelSet{"job": "test"},
190+
Entry: logproto.Entry{
191+
Timestamp: time.Now(),
192+
Line: "test",
193+
},
194+
}
195+
196+
b.add(entry)
197+
assert.False(t, b.isEmpty())
198+
}

0 commit comments

Comments
 (0)