
Commit 5dccf08

Stream chunks from blocks-ingester to querier (#3889)
Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 2b94533 commit 5dccf08

16 files changed: +604 −72 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -89,6 +89,7 @@
 * `cortex_bucket_store_chunk_pool_requested_bytes_total`
 * `cortex_bucket_store_chunk_pool_returned_bytes_total`
 * [ENHANCEMENT] Alertmanager: load alertmanager configurations from object storage concurrently, and only load the necessary configurations, speeding up the configuration synchronization process and executing fewer "GET object" operations against the storage when sharding is enabled. #3898
+* [ENHANCEMENT] Blocks storage: Ingester can now stream entire chunks instead of individual samples to the querier. At the moment this feature must be explicitly enabled, either with the `-ingester.stream-chunks-when-using-blocks` flag or the `ingester_stream_chunks_when_using_blocks` (boolean) field in the runtime config file; these configuration options are temporary and will be removed once the feature is stable. #3889
 * [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
 * [BUGFIX] HA Tracker: don't track an intentionally aborted CAS operation as an error in the `cortex_kv_request_duration_seconds` metric. #3745
 * [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761

docs/configuration/v1-guarantees.md

Lines changed: 4 additions & 0 deletions
@@ -72,3 +72,7 @@ Currently experimental features are:
 - HA Tracker: cleanup of old replicas from KV Store.
 - Ruler storage: backend client configuration options using config fields similar to the blocks storage backend clients.
 - Alertmanager storage: backend client configuration options using config fields similar to the blocks storage backend clients.
+- Ruler storage: backend client configuration options using config fields similar to the TSDB object storage clients.
+- Flags for configuring whether the blocks-ingester streams samples or chunks are temporary, and will be removed once the feature is tested:
+  - `-ingester.stream-chunks-when-using-blocks` CLI flag
+  - `ingester_stream_chunks_when_using_blocks` (boolean) field in the runtime config file
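
For reference, turning the feature on through runtime configuration would look like the sketch below. The field name comes from this commit's `runtime_config.go` (further down); the runtime-config file mechanism itself (the pre-existing `-runtime-config.file` option) is assumed, not added here:

```yaml
# Runtime config file: reloaded periodically, so the setting can be
# flipped without restarting ingesters. Omitting the field leaves the
# -ingester.stream-chunks-when-using-blocks CLI flag in control.
ingester_stream_chunks_when_using_blocks: true
```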

integration/querier_streaming_mixed_ingester_test.go

Lines changed: 10 additions & 0 deletions
@@ -5,6 +5,7 @@ package integration
 import (
 	"context"
 	"flag"
+	"fmt"
 	"strings"
 	"testing"
 	"time"
@@ -21,6 +22,14 @@ import (
 )

 func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
+	for _, streamChunks := range []bool{false, true} {
+		t.Run(fmt.Sprintf("%v", streamChunks), func(t *testing.T) {
+			testQuerierWithStreamingBlocksAndChunksIngesters(t, streamChunks)
+		})
+	}
+}
+
+func testQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T, streamChunks bool) {
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
 	defer s.Close()
@@ -33,6 +42,7 @@ func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
 		"-store-gateway.sharding-enabled": "false",
 		"-querier.ingester-streaming":     "true",
 	})
+	blockFlags["-ingester.stream-chunks-when-using-blocks"] = fmt.Sprintf("%v", streamChunks)

 	// Start dependencies.
 	consul := e2edb.NewConsul()

pkg/chunk/encoding/chunk_test.go

Lines changed: 7 additions & 4 deletions
@@ -29,7 +29,7 @@ import (

 func TestLen(t *testing.T) {
 	chunks := []Chunk{}
-	for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk} {
+	for _, encoding := range []Encoding{DoubleDelta, Varbit, Bigchunk, PrometheusXorChunk} {
 		c, err := NewForEncoding(encoding)
 		if err != nil {
 			t.Fatal(err)
@@ -63,6 +63,7 @@ func TestChunk(t *testing.T) {
 		{DoubleDelta, 989},
 		{Varbit, 2048},
 		{Bigchunk, 4096},
+		{PrometheusXorChunk, 2048},
 	} {
 		for samples := tc.maxSamples / 10; samples < tc.maxSamples; samples += tc.maxSamples / 10 {

@@ -87,9 +88,11 @@ func TestChunk(t *testing.T) {
 				testChunkBatch(t, tc.encoding, samples)
 			})

-			t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
-				testChunkRebound(t, tc.encoding, samples)
-			})
+			if tc.encoding != PrometheusXorChunk {
+				t.Run(fmt.Sprintf("testChunkRebound/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
+					testChunkRebound(t, tc.encoding, samples)
+				})
+			}
 		}
 	}
 }

pkg/chunk/encoding/factory.go

Lines changed: 8 additions & 0 deletions
@@ -52,6 +52,8 @@ const (
 	Varbit
 	// Bigchunk encoding
 	Bigchunk
+	// PrometheusXorChunk is a wrapper around a Prometheus XOR-encoded chunk.
+	PrometheusXorChunk
 )

 type encoding struct {
@@ -78,6 +80,12 @@ var encodings = map[Encoding]encoding{
 			return newBigchunk()
 		},
 	},
+	PrometheusXorChunk: {
+		Name: "PrometheusXorChunk",
+		New: func() Chunk {
+			return newPrometheusXorChunk()
+		},
+	},
 }

 // Set implements flag.Value.
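
With the registry entry above in place, the new encoding resolves through the existing `NewForEncoding` factory like any other. A minimal standalone sketch of the lookup (not part of the commit):

```go
package main

import (
	"fmt"
	"log"

	"github.com/cortexproject/cortex/pkg/chunk/encoding"
)

func main() {
	// Resolves via the encodings map registered in factory.go.
	c, err := encoding.NewForEncoding(encoding.PrometheusXorChunk)
	if err != nil {
		log.Fatal(err) // would fire for an unregistered Encoding value
	}
	// Expected: "PrometheusXorChunk 0" for an empty chunk, assuming
	// Encoding's String method reports the registered Name.
	fmt.Println(c.Encoding(), c.Len())
}
```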

pkg/chunk/encoding/prometheus_chunk.go (new file)

Lines changed: 151 additions & 0 deletions
@@ -0,0 +1,151 @@
package encoding

import (
	"io"

	"github.com/pkg/errors"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// Wrapper around a Prometheus chunk.
type prometheusXorChunk struct {
	chunk chunkenc.Chunk
}

func newPrometheusXorChunk() *prometheusXorChunk {
	return &prometheusXorChunk{}
}

// Add adds another sample to the chunk. While Add works, it is only implemented
// to make tests work, and should not be used in production. In particular, it appends
// all samples to a single chunk, and uses a new Appender for each Add.
func (p *prometheusXorChunk) Add(m model.SamplePair) (Chunk, error) {
	if p.chunk == nil {
		p.chunk = chunkenc.NewXORChunk()
	}

	app, err := p.chunk.Appender()
	if err != nil {
		return nil, err
	}

	app.Append(int64(m.Timestamp), float64(m.Value))
	return nil, nil
}

func (p *prometheusXorChunk) NewIterator(iterator Iterator) Iterator {
	if p.chunk == nil {
		return errorIterator("Prometheus chunk is not set")
	}

	if pit, ok := iterator.(*prometheusChunkIterator); ok {
		pit.c = p.chunk
		pit.it = p.chunk.Iterator(pit.it)
		return pit
	}

	return &prometheusChunkIterator{c: p.chunk, it: p.chunk.Iterator(nil)}
}

func (p *prometheusXorChunk) Marshal(i io.Writer) error {
	if p.chunk == nil {
		return errors.New("chunk data not set")
	}
	_, err := i.Write(p.chunk.Bytes())
	return err
}

func (p *prometheusXorChunk) UnmarshalFromBuf(bytes []byte) error {
	c, err := chunkenc.FromData(chunkenc.EncXOR, bytes)
	if err != nil {
		return errors.Wrap(err, "failed to create Prometheus chunk from bytes")
	}

	p.chunk = c
	return nil
}

func (p *prometheusXorChunk) Encoding() Encoding {
	return PrometheusXorChunk
}

func (p *prometheusXorChunk) Utilization() float64 {
	// Used for reporting when the chunk is used to store new data.
	return 0
}

func (p *prometheusXorChunk) Slice(_, _ model.Time) Chunk {
	return p
}

func (p *prometheusXorChunk) Rebound(from, to model.Time) (Chunk, error) {
	return nil, errors.New("Rebound not supported by PrometheusXorChunk")
}

func (p *prometheusXorChunk) Len() int {
	if p.chunk == nil {
		return 0
	}
	return p.chunk.NumSamples()
}

func (p *prometheusXorChunk) Size() int {
	if p.chunk == nil {
		return 0
	}
	return len(p.chunk.Bytes())
}

type prometheusChunkIterator struct {
	c  chunkenc.Chunk // we need the chunk, because FindAtOrAfter needs to start with a fresh iterator.
	it chunkenc.Iterator
}

func (p *prometheusChunkIterator) Scan() bool {
	return p.it.Next()
}

func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool {
	// FindAtOrAfter must return the OLDEST value at the given time. That means we need
	// to start with a fresh iterator, otherwise we cannot guarantee OLDEST.
	p.it = p.c.Iterator(p.it)
	return p.it.Seek(int64(time))
}

func (p *prometheusChunkIterator) Value() model.SamplePair {
	ts, val := p.it.At()
	return model.SamplePair{
		Timestamp: model.Time(ts),
		Value:     model.SampleValue(val),
	}
}

func (p *prometheusChunkIterator) Batch(size int) Batch {
	var batch Batch
	j := 0
	for j < size {
		t, v := p.it.At()
		batch.Timestamps[j] = t
		batch.Values[j] = v
		j++
		if j < size && !p.it.Next() {
			break
		}
	}
	batch.Index = 0
	batch.Length = j
	return batch
}

func (p *prometheusChunkIterator) Err() error {
	return p.it.Err()
}

type errorIterator string

func (e errorIterator) Scan() bool                         { return false }
func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false }
func (e errorIterator) Value() model.SamplePair            { panic("no values") }
func (e errorIterator) Batch(size int) Batch               { panic("no values") }
func (e errorIterator) Err() error                         { return errors.New(string(e)) }
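
To make the wrapper's lifecycle concrete, here is a round-trip sketch using only the methods shown above: append, marshal (the bytes a chunk-streaming ingester would ship to the querier), unmarshal on the receiving side, iterate. It assumes the package's exported `Chunk` and `Iterator` interfaces expose exactly these methods:

```go
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/prometheus/common/model"

	"github.com/cortexproject/cortex/pkg/chunk/encoding"
)

func main() {
	src, err := encoding.NewForEncoding(encoding.PrometheusXorChunk)
	if err != nil {
		log.Fatal(err)
	}

	// Add is test-only per the comment above, but fine for a sketch.
	// The returned overflow Chunk is always nil here, so it is ignored.
	for ts := int64(0); ts < 3; ts++ {
		if _, err := src.Add(model.SamplePair{Timestamp: model.Time(ts * 1000), Value: model.SampleValue(ts)}); err != nil {
			log.Fatal(err)
		}
	}

	// Marshal writes the raw XOR-encoded bytes.
	var buf bytes.Buffer
	if err := src.Marshal(&buf); err != nil {
		log.Fatal(err)
	}

	// The receiving side reconstructs the chunk from the same bytes...
	dst, err := encoding.NewForEncoding(encoding.PrometheusXorChunk)
	if err != nil {
		log.Fatal(err)
	}
	if err := dst.UnmarshalFromBuf(buf.Bytes()); err != nil {
		log.Fatal(err)
	}

	// ...and iterates the samples back out.
	it := dst.NewIterator(nil)
	for it.Scan() {
		sp := it.Value()
		fmt.Println(sp.Timestamp, sp.Value)
	}
	if err := it.Err(); err != nil {
		log.Fatal(err)
	}
}
```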

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
@@ -415,6 +415,7 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
 	t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
 	t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
 	t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
+	t.Cfg.Ingester.StreamTypeFn = ingesterChunkStreaming(t.RuntimeConfig)
 	t.tsdbIngesterConfig()

 	t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer, util_log.Logger)

pkg/cortex/runtime_config.go

Lines changed: 26 additions & 0 deletions
@@ -7,6 +7,7 @@ import (

 	"gopkg.in/yaml.v2"

+	"github.com/cortexproject/cortex/pkg/ingester"
 	"github.com/cortexproject/cortex/pkg/ring/kv"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
@@ -24,6 +25,8 @@ type runtimeConfigValues struct {
 	TenantLimits map[string]*validation.Limits `yaml:"overrides"`

 	Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`
+
+	IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`
 }

 // runtimeConfigTenantLimits provides per-tenant limit overrides based on a runtimeconfig.Manager
@@ -98,6 +101,29 @@ func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-ch
 		return outCh
 	}
 }
+
+func ingesterChunkStreaming(manager *runtimeconfig.Manager) func() ingester.QueryStreamType {
+	if manager == nil {
+		return nil
+	}
+
+	return func() ingester.QueryStreamType {
+		val := manager.GetConfig()
+		if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil {
+			if cfg.IngesterChunkStreaming == nil {
+				return ingester.QueryStreamDefault
+			}
+
+			if *cfg.IngesterChunkStreaming {
+				return ingester.QueryStreamChunks
+			}
+			return ingester.QueryStreamSamples
+		}
+
+		return ingester.QueryStreamDefault
+	}
+}
+
 func runtimeConfigHandler(runtimeCfgManager *runtimeconfig.Manager, defaultLimits validation.Limits) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
 		cfg, ok := runtimeCfgManager.GetConfig().(*runtimeConfigValues)
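
The `*bool` (rather than `bool`) field is what makes the override tri-state: a nil pointer means "not set in runtime config", which maps to `QueryStreamDefault` above and leaves the CLI flag in control. A self-contained sketch of that decoding behaviour (the struct and names here are illustrative, not the commit's code):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Illustrative struct mirroring runtimeConfigValues' tri-state field.
type cfg struct {
	IngesterChunkStreaming *bool `yaml:"ingester_stream_chunks_when_using_blocks"`
}

func main() {
	docs := []string{
		"", // field absent -> nil pointer -> QueryStreamDefault
		"ingester_stream_chunks_when_using_blocks: false", // -> QueryStreamSamples
		"ingester_stream_chunks_when_using_blocks: true",  // -> QueryStreamChunks
	}
	for _, doc := range docs {
		var c cfg
		if err := yaml.Unmarshal([]byte(doc), &c); err != nil {
			panic(err)
		}
		if c.IngesterChunkStreaming == nil {
			fmt.Println("unset")
		} else {
			fmt.Println(*c.IngesterChunkStreaming)
		}
	}
}
```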

pkg/ingester/ingester.go

Lines changed: 6 additions & 2 deletions
@@ -79,8 +79,11 @@ type Config struct {
 	ActiveSeriesMetricsIdleTimeout time.Duration `yaml:"active_series_metrics_idle_timeout"`

 	// Use blocks storage.
-	BlocksStorageEnabled bool                     `yaml:"-"`
-	BlocksStorageConfig  tsdb.BlocksStorageConfig `yaml:"-"`
+	BlocksStorageEnabled        bool                     `yaml:"-"`
+	BlocksStorageConfig         tsdb.BlocksStorageConfig `yaml:"-"`
+	StreamChunksWhenUsingBlocks bool                     `yaml:"-"`
+	// Runtime override for the type of streaming query to use (chunks or samples).
+	StreamTypeFn func() QueryStreamType `yaml:"-"`

 	// Injected at runtime and read from the distributor config, required
 	// to accurately apply global limits.
@@ -114,6 +117,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ActiveSeriesMetricsEnabled, "ingester.active-series-metrics-enabled", false, "Enable tracking of active series and export them as metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsUpdatePeriod, "ingester.active-series-metrics-update-period", 1*time.Minute, "How often to update active series metrics.")
 	f.DurationVar(&cfg.ActiveSeriesMetricsIdleTimeout, "ingester.active-series-metrics-idle-timeout", 10*time.Minute, "After what time a series is considered to be inactive.")
+	f.BoolVar(&cfg.StreamChunksWhenUsingBlocks, "ingester.stream-chunks-when-using-blocks", false, "Stream chunks when using blocks. This is an experimental feature and is not yet tested. Once ready, it will be made the default and this config option will be removed.")
 }

 // Ingester deals with "in flight" chunks. Based on Prometheus 1.x
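
The query-path code that consumes `StreamChunksWhenUsingBlocks` and `StreamTypeFn` is not part of this excerpt, so the exact precedence is not shown. A hedged sketch of how the two plausibly combine — runtime-config override first, CLI flag as fallback — assuming it lives in the same `ingester` package (`effectiveStreamType` is a hypothetical helper, not in the commit):

```go
// Hypothetical helper: resolve the stream type for a QueryStream request.
// Assumption: a runtime-config override wins unless it reports
// QueryStreamDefault (field unset) or no runtime config manager is wired in,
// in which case the -ingester.stream-chunks-when-using-blocks flag decides.
func (cfg *Config) effectiveStreamType() QueryStreamType {
	if cfg.StreamTypeFn != nil {
		if t := cfg.StreamTypeFn(); t != QueryStreamDefault {
			return t // explicit runtime-config override wins
		}
	}
	if cfg.StreamChunksWhenUsingBlocks {
		return QueryStreamChunks
	}
	return QueryStreamSamples
}
```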
