Skip to content

Commit 8a3ae22

Browse files
fix: fix a bug where AppendRequest with no entries triggers flush (#13672)
1 parent cfa3479 commit 8a3ae22

File tree

3 files changed

+37
-2
lines changed

3 files changed

+37
-2
lines changed

pkg/storage/wal/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (m *Manager) Append(r AppendRequest) (*AppendResult, error) {
156156
s.w.Append(r.TenantID, r.LabelsStr, r.Labels, r.Entries, m.clock.Now())
157157
// If the segment exceeded the maximum age or the maximum size, move s to
158158
// the closed list to be flushed.
159-
if m.clock.Since(s.w.firstAppend) >= m.cfg.MaxAge || s.w.InputSize() >= m.cfg.MaxSegmentSize {
159+
if s.w.Age(m.clock.Now()) >= m.cfg.MaxAge || s.w.InputSize() >= m.cfg.MaxSegmentSize {
160160
m.move(el, s)
161161
}
162162
return s.r, nil
@@ -224,7 +224,7 @@ func (m *Manager) move(el *list.Element, s *segment) {
224224
func (m *Manager) moveFrontIfExpired() bool {
225225
if el := m.available.Front(); el != nil {
226226
s := el.Value.(*segment)
227-
if !s.w.firstAppend.IsZero() && m.clock.Since(s.w.firstAppend) >= m.cfg.MaxAge {
227+
if s.w.Age(m.clock.Now()) >= m.cfg.MaxAge {
228228
m.move(el, s)
229229
return true
230230
}

pkg/storage/wal/manager_test.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,38 @@ func TestManager_Append(t *testing.T) {
5454
require.NoError(t, res.Err())
5555
}
5656

57+
func TestManager_AppendNoEntries(t *testing.T) {
58+
m, err := NewManager(Config{
59+
MaxAge: 30 * time.Second,
60+
MaxSegments: 1,
61+
MaxSegmentSize: 1024, // 1KB
62+
}, NewManagerMetrics(nil))
63+
require.NoError(t, err)
64+
65+
// Append no entries.
66+
lbs := labels.Labels{{Name: "a", Value: "b"}}
67+
res, err := m.Append(AppendRequest{
68+
TenantID: "1",
69+
Labels: lbs,
70+
LabelsStr: lbs.String(),
71+
Entries: []*logproto.Entry{},
72+
})
73+
require.NoError(t, err)
74+
require.NotNil(t, res)
75+
76+
// The data hasn't been flushed, so reading from Done() should block.
77+
select {
78+
case <-res.Done():
79+
t.Fatal("unexpected closed Done()")
80+
default:
81+
}
82+
83+
// The segment that was just appended to has neither reached the maximum
84+
// age nor maximum size to be flushed.
85+
require.Equal(t, 1, m.available.Len())
86+
require.Equal(t, 0, m.pending.Len())
87+
}
88+
5789
func TestManager_AppendFailed(t *testing.T) {
5890
m, err := NewManager(Config{
5991
MaxAge: 30 * time.Second,

pkg/storage/wal/segment.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ func NewWalSegmentWriter() (*SegmentWriter, error) {
140140

141141
// Age returns the age of the segment.
142142
func (b *SegmentWriter) Age(now time.Time) time.Duration {
143+
if b.firstAppend.IsZero() {
144+
return 0
145+
}
143146
return now.Sub(b.firstAppend)
144147
}
145148

0 commit comments

Comments
 (0)