Skip to content

Make MinChunkLength settable per-user #1620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,25 +266,27 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.
chunks = chunks[:len(chunks)-1]
}

if reason == reasonIdle && series.headChunkClosed && i.cfg.MinChunkLength > 0 {
chunkLength := 0
for _, c := range chunks {
chunkLength += c.C.Len()
}
if chunkLength < i.cfg.MinChunkLength {
userState.removeSeries(fp, series.metric)
memoryChunks.Sub(float64(len(chunks)))
droppedChunks.Add(float64(len(chunks)))
util.Event().Log(
"msg", "dropped chunks",
"userID", userID,
"numChunks", len(chunks),
"chunkLength", chunkLength,
"fp", fp,
"series", series.metric,
"queue", flushQueueIndex,
)
chunks = nil
if reason == reasonIdle && series.headChunkClosed {
if minChunkLength := i.limits.MinChunkLength(userID); minChunkLength > 0 {
chunkLength := 0
for _, c := range chunks {
chunkLength += c.C.Len()
}
if chunkLength < minChunkLength {
userState.removeSeries(fp, series.metric)
memoryChunks.Sub(float64(len(chunks)))
droppedChunks.Add(float64(len(chunks)))
util.Event().Log(
"msg", "dropped chunks",
"userID", userID,
"numChunks", len(chunks),
"chunkLength", chunkLength,
"fp", fp,
"series", series.metric,
"queue", flushQueueIndex,
)
chunks = nil
}
}
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ type Config struct {
ChunkAgeJitter time.Duration
ConcurrentFlushes int
SpreadFlushes bool
MinChunkLength int

RateUpdatePeriod time.Duration

Expand All @@ -137,7 +136,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.ChunkAgeJitter, "ingester.chunk-age-jitter", 20*time.Minute, "Range of time to subtract from MaxChunkAge to spread out flushes")
f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge")
f.IntVar(&cfg.MinChunkLength, "ingester.min-chunk-length", 0, "Minimum number of samples in an idle chunk to flush it to the store. Use with care, if chunks are less than this size they will be discarded.")
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 50, "Number of concurrent goroutines flushing to dynamodb.")
f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Limits struct {
MaxSamplesPerQuery int `yaml:"max_samples_per_query"`
MaxSeriesPerUser int `yaml:"max_series_per_user"`
MaxSeriesPerMetric int `yaml:"max_series_per_metric"`
MinChunkLength int `yaml:"min_chunk_length"`

// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_chunks_per_query"`
Expand Down Expand Up @@ -61,6 +62,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxSamplesPerQuery, "ingester.max-samples-per-query", 1000000, "The maximum number of samples that a query can return.")
f.IntVar(&l.MaxSeriesPerUser, "ingester.max-series-per-user", 5000000, "Maximum number of active series per user.")
f.IntVar(&l.MaxSeriesPerMetric, "ingester.max-series-per-metric", 50000, "Maximum number of active series per metric name.")
f.IntVar(&l.MinChunkLength, "ingester.min-chunk-length", 0, "Minimum number of samples in an idle chunk to flush it to the store. Use with care, if chunks are less than this size they will be discarded.")

f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
Expand Down Expand Up @@ -229,6 +231,11 @@ func (o *Overrides) CardinalityLimit(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).CardinalityLimit
}

// MinChunkLength returns the minimum size of chunk that will be saved by ingesters
func (o *Overrides) MinChunkLength(userID string) int {
return o.overridesManager.GetLimits(userID).(*Limits).MinChunkLength
}

// Loads overrides and returns the limits as an interface to store them in OverridesManager.
// We need to implement it here since OverridesManager must store type Limits in an interface but
// it doesn't know its definition to initialize it.
Expand Down