@@ -55,58 +55,89 @@ func (noop) Log(*Record) error {
55
55
// Stop any background WAL processes.
56
56
func (noop ) Stop () {}
57
57
58
- type wrapper struct {
58
+ type walWrapper struct {
59
59
cfg WALConfig
60
60
ingester * Ingester
61
61
quit chan struct {}
62
62
wait sync.WaitGroup
63
63
64
64
lastWalSegment int
65
65
wal * wal.WAL
66
+
67
+ // Checkpoint metrics.
68
+ checkpointDeleteFail prometheus.Counter
69
+ checkpointDeleteTotal prometheus.Counter
70
+ checkpointCreationFail prometheus.Counter
71
+ checkpointCreationTotal prometheus.Counter
66
72
}
67
73
68
74
func newWAL (cfg WALConfig , ingester * Ingester ) (WAL , error ) {
69
75
if ! cfg .enabled {
70
76
return & noop {}, nil
71
77
}
72
78
73
- var samplesRegistry prometheus.Registerer
79
+ var walRegistry prometheus.Registerer
74
80
if cfg .metricsRegisterer != nil {
75
- samplesRegistry = prometheus .WrapRegistererWith (prometheus.Labels {"kind" : "samples " }, cfg .metricsRegisterer )
81
+ walRegistry = prometheus .WrapRegistererWith (prometheus.Labels {"kind" : "wal " }, cfg .metricsRegisterer )
76
82
}
77
- samples , err := wal .New (util .Logger , samplesRegistry , path .Join (cfg .dir , "samples " ), true )
83
+ tsdbWAL , err := wal .New (util .Logger , walRegistry , path .Join (cfg .dir , "wal " ), true )
78
84
if err != nil {
79
85
return nil , err
80
86
}
81
87
82
- w := & wrapper {
88
+ w := & walWrapper {
83
89
cfg : cfg ,
84
90
ingester : ingester ,
85
91
quit : make (chan struct {}),
86
- wal : samples ,
92
+ wal : tsdbWAL ,
93
+ }
94
+
95
+ w .checkpointDeleteFail = prometheus .NewCounter (prometheus.CounterOpts {
96
+ Name : "ingester_checkpoint_deletions_failed_total" ,
97
+ Help : "Total number of checkpoint deletions that failed." ,
98
+ })
99
+ w .checkpointDeleteTotal = prometheus .NewCounter (prometheus.CounterOpts {
100
+ Name : "ingester_checkpoint_deletions_total" ,
101
+ Help : "Total number of checkpoint deletions attempted." ,
102
+ })
103
+ w .checkpointCreationFail = prometheus .NewCounter (prometheus.CounterOpts {
104
+ Name : "ingester_checkpoint_creations_failed_total" ,
105
+ Help : "Total number of checkpoint creations that failed." ,
106
+ })
107
+ w .checkpointCreationTotal = prometheus .NewCounter (prometheus.CounterOpts {
108
+ Name : "ingester_checkpoint_creations_total" ,
109
+ Help : "Total number of checkpoint creations attempted." ,
110
+ })
111
+ if cfg .metricsRegisterer != nil {
112
+ cfg .metricsRegisterer .MustRegister (
113
+ w .checkpointDeleteFail ,
114
+ w .checkpointDeleteTotal ,
115
+ w .checkpointCreationFail ,
116
+ w .checkpointCreationTotal ,
117
+ )
87
118
}
88
119
89
120
w .wait .Add (1 )
90
121
go w .run ()
91
122
return w , nil
92
123
}
93
124
94
- func (w * wrapper ) Stop () {
125
+ func (w * walWrapper ) Stop () {
95
126
close (w .quit )
96
127
w .wait .Wait ()
97
128
98
129
w .wal .Close ()
99
130
}
100
131
101
- func (w * wrapper ) Log (record * Record ) error {
132
+ func (w * walWrapper ) Log (record * Record ) error {
102
133
buf , err := proto .Marshal (record )
103
134
if err != nil {
104
135
return err
105
136
}
106
137
return w .wal .Log (buf )
107
138
}
108
139
109
- func (w * wrapper ) run () {
140
+ func (w * walWrapper ) run () {
110
141
defer w .wait .Done ()
111
142
112
143
for ! w .isStopped () {
@@ -123,7 +154,7 @@ func (w *wrapper) run() {
123
154
}
124
155
}
125
156
126
- func (w * wrapper ) isStopped () bool {
157
+ func (w * walWrapper ) isStopped () bool {
127
158
select {
128
159
case <- w .quit :
129
160
return true
@@ -134,7 +165,13 @@ func (w *wrapper) isStopped() bool {
134
165
135
166
const checkpointPrefix = "checkpoint."
136
167
137
- func (w * wrapper ) checkpoint () error {
168
+ func (w * walWrapper ) checkpoint () (err error ) {
169
+ w .checkpointCreationTotal .Inc ()
170
+ defer func () {
171
+ if err != nil {
172
+ w .checkpointCreationFail .Inc ()
173
+ }
174
+ }()
138
175
_ , last , err := w .lastCheckpoint ()
139
176
if err != nil {
140
177
return err
@@ -201,7 +238,7 @@ func (w *wrapper) checkpoint() error {
201
238
202
239
// lastCheckpoint returns the directory name and index of the most recent checkpoint.
203
240
// If dir does not contain any checkpoints, -1 is returned as index.
204
- func (w * wrapper ) lastCheckpoint () (string , int , error ) {
241
+ func (w * walWrapper ) lastCheckpoint () (string , int , error ) {
205
242
files , err := ioutil .ReadDir (w .wal .Dir ())
206
243
if err != nil {
207
244
return "" , - 1 , err
@@ -226,7 +263,14 @@ func (w *wrapper) lastCheckpoint() (string, int, error) {
226
263
}
227
264
228
265
// deleteCheckpoints deletes all checkpoints in a directory below a given index.
229
- func (w * wrapper ) deleteCheckpoints (maxIndex int ) error {
266
+ func (w * walWrapper ) deleteCheckpoints (maxIndex int ) (err error ) {
267
+ w .checkpointDeleteTotal .Inc ()
268
+ defer func () {
269
+ if err != nil {
270
+ w .checkpointDeleteFail .Inc ()
271
+ }
272
+ }()
273
+
230
274
var errs tsdb_errors.MultiError
231
275
232
276
files , err := ioutil .ReadDir (w .wal .Dir ())
@@ -248,7 +292,7 @@ func (w *wrapper) deleteCheckpoints(maxIndex int) error {
248
292
return errs .Err ()
249
293
}
250
294
251
- func (w * wrapper ) checkpointSeries (cp * wal.WAL , userID string , fp model.Fingerprint , series * memorySeries ) error {
295
+ func (w * walWrapper ) checkpointSeries (cp * wal.WAL , userID string , fp model.Fingerprint , series * memorySeries ) error {
252
296
wireChunks , err := toWireChunks (series .chunkDescs )
253
297
if err != nil {
254
298
return err
@@ -268,7 +312,7 @@ func (w *wrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerpr
268
312
}
269
313
270
314
// truncateSamples removed the wal from before the checkpoint.
271
- func (w * wrapper ) truncateSamples () error {
315
+ func (w * walWrapper ) truncateSamples () error {
272
316
_ , last , err := w .wal .Segments ()
273
317
if err != nil {
274
318
return err
0 commit comments