
Commit 05ef9d6

Merge pull request #182 from weaveworks/127-flush-chunks-older-than-12hr
Flush chunks older than 12hrs
2 parents 3592422 + 006174b

File tree: 2 files changed, +119 -84 lines

cmd/cortex/main.go

Lines changed: 1 addition & 0 deletions
@@ -108,6 +108,7 @@ func main() {
 	flag.DurationVar(&cfg.ingesterConfig.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
 	flag.DurationVar(&cfg.ingesterConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
 	flag.DurationVar(&cfg.ingesterConfig.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.")
+	flag.DurationVar(&cfg.ingesterConfig.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.")
 	flag.IntVar(&cfg.ingesterConfig.ConcurrentFlushes, "ingester.concurrent-flushes", 25, "Number of concurrent goroutines flushing to dynamodb.")
 	flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
 	flag.IntVar(&cfg.ingesterConfig.GRPCListenPort, "ingester.grpc.listen-port", 9095, "gRPC server listen port.")
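The new -ingester.max-chunk-age flag complements the existing -ingester.max-chunk-idle: idle series are still flushed after an hour without samples, while continuously written series now also get flushed once their oldest data is twelve hours old. Below is a minimal, runnable sketch of the same stdlib flag pattern; the ingesterConfig struct here is a stand-in, and only the flag names and defaults come from the diff.

package main

import (
	"flag"
	"fmt"
	"time"
)

// ingesterConfig is a stand-in for cfg.ingesterConfig in cmd/cortex/main.go.
type ingesterConfig struct {
	MaxChunkIdle time.Duration
	MaxChunkAge  time.Duration
}

func main() {
	var cfg ingesterConfig
	flag.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.")
	flag.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.")
	flag.Parse() // accepts e.g. -ingester.max-chunk-age=6h
	fmt.Printf("idle=%v age=%v\n", cfg.MaxChunkIdle, cfg.MaxChunkAge)
}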

ingester/ingester.go

Lines changed: 118 additions & 84 deletions
@@ -88,6 +88,7 @@ type Ingester struct {
 type Config struct {
 	FlushCheckPeriod  time.Duration
 	MaxChunkIdle      time.Duration
+	MaxChunkAge       time.Duration
 	RateUpdatePeriod  time.Duration
 	ConcurrentFlushes int
 	GRPCListenPort    int
@@ -199,29 +200,6 @@ func (i *Ingester) Ready() bool {
 	return i.cfg.Ring.Ready()
 }

-func (i *Ingester) getStateFor(ctx context.Context) (*userState, error) {
-	userID, err := user.GetID(ctx)
-	if err != nil {
-		return nil, fmt.Errorf("no user id")
-	}
-
-	i.userStateLock.Lock()
-	defer i.userStateLock.Unlock()
-	state, ok := i.userState[userID]
-	if !ok {
-		state = &userState{
-			userID:          userID,
-			fpToSeries:      newSeriesMap(),
-			fpLocker:        newFingerprintLocker(16),
-			index:           newInvertedIndex(),
-			ingestedSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
-		}
-		state.mapper = newFPMapper(state.fpToSeries)
-		i.userState[userID] = state
-	}
-	return state, nil
-}
-
 // Push implements cortex.IngesterServer
 func (i *Ingester) Push(ctx context.Context, req *remote.WriteRequest) (*cortex.WriteResponse, error) {
 	for _, sample := range util.FromWriteRequest(req) {
@@ -290,6 +268,29 @@ func (i *Ingester) append(ctx context.Context, sample *model.Sample) error {
 	return err
 }

+func (i *Ingester) getStateFor(ctx context.Context) (*userState, error) {
+	userID, err := user.GetID(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("no user id")
+	}
+
+	i.userStateLock.Lock()
+	defer i.userStateLock.Unlock()
+	state, ok := i.userState[userID]
+	if !ok {
+		state = &userState{
+			userID:          userID,
+			fpToSeries:      newSeriesMap(),
+			fpLocker:        newFingerprintLocker(16),
+			index:           newInvertedIndex(),
+			ingestedSamples: newEWMARate(0.2, i.cfg.RateUpdatePeriod),
+		}
+		state.mapper = newFPMapper(state.fpToSeries)
+		i.userState[userID] = state
+	}
+	return state, nil
+}
+
 func (u *userState) getOrCreateSeries(metric model.Metric) (model.Fingerprint, *memorySeries, error) {
 	rawFP := metric.FastFingerprint()
 	u.fpLocker.Lock(rawFP)
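getStateFor is only relocated by this diff, but the pattern it uses is worth noting: per-user state is created lazily under a single mutex, so concurrent pushes for a brand-new user all end up sharing one state. A stripped-down, self-contained sketch of that get-or-create pattern follows; the userState fields are elided and all types here are illustrative.

package main

import (
	"fmt"
	"sync"
)

type userState struct{ userID string }

type ingester struct {
	mtx    sync.Mutex
	states map[string]*userState
}

// getStateFor looks the user up under the mutex and creates
// the state on first use, mirroring the function in the diff.
func (i *ingester) getStateFor(userID string) *userState {
	i.mtx.Lock()
	defer i.mtx.Unlock()
	state, ok := i.states[userID]
	if !ok {
		state = &userState{userID: userID}
		i.states[userID] = state
	}
	return state
}

func main() {
	i := &ingester{states: map[string]*userState{}}
	fmt.Println(i.getStateFor("29") == i.getStateFor("29")) // true: one state per user
}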
@@ -448,7 +449,7 @@ func (i *Ingester) Stop() {
 
 func (i *Ingester) loop() {
 	defer func() {
-		i.flushAllUsers(true)
+		i.sweepUsers(true)

 		// We close flush queue here to ensure the flushLoops pick
 		// up all the flushes triggered by the last run
@@ -465,7 +466,7 @@ func (i *Ingester) loop() {
 	for {
 		select {
 		case <-flushTick:
-			i.flushAllUsers(false)
+			i.sweepUsers(false)
 		case <-rateUpdateTick:
 			i.updateRates()
 		case <-i.quit:
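For context, the surrounding loop is a plain ticker-driven select: one ticker triggers the sweep, another the rate updates, and a quit channel ends the loop. Here is a runnable sketch of the same shape; the periods are scaled down from the 1m/15s defaults in main.go so the demo prints something, and the Println calls stand in for the real methods.

package main

import (
	"fmt"
	"time"
)

func loop(quit chan struct{}) {
	flushTick := time.Tick(20 * time.Millisecond)     // 1*time.Minute in main.go
	rateUpdateTick := time.Tick(5 * time.Millisecond) // 15*time.Second in main.go
	for {
		select {
		case <-flushTick:
			fmt.Println("sweep") // stand-in for i.sweepUsers(false)
		case <-rateUpdateTick:
			fmt.Println("rates") // stand-in for i.updateRates()
		case <-quit:
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	go loop(quit)
	time.Sleep(100 * time.Millisecond)
	close(quit)
}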
@@ -474,7 +475,8 @@ func (i *Ingester) loop() {
 	}
 }

-func (i *Ingester) flushAllUsers(immediate bool) {
+// sweepUsers periodically schedules series for flushing and garbage collects users with no series
+func (i *Ingester) sweepUsers(immediate bool) {
 	if i.chunkStore == nil {
 		return
 	}
@@ -487,42 +489,68 @@ func (i *Ingester) flushAllUsers(immediate bool) {
 	i.userStateLock.Unlock()

 	for id, state := range userState {
-		i.flushUser(id, state, immediate)
-	}
-}
-
-func (i *Ingester) flushUser(userID string, userState *userState, immediate bool) {
-	for pair := range userState.fpToSeries.iter() {
-		i.flushSeries(userState, pair.fp, pair.series, immediate)
+		for pair := range state.fpToSeries.iter() {
+			state.fpLocker.Lock(pair.fp)
+			i.sweepSeries(id, pair.fp, pair.series, immediate)
+			state.fpLocker.Unlock(pair.fp)
+		}
 	}

-	// TODO: this is probably slow, and could be done in a better way.
 	i.userStateLock.Lock()
-	if userState.fpToSeries.length() == 0 {
-		delete(i.userState, userID)
+	for id, state := range userState {
+		if state.fpToSeries.length() == 0 {
+			delete(i.userState, id)
+		}
 	}
 	i.userStateLock.Unlock()
 }

-func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memorySeries, immediate bool) {
-	// Enqueue this series flushing if the oldest chunk is older than the threshold
-
-	u.fpLocker.Lock(fp)
+// sweepSeries schedules a series for flushing based on a set of criteria
+//
+// NB we don't close the head chunk here, as the series could wait in the queue
+// for some time, and we want to encourage chunks to be as full as possible.
+func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) {
 	if len(series.chunkDescs) <= 0 {
-		u.fpLocker.Unlock(fp)
 		return
 	}

 	lastTime := series.lastTime
-	flush := immediate || len(series.chunkDescs) > 1 || model.Now().Sub(lastTime) > i.cfg.MaxChunkIdle
-	u.fpLocker.Unlock(fp)
+	flush := i.shouldFlushSeries(series, immediate)

 	if flush {
 		flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
-		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{lastTime, u.userID, fp, immediate})
+		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{lastTime, userID, fp, immediate})
 	}
 }

+func (i *Ingester) shouldFlushSeries(series *memorySeries, immediate bool) bool {
+	// Series should be scheduled for flushing if they have more than one chunk
+	if immediate || len(series.chunkDescs) > 1 {
+		return true
+	}
+
+	// Or if the only existing chunk needs flushing
+	if len(series.chunkDescs) > 0 {
+		return i.shouldFlushChunk(series.chunkDescs[0])
+	}
+
+	return false
+}
+
+func (i *Ingester) shouldFlushChunk(c *prom_chunk.Desc) bool {
+	// Chunks should be flushed if their oldest entry is older than MaxChunkAge
+	if model.Now().Sub(c.ChunkFirstTime) > i.cfg.MaxChunkAge {
+		return true
+	}
+
+	// Chunks should be flushed if their last entry is older than MaxChunkIdle
+	if model.Now().Sub(c.ChunkLastTime) > i.cfg.MaxChunkIdle {
+		return true
+	}
+
+	return false
+}
+
 func (i *Ingester) flushLoop(j int) {
 	defer func() {
 		log.Info("Ingester.flushLoop() exited")
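The net effect of the new predicates: a series is enqueued whenever it has a closed chunk waiting (more than one chunk), and a lone head chunk is flushed either once it crosses MaxChunkAge (the new 12h cap, even if samples are still arriving) or once it has been idle past MaxChunkIdle. A standalone sketch of the chunk-level test follows, with stand-in types: chunkDesc and plain time.Time replace prom_chunk.Desc and model.Time, and the two comparisons mirror shouldFlushChunk above.

package main

import (
	"fmt"
	"time"
)

// chunkDesc is a stand-in for prom_chunk.Desc.
type chunkDesc struct {
	FirstTime time.Time // oldest sample in the chunk
	LastTime  time.Time // newest sample in the chunk
}

const (
	maxChunkAge  = 12 * time.Hour // default for -ingester.max-chunk-age
	maxChunkIdle = 1 * time.Hour  // default for -ingester.max-chunk-idle
)

func shouldFlushChunk(c chunkDesc, now time.Time) bool {
	if now.Sub(c.FirstTime) > maxChunkAge { // chunk has been open too long
		return true
	}
	if now.Sub(c.LastTime) > maxChunkIdle { // chunk has gone idle
		return true
	}
	return false
}

func main() {
	now := time.Now()
	active := chunkDesc{FirstTime: now.Add(-13 * time.Hour), LastTime: now.Add(-time.Minute)}
	// true: older than 12h, flushed even though samples are still arriving
	fmt.Println(shouldFlushChunk(active, now))
}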
@@ -536,55 +564,61 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)

-		// get the user
-		i.userStateLock.Lock()
-		userState, ok := i.userState[op.userID]
-		i.userStateLock.Unlock()
-		if !ok {
-			continue
+		if err := i.flushUserSeries(op.userID, op.fp, op.immediate); err != nil {
+			log.Errorf("Failed to flush user: %v", err)
 		}
-		ctx := user.WithID(context.Background(), op.userID)
+	}
+}

-		// Decide what chunks to flush
-		series, ok := userState.fpToSeries.get(op.fp)
-		if !ok {
-			continue
-		}
+func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
+	i.userStateLock.Lock()
+	userState, ok := i.userState[userID]
+	i.userStateLock.Unlock()
+	if !ok {
+		return nil
+	}

-		userState.fpLocker.Lock(op.fp)
+	series, ok := userState.fpToSeries.get(fp)
+	if !ok {
+		return nil
+	}

-		// Assume we're going to flush everything
-		chunks := series.chunkDescs
+	userState.fpLocker.Lock(fp)
+	if !i.shouldFlushSeries(series, immediate) {
+		userState.fpLocker.Unlock(fp)
+		return nil
+	}

-		// If the head chunk is old enough, close it
-		if op.immediate || model.Now().Sub(series.lastTime) > i.cfg.MaxChunkIdle {
-			series.closeHead()
-		} else {
-			chunks = chunks[:len(chunks)-1]
-		}
-		userState.fpLocker.Unlock(op.fp)
+	// Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it.
+	chunks := series.chunkDescs
+	if immediate || (len(chunks) > 0 && i.shouldFlushChunk(chunks[0])) {
+		series.closeHead()
+	} else {
+		chunks = chunks[:len(chunks)-1]
+	}
+	userState.fpLocker.Unlock(fp)

-		if len(chunks) == 0 {
-			continue
-		}
+	if len(chunks) == 0 {
+		return nil
+	}

-		// flush the chunks without locking the series
-		err := i.flushChunks(ctx, op.fp, series.metric, chunks)
-		if err != nil {
-			log.Errorf("Failed to flush series: %v", err)
-			continue
-		}
+	// flush the chunks without locking the series, as we don't want to hold the series lock for the duration of the dynamo/s3 rpcs.
+	ctx := user.WithID(context.Background(), userID)
+	err := i.flushChunks(ctx, fp, series.metric, chunks)
+	if err != nil {
+		return err
+	}

-		// now remove the chunks
-		userState.fpLocker.Lock(op.fp)
-		series.chunkDescs = series.chunkDescs[len(chunks):]
-		i.memoryChunks.Sub(float64(len(chunks)))
-		if len(series.chunkDescs) == 0 {
-			userState.fpToSeries.del(op.fp)
-			userState.index.delete(series.metric, op.fp)
-		}
-		userState.fpLocker.Unlock(op.fp)
+	// now remove the chunks
+	userState.fpLocker.Lock(fp)
+	series.chunkDescs = series.chunkDescs[len(chunks):]
+	i.memoryChunks.Sub(float64(len(chunks)))
+	if len(series.chunkDescs) == 0 {
+		userState.fpToSeries.del(fp)
+		userState.index.delete(series.metric, fp)
 	}
+	userState.fpLocker.Unlock(fp)
+	return nil
 }

 func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric model.Metric, chunkDescs []*prom_chunk.Desc) error {
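flushUserSeries also preserves a deliberate locking discipline: decide and snapshot under the fingerprint lock, perform the slow dynamo/s3 write with the lock released, then relock to remove exactly the chunks that were written. A minimal sketch of that unlock-around-RPC pattern with stand-in types: a single sync.Mutex replaces the fingerprint locker, and strings replace chunk descriptors.

package main

import (
	"fmt"
	"sync"
	"time"
)

type series struct {
	mtx    sync.Mutex
	chunks []string
}

// flush snapshots the chunks under the lock, writes them to the store without
// holding the lock (the write may take seconds), then relocks to drop exactly
// what it wrote. store is a stand-in for the dynamo/s3 write.
func (s *series) flush(store func([]string) error) error {
	s.mtx.Lock()
	chunks := s.chunks // snapshot; concurrent appends land after this prefix
	s.mtx.Unlock()

	if len(chunks) == 0 {
		return nil
	}
	if err := store(chunks); err != nil {
		return err
	}

	s.mtx.Lock()
	s.chunks = s.chunks[len(chunks):] // remove only the flushed prefix
	s.mtx.Unlock()
	return nil
}

func main() {
	s := &series{chunks: []string{"c1", "c2"}}
	_ = s.flush(func(cs []string) error {
		time.Sleep(10 * time.Millisecond) // simulate a slow RPC
		return nil
	})
	fmt.Println(len(s.chunks)) // 0
}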
