Skip to content

Commit 6f7d651

Browse files
committed
#55 add max_lines_in_buffer option for input config
1 parent 879e3bc commit 6f7d651

File tree

7 files changed

+87
-50
lines changed

7 files changed

+87
-50
lines changed

config/v2/configV2.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type InputConfig struct {
6363
Readall bool `yaml:",omitempty"`
6464
PollIntervalSeconds string `yaml:"poll_interval_seconds,omitempty"` // TODO: Use time.Duration directly
6565
PollInterval time.Duration `yaml:"-"` // parsed version of PollIntervalSeconds
66+
MaxLinesInBuffer int `yaml:"max_lines_in_buffer,omitempty"`
6667
}
6768

6869
type GrokConfig struct {

exporter/bufferLoadMetric.go

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ type bufferLoadMetric struct {
2828
mutex *sync.Cond
2929
tick *time.Ticker
3030
log logrus.FieldLogger
31+
lineLimitSet bool
3132
}
3233

33-
func NewBufferLoadMetric(log logrus.FieldLogger) *bufferLoadMetric {
34+
func NewBufferLoadMetric(log logrus.FieldLogger, lineLimitSet bool) *bufferLoadMetric {
3435
m := &bufferLoadMetric{
35-
mutex: sync.NewCond(&sync.Mutex{}),
36-
log: log,
36+
mutex: sync.NewCond(&sync.Mutex{}),
37+
log: log,
38+
lineLimitSet: lineLimitSet,
3739
}
3840
return m
3941
}
@@ -56,31 +58,33 @@ func (m *bufferLoadMetric) start(ticker *time.Ticker, tickProcessed chan struct{
5658
go func() {
5759
var ticksSinceLastLog = 0
5860
for range m.tick.C {
59-
m.mutex.L.Lock()
60-
61-
ticksSinceLastLog++
62-
if ticksSinceLastLog >= 4 { // every minute
63-
if m.min60s > 1000 {
64-
m.log.Warnf("Log lines are written faster than grok_exporter processes them. In the last minute there were constantly more than %d log lines in the buffer waiting to be processed. Check the built-in grok_exporter_lines_processing_time_microseconds_total metric to learn which metric takes most of the processing time.", m.min60s)
61+
func() {
62+
m.mutex.L.Lock()
63+
defer m.mutex.L.Unlock()
64+
65+
ticksSinceLastLog++
66+
if ticksSinceLastLog >= 4 { // every minute
67+
if m.min60s > 1000 && !m.lineLimitSet {
68+
m.log.Warnf("Log lines are written faster than grok_exporter processes them. In the last minute there were constantly more than %d log lines in the buffer waiting to be processed. Check the built-in grok_exporter_lines_processing_time_microseconds_total metric to learn which metric takes most of the processing time.", m.min60s)
69+
}
70+
ticksSinceLastLog = 0
6571
}
66-
ticksSinceLastLog = 0
67-
}
6872

69-
m.bufferLoad.With(minLabel).Set(float64(m.min60s))
73+
m.bufferLoad.With(minLabel).Set(float64(m.min60s))
7074

71-
m.min60s = m.min45s
72-
m.min45s = m.min30s
73-
m.min30s = m.min15s
74-
m.min15s = m.cur
75+
m.min60s = m.min45s
76+
m.min45s = m.min30s
77+
m.min30s = m.min15s
78+
m.min15s = m.cur
7579

76-
m.bufferLoad.With(maxLabel).Set(float64(m.max60s))
80+
m.bufferLoad.With(maxLabel).Set(float64(m.max60s))
7781

78-
m.max60s = m.max45s
79-
m.max45s = m.max30s
80-
m.max30s = m.max15s
81-
m.max15s = m.cur
82+
m.max60s = m.max45s
83+
m.max45s = m.max30s
84+
m.max30s = m.max15s
85+
m.max15s = m.cur
8286

83-
m.mutex.L.Unlock()
87+
}()
8488

8589
if tickProcessed != nil {
8690
tickProcessed <- struct{}{}
@@ -96,25 +100,19 @@ func (m *bufferLoadMetric) Stop() {
96100

97101
func (m *bufferLoadMetric) Inc() {
98102
m.mutex.L.Lock()
103+
defer m.mutex.L.Unlock()
99104
m.cur++
100-
if m.max15s < m.cur {
101-
m.max15s = m.cur
102-
}
103-
if m.max30s < m.cur {
104-
m.max30s = m.cur
105-
}
106-
if m.max45s < m.cur {
107-
m.max45s = m.cur
108-
}
109-
if m.max60s < m.cur {
110-
m.max60s = m.cur
111-
}
112-
m.mutex.L.Unlock()
105+
m.updateMax()
113106
}
114107

115108
func (m *bufferLoadMetric) Dec() {
116109
m.mutex.L.Lock()
110+
defer m.mutex.L.Unlock()
117111
m.cur--
112+
m.updateMin()
113+
}
114+
115+
func (m *bufferLoadMetric) updateMin() {
118116
if m.min15s > m.cur {
119117
m.min15s = m.cur
120118
}
@@ -127,5 +125,27 @@ func (m *bufferLoadMetric) Dec() {
127125
if m.min60s > m.cur {
128126
m.min60s = m.cur
129127
}
130-
m.mutex.L.Unlock()
128+
}
129+
130+
func (m *bufferLoadMetric) updateMax() {
131+
if m.max15s < m.cur {
132+
m.max15s = m.cur
133+
}
134+
if m.max30s < m.cur {
135+
m.max30s = m.cur
136+
}
137+
if m.max45s < m.cur {
138+
m.max45s = m.cur
139+
}
140+
if m.max60s < m.cur {
141+
m.max60s = m.cur
142+
}
143+
}
144+
145+
func (m *bufferLoadMetric) Set(value int64) {
146+
m.mutex.L.Lock()
147+
defer m.mutex.L.Unlock()
148+
m.cur = value
149+
m.updateMin()
150+
m.updateMax()
131151
}

exporter/bufferLoadMetric_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
func TestBufferLoadMetric(t *testing.T) {
10-
m := NewBufferLoadMetric(logrus.New())
10+
m := NewBufferLoadMetric(logrus.New(), false)
1111
c := make(chan time.Time)
1212
tick := &time.Ticker{
1313
C: c,

grok_exporter.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,5 +291,6 @@ func startTailer(cfg *v2.Config) (fswatcher.FileTailer, error) {
291291
default:
292292
return nil, fmt.Errorf("Config error: Input type '%v' unknown.", cfg.Input.Type)
293293
}
294-
return tailer.BufferedTailerWithMetrics(tail, exporter.NewBufferLoadMetric(logger)), nil
294+
bufferLoadMetric := exporter.NewBufferLoadMetric(logger, cfg.Input.MaxLinesInBuffer > 0)
295+
return tailer.BufferedTailerWithMetrics(tail, bufferLoadMetric, logger, cfg.Input.MaxLinesInBuffer), nil
295296
}

tailer/bufferedTailer.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package tailer
1616

1717
import (
1818
"github.com/fstab/grok_exporter/tailer/fswatcher"
19+
"github.com/sirupsen/logrus"
1920
)
2021

2122
// implements fswatcher.FileTailer
@@ -37,7 +38,7 @@ func (b *bufferedTailer) Close() {
3738
}
3839

3940
func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
40-
return BufferedTailerWithMetrics(orig, &noopMetric{})
41+
return BufferedTailerWithMetrics(orig, &noopMetric{}, logrus.New(), 0)
4142
}
4243

4344
// Wrapper around a tailer that consumes the lines channel quickly.
@@ -99,7 +100,7 @@ func BufferedTailer(orig fswatcher.FileTailer) fswatcher.FileTailer {
99100
//
100101
// To minimize the risk, use the buffered tailer to make sure file system events are handled
101102
// as quickly as possible without waiting for the grok patterns to be processed.
102-
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric) fswatcher.FileTailer {
103+
func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric BufferLoadMetric, log logrus.FieldLogger, maxLinesInBuffer int) fswatcher.FileTailer {
103104
buffer := NewLineBuffer()
104105
out := make(chan *fswatcher.Line)
105106

@@ -109,6 +110,11 @@ func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric Buffe
109110
for {
110111
line, ok := <-orig.Lines()
111112
if ok {
113+
if maxLinesInBuffer > 0 && buffer.Len() > maxLinesInBuffer-1 {
114+
log.Warnf("Line buffer reached limit of %v lines. Dropping lines in buffer.", maxLinesInBuffer)
115+
buffer.Clear()
116+
bufferLoadMetric.Set(0)
117+
}
112118
buffer.Push(line)
113119
bufferLoadMetric.Inc()
114120
} else {
@@ -140,14 +146,16 @@ func BufferedTailerWithMetrics(orig fswatcher.FileTailer, bufferLoadMetric Buffe
140146

141147
type BufferLoadMetric interface {
142148
Start()
143-
Inc() // put a log line into the buffer
144-
Dec() // take a log line from the buffer
149+
Inc() // put a log line into the buffer
150+
Dec() // take a log line from the buffer
151+
Set(value int64) // set the current number of lines in the buffer
145152
Stop()
146153
}
147154

148155
type noopMetric struct{}
149156

150-
func (m *noopMetric) Start() {}
151-
func (m *noopMetric) Inc() {}
152-
func (m *noopMetric) Dec() {}
153-
func (m *noopMetric) Stop() {}
157+
func (m *noopMetric) Start() {}
158+
func (m *noopMetric) Inc() {}
159+
func (m *noopMetric) Dec() {}
160+
func (m *noopMetric) Set(value int64) {}
161+
func (m *noopMetric) Stop() {}

tailer/bufferedTailer_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package tailer
1717
import (
1818
"fmt"
1919
"github.com/fstab/grok_exporter/tailer/fswatcher"
20+
"github.com/sirupsen/logrus"
2021
"math/rand"
2122
"sync"
2223
"testing"
@@ -25,6 +26,8 @@ import (
2526

2627
const nTestLines = 10000
2728

29+
var log = logrus.New()
30+
2831
type sourceTailer struct {
2932
lines chan *fswatcher.Line
3033
}
@@ -45,7 +48,7 @@ func (tail *sourceTailer) Close() {
4548
func TestLineBufferSequential_withMetrics(t *testing.T) {
4649
src := &sourceTailer{lines: make(chan *fswatcher.Line)}
4750
metric := &peakLoadMetric{}
48-
buffered := BufferedTailerWithMetrics(src, metric)
51+
buffered := BufferedTailerWithMetrics(src, metric, log, 0)
4952
for i := 1; i <= nTestLines; i++ {
5053
src.lines <- &fswatcher.Line{Line: fmt.Sprintf("This is line number %v.", i)}
5154
}
@@ -79,7 +82,7 @@ func TestLineBufferSequential_withMetrics(t *testing.T) {
7982
func TestLineBufferParallel_withMetrics(t *testing.T) {
8083
src := &sourceTailer{lines: make(chan *fswatcher.Line)}
8184
metric := &peakLoadMetric{}
82-
buffered := BufferedTailerWithMetrics(src, metric)
85+
buffered := BufferedTailerWithMetrics(src, metric, log, 0)
8386
var wg sync.WaitGroup
8487
go func() {
8588
start := time.Now()
@@ -150,6 +153,10 @@ func (m *peakLoadMetric) Dec() {
150153
m.currentLoad--
151154
}
152155

156+
func (m *peakLoadMetric) Set(value int64) {
157+
m.currentLoad = value
158+
}
159+
153160
func (m *peakLoadMetric) Stop() {
154161
m.stopCalled = true
155162
}

tailer/lineBuffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"container/list"
55
"github.com/fstab/grok_exporter/tailer/fswatcher"
66
"io"
7-
"log"
7+
logFatal "log"
88
"sync"
99
)
1010

@@ -56,7 +56,7 @@ func (b *lineBufferImpl) BlockingPop() *fswatcher.Line {
5656
return line
5757
default:
5858
// this cannot happen
59-
log.Fatal("unexpected type in tailer b.buffer")
59+
logFatal.Fatal("unexpected type in tailer b.buffer")
6060
}
6161
}
6262
}

0 commit comments

Comments
 (0)