Skip to content

Commit 3bba603

Browse files
committed
Separate out userStates options into config struct
1 parent e368948 commit 3bba603

File tree

3 files changed

+29
-24
lines changed

3 files changed

+29
-24
lines changed

ingester/ingester.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,21 +104,20 @@ type Config struct {
104104
FlushCheckPeriod time.Duration
105105
MaxChunkIdle time.Duration
106106
MaxChunkAge time.Duration
107-
RateUpdatePeriod time.Duration
108107
ConcurrentFlushes int
109-
MaxSeriesPerUser int
110108
ChunkEncoding string
109+
UserStatesConfig UserStatesConfig
111110
}
112111

113112
// RegisterFlags adds the flags required to config this to the given FlagSet
114113
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
115114
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
116-
f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
117115
f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.")
118116
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.")
119117
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.")
120118
f.StringVar(&cfg.ChunkEncoding, "ingester.chunk-encoding", "1", "Encoding version to use for chunks.")
121-
f.IntVar(&cfg.MaxSeriesPerUser, "ingester.max-series-per-user", DefaultMaxSeriesPerUser, "Maximum number of active series per user.")
119+
f.DurationVar(&cfg.UserStatesConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
120+
f.IntVar(&cfg.UserStatesConfig.MaxSeriesPerUser, "ingester.max-series-per-user", DefaultMaxSeriesPerUser, "Maximum number of active series per user.")
122121
}
123122

124123
type flushOp struct {
@@ -144,18 +143,18 @@ func New(cfg Config, chunkStore cortex_chunk.Store, ring *ring.Ring) (*Ingester,
144143
if cfg.MaxChunkIdle == 0 {
145144
cfg.MaxChunkIdle = 1 * time.Hour
146145
}
147-
if cfg.RateUpdatePeriod == 0 {
148-
cfg.RateUpdatePeriod = 15 * time.Second
149-
}
150146
if cfg.ConcurrentFlushes <= 0 {
151147
cfg.ConcurrentFlushes = DefaultConcurrentFlush
152148
}
153-
if cfg.MaxSeriesPerUser <= 0 {
154-
cfg.MaxSeriesPerUser = DefaultMaxSeriesPerUser
155-
}
156149
if cfg.ChunkEncoding == "" {
157150
cfg.ChunkEncoding = "1"
158151
}
152+
if cfg.UserStatesConfig.MaxSeriesPerUser <= 0 {
153+
cfg.UserStatesConfig.MaxSeriesPerUser = DefaultMaxSeriesPerUser
154+
}
155+
if cfg.UserStatesConfig.RateUpdatePeriod == 0 {
156+
cfg.UserStatesConfig.RateUpdatePeriod = 15 * time.Second
157+
}
159158

160159
if err := chunk.DefaultEncoding.Set(cfg.ChunkEncoding); err != nil {
161160
return nil, err
@@ -169,7 +168,7 @@ func New(cfg Config, chunkStore cortex_chunk.Store, ring *ring.Ring) (*Ingester,
169168

170169
startTime: time.Now(),
171170

172-
userStates: newUserStates(cfg.RateUpdatePeriod, cfg.MaxSeriesPerUser),
171+
userStates: newUserStates(&cfg.UserStatesConfig),
173172
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
174173

175174
ingestedSamples: prometheus.NewCounter(prometheus.CounterOpts{
@@ -434,7 +433,7 @@ func (i *Ingester) loop() {
434433
}()
435434

436435
flushTick := time.Tick(i.cfg.FlushCheckPeriod)
437-
rateUpdateTick := time.Tick(i.cfg.RateUpdatePeriod)
436+
rateUpdateTick := time.Tick(i.cfg.UserStatesConfig.RateUpdatePeriod)
438437
for {
439438
select {
440439
case <-flushTick:

ingester/ingester_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,9 @@ func TestIngesterSeriesLimitExceeded(t *testing.T) {
153153
cfg := Config{
154154
FlushCheckPeriod: 99999 * time.Hour,
155155
MaxChunkIdle: 99999 * time.Hour,
156-
MaxSeriesPerUser: 1,
156+
UserStatesConfig: UserStatesConfig{
157+
MaxSeriesPerUser: 1,
158+
},
157159
}
158160
store := &testStore{
159161
chunks: map[string][]chunk.Chunk{},

ingester/user_state.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@ import (
1414
)
1515

1616
type userStates struct {
17-
mtx sync.RWMutex
18-
states map[string]*userState
19-
rateUpdatePeriod time.Duration
20-
maxSeriesPerUser int
17+
mtx sync.RWMutex
18+
states map[string]*userState
19+
cfg *UserStatesConfig
2120
}
2221

2322
type userState struct {
@@ -29,11 +28,16 @@ type userState struct {
2928
ingestedSamples *ewmaRate
3029
}
3130

32-
func newUserStates(rateUpdatePeriod time.Duration, maxSeriesPerUser int) *userStates {
31+
// UserStatesConfig configures userStates properties.
32+
type UserStatesConfig struct {
33+
RateUpdatePeriod time.Duration
34+
MaxSeriesPerUser int
35+
}
36+
37+
func newUserStates(cfg *UserStatesConfig) *userStates {
3338
return &userStates{
34-
states: map[string]*userState{},
35-
rateUpdatePeriod: rateUpdatePeriod,
36-
maxSeriesPerUser: maxSeriesPerUser,
39+
states: map[string]*userState{},
40+
cfg: cfg,
3741
}
3842
}
3943

@@ -126,7 +130,7 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, metric model.Metric
126130
us.mtx.RLock()
127131
state, ok = us.states[userID]
128132
if ok {
129-
fp, series, err = state.unlockedGet(metric, us.maxSeriesPerUser)
133+
fp, series, err = state.unlockedGet(metric, us.cfg.MaxSeriesPerUser)
130134
if err != nil {
131135
us.mtx.RUnlock()
132136
return nil, fp, nil, err
@@ -140,7 +144,7 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, metric model.Metric
140144
us.mtx.Lock()
141145
defer us.mtx.Unlock()
142146
state = us.unlockedGetOrCreate(userID)
143-
fp, series, err = state.unlockedGet(metric, us.maxSeriesPerUser)
147+
fp, series, err = state.unlockedGet(metric, us.cfg.MaxSeriesPerUser)
144148
return state, fp, series, err
145149
}
146150

@@ -152,7 +156,7 @@ func (us *userStates) unlockedGetOrCreate(userID string) *userState {
152156
fpToSeries: newSeriesMap(),
153157
fpLocker: newFingerprintLocker(16),
154158
index: newInvertedIndex(),
155-
ingestedSamples: newEWMARate(0.2, us.rateUpdatePeriod),
159+
ingestedSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
156160
}
157161
state.mapper = newFPMapper(state.fpToSeries)
158162
us.states[userID] = state

0 commit comments

Comments
 (0)