
Commit dd4240d

Graduating stream-chunks-when-using-blocks configuration (#4864)
Squashed commit messages:

* Graduating `stream-chunks-when-using-blocks` configuration
* changelog
* fix test
* Keeping documentation
* make clean-white-noise

Signed-off-by: Alan Protasio <approtas@amazon.com>

1 parent d6dbc90 · commit dd4240d

File tree: 7 files changed (+23 −293 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -40,6 +40,7 @@
 * [CHANGE] Remove support for alertmanager and ruler legacy store configuration. Before upgrading, you need to convert your configuration to use the `alertmanager-storage` and `ruler-storage` configuration on the version that you're already running, then upgrade.
 * [CHANGE] Disables TSDB isolation. #4825
 * [CHANGE] Drops support Prometheus 1.x rule format on configdb. #4826
+* [CHANGE] Removes `-ingester.stream-chunks-when-using-blocks` experimental flag and stream chunks by default when `querier.ingester-streaming` is enabled. #4864
 * [ENHANCEMENT] AlertManager: Retrying AlertManager Get Requests (Get Alertmanager status, Get Alertmanager Receivers) on next replica on error #4840
 * [ENHANCEMENT] Querier/Ruler: Retry store-gateway in case of unexpected failure, instead of failing the query. #4532 #4839
 * [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783
```
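For operators, the practical effect of this changelog entry is that the experimental opt-in flag disappears: per the entry above, enabling `-querier.ingester-streaming` alone now gets you chunk streaming. Below is a minimal, hypothetical sketch of the resulting flag set, written in the flag-map style the repo's integration tests use; the `main` scaffolding is illustrative and not part of this commit.

```go
package main

import "fmt"

func main() {
	// Hypothetical flag set for a cluster that had opted in to chunk
	// streaming. Flag names come from the changelog entry above; the
	// map style mirrors integration/ruler_test.go.
	flags := map[string]string{
		"-querier.ingester-streaming": "true",
		// Before #4864 this opt-in was also required; chunk streaming is
		// now the default and the flag no longer exists:
		//   "-ingester.stream-chunks-when-using-blocks": "true",
	}
	fmt.Println(flags)
}
```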

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -65,7 +65,7 @@ Currently experimental features are:
 - Querier: tenant federation
 - The thanosconvert tool for converting Thanos block metadata to Cortex
 - HA Tracker: cleanup of old replicas from KV Store.
-- Flags for configuring whether blocks-ingester streams samples or chunks are temporary, and will be removed when feature is tested:
+- Flags for configuring whether blocks-ingester streams samples or chunks are temporary, and will be removed on next release:
   - `-ingester.stream-chunks-when-using-blocks` CLI flag
   - `-ingester_stream_chunks_when_using_blocks` (boolean) field in runtime config file
 - Instance limits in ingester and distributor
```

integration/ruler_test.go

Lines changed: 0 additions & 2 deletions
```diff
@@ -540,8 +540,6 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
 
 			// Very low limit so that ruler hits it.
 			"-querier.max-fetched-chunks-per-query": "5",
-			// We need this to make limit work.
-			"-ingester.stream-chunks-when-using-blocks": "true",
 		},
 	)
 
```
pkg/cortex/modules.go

Lines changed: 0 additions & 1 deletion
```diff
@@ -391,7 +391,6 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
 	t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
 	t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
 	t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
-	t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig)
 	t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
 	t.tsdbIngesterConfig()
```
pkg/cortex/runtime_config.go

Lines changed: 0 additions & 22 deletions
```diff
@@ -104,28 +104,6 @@ func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-ch
 	}
 }
 
-func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.QueryStreamType {
-	if manager == nil {
-		return nil
-	}
-
-	return func() ingester.QueryStreamType {
-		val := manager.GetConfig()
-		if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
-			if cfg.IngesterChunkStreaming == nil {
-				return ingester.QueryStreamDefault
-			}
-
-			if *cfg.IngesterChunkStreaming {
-				return ingester.QueryStreamChunks
-			}
-			return ingester.QueryStreamSamples
-		}
-
-		return ingester.QueryStreamDefault
-	}
-}
-
 func ingesterInstanceLimits(manager *runtimeconfig.Manager) func() *ingester.InstanceLimits {
 	if manager == nil {
 		return nil
```
pkg/ingester/ingester.go

Lines changed: 2 additions & 106 deletions
```diff
@@ -97,10 +97,7 @@ type Config struct {
 	ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"`
 
 	// Use blocks storage.
-	BlocksStorageConfig         cortex_tsdb.BlocksStorageConfig `yaml:"-"`
-	StreamChunksWhenUsingBlocks bool                            `yaml:"-"`
-	// Runtime-override for type of streaming query to use (chunks or samples).
-	StreamTypeFn func() QueryStreamType `yaml:"-"`
+	BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"`
 
 	// Injected at runtime and read from the distributor config, required
 	// to accurately apply global limits.
@@ -126,7 +123,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", true, "Enable tracking of active series and export them as metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
-	f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is experimental feature and not yet tested. Once ready, it will be made default and this config option removed.")
 
 	f.Float64Var(&cfg.DefaultLimits.MaxIngestionRate, "ingester.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that ingester will accept. This limit is per-ingester, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. This limit only works when using blocks engine. 0 = unlimited.")
 	f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
@@ -223,15 +219,6 @@ func (r tsdbCloseCheckResult) shouldClose() bool {
 	return r == tsdbIdle || r == tsdbTenantMarkedForDeletion
 }
 
-// QueryStreamType defines type of function to use when doing query-stream operation.
-type QueryStreamType int
-
-const (
-	QueryStreamDefault QueryStreamType = iota // Use default configured value.
-	QueryStreamSamples                        // Stream individual samples.
-	QueryStreamChunks                         // Stream entire chunks.
-)
-
 type userTSDB struct {
 	db     *tsdb.DB
 	userID string
@@ -1622,31 +1609,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 
 	numSamples := 0
 	numSeries := 0
+	numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)
 
-	streamType := QueryStreamSamples
-	if i.cfg.StreamChunksWhenUsingBlocks {
-		streamType = QueryStreamChunks
-	}
-
-	if i.cfg.StreamTypeFn != nil {
-		runtimeType := i.cfg.StreamTypeFn()
-		switch runtimeType {
-		case QueryStreamChunks:
-			streamType = QueryStreamChunks
-		case QueryStreamSamples:
-			streamType = QueryStreamSamples
-		default:
-			// no change from config value.
-		}
-	}
-
-	if streamType == QueryStreamChunks {
-		level.Debug(spanlog).Log("msg", "using queryStreamChunks")
-		numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)
-	} else {
-		level.Debug(spanlog).Log("msg", "using QueryStreamSamples")
-		numSeries, numSamples, err = i.queryStreamSamples(ctx, db, int64(from), int64(through), matchers, stream)
-	}
 	if err != nil {
 		return err
 	}
@@ -1657,74 +1621,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
 	return nil
 }
 
-func (i *Ingester) queryStreamSamples(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
-	q, err := db.Querier(ctx, from, through)
-	if err != nil {
-		return 0, 0, err
-	}
-	defer q.Close()
-
-	// It's not required to return sorted series because series are sorted by the Cortex querier.
-	ss := q.Select(false, nil, matchers...)
-	if ss.Err() != nil {
-		return 0, 0, ss.Err()
-	}
-
-	timeseries := make([]cortexpb.TimeSeries, 0, queryStreamBatchSize)
-	batchSizeBytes := 0
-	for ss.Next() {
-		series := ss.At()
-
-		// convert labels to LabelAdapter
-		ts := cortexpb.TimeSeries{
-			Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()),
-		}
-
-		it := series.Iterator()
-		for it.Next() {
-			t, v := it.At()
-			ts.Samples = append(ts.Samples, cortexpb.Sample{Value: v, TimestampMs: t})
-		}
-		numSamples += len(ts.Samples)
-		numSeries++
-		tsSize := ts.Size()
-
-		if (batchSizeBytes > 0 && batchSizeBytes+tsSize > queryStreamBatchMessageSize) || len(timeseries) >= queryStreamBatchSize {
-			// Adding this series to the batch would make it too big,
-			// flush the data and add it to new batch instead.
-			err = client.SendQueryStream(stream, &client.QueryStreamResponse{
-				Timeseries: timeseries,
-			})
-			if err != nil {
-				return 0, 0, err
-			}
-
-			batchSizeBytes = 0
-			timeseries = timeseries[:0]
-		}
-
-		timeseries = append(timeseries, ts)
-		batchSizeBytes += tsSize
-	}
-
-	// Ensure no error occurred while iterating the series set.
-	if err := ss.Err(); err != nil {
-		return 0, 0, err
-	}
-
-	// Final flush any existing metrics
-	if batchSizeBytes != 0 {
-		err = client.SendQueryStream(stream, &client.QueryStreamResponse{
-			Timeseries: timeseries,
-		})
-		if err != nil {
-			return 0, 0, err
-		}
-	}
-
-	return numSeries, numSamples, nil
-}
-
 // queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
 func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
 	q, err := db.ChunkQuerier(ctx, from, through)
```