Skip to content

Commit

Permalink
chore(blooms): Add setting to fetch bloom blocks async or synchronous…
Browse files Browse the repository at this point in the history
…ly (#14862)

In case bloom blocks are small, they could be downloaded synchronously to avoid missing blocks on cold start.
This is an undocumented setting.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored Nov 11, 2024
1 parent f5ae015 commit f3f0184
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus
workerConfig: workerConfig{
maxItems: cfg.NumMultiplexItems,
queryConcurrency: cfg.BlockQueryConcurrency,
async: cfg.FetchBlocksAsync,
},
pendingTasks: &atomic.Int64{},

Expand Down
10 changes: 6 additions & 4 deletions pkg/bloomgateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ type Config struct {
// Client configures the Bloom Gateway client
Client ClientConfig `yaml:"client,omitempty" doc:""`

WorkerConcurrency int `yaml:"worker_concurrency"`
BlockQueryConcurrency int `yaml:"block_query_concurrency"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
NumMultiplexItems int `yaml:"num_multiplex_tasks"`
WorkerConcurrency int `yaml:"worker_concurrency"`
BlockQueryConcurrency int `yaml:"block_query_concurrency"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
NumMultiplexItems int `yaml:"num_multiplex_tasks"`
FetchBlocksAsync bool `yaml:"fetch_blocks_async" doc:"hidden"`
}

// RegisterFlags registers flags for the Bloom Gateway configuration.
Expand All @@ -29,6 +30,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.BlockQueryConcurrency, prefix+"block-query-concurrency", 8, "Number of blocks processed concurrently on a single worker. Usually set to 2x number of CPU cores.")
f.IntVar(&cfg.MaxOutstandingPerTenant, prefix+"max-outstanding-per-tenant", 1024, "Maximum number of outstanding tasks per tenant.")
f.IntVar(&cfg.NumMultiplexItems, prefix+"num-multiplex-tasks", 512, "How many tasks are multiplexed at once.")
f.BoolVar(&cfg.FetchBlocksAsync, prefix+"fetch-blocks-async", true, "Whether blocks should be fetched asynchronously.")
// TODO(chaudum): Figure out what the better place is for registering flags
// -bloom-gateway.client.* or -bloom-gateway-client.*
cfg.Client.RegisterFlags(f)
Expand Down
8 changes: 5 additions & 3 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

func newProcessor(id string, concurrency int, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
func newProcessor(id string, concurrency int, async bool, store bloomshipper.Store, logger log.Logger, metrics *workerMetrics) *processor {
return &processor{
id: id,
concurrency: concurrency,
async: async,
store: store,
logger: logger,
metrics: metrics,
Expand All @@ -28,7 +29,8 @@ func newProcessor(id string, concurrency int, store bloomshipper.Store, logger l

type processor struct {
id string
concurrency int // concurrency at which bloom blocks are processed
concurrency int // concurrency at which bloom blocks are processed
async bool // whether blocks should be downloaded asynchronously
store bloomshipper.Store
logger log.Logger
metrics *workerMetrics
Expand Down Expand Up @@ -71,7 +73,7 @@ func (p *processor) processTasksForDay(ctx context.Context, _ string, _ config.D
bqs, err := p.store.FetchBlocks(
ctx,
refs,
bloomshipper.WithFetchAsync(true),
bloomshipper.WithFetchAsync(p.async),
bloomshipper.WithIgnoreNotFound(true),
// NB(owen-d): we relinquish bloom pages to a pool
// after iteration for performance (alloc reduction).
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestProcessor(t *testing.T) {
refs, metas, queriers, data := createBlocks(t, tenant, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff)

mockStore := newMockBloomStore(refs, queriers, metas)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)
p := newProcessor("worker", 1, false, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithInterval{
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestProcessor(t *testing.T) {
}

mockStore := newMockBloomStore(refs, queriers, metas)
p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)
p := newProcessor("worker", 1, false, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithInterval{
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestProcessor(t *testing.T) {
mockStore := newMockBloomStore(refs, queriers, metas)
mockStore.err = errors.New("store failed")

p := newProcessor("worker", 1, mockStore, log.NewNopLogger(), metrics)
p := newProcessor("worker", 1, false, mockStore, log.NewNopLogger(), metrics)

chunkRefs := createQueryInputFromBlockData(t, tenant, data, 10)
swb := seriesWithInterval{
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
type workerConfig struct {
maxItems int
queryConcurrency int
async bool
}

// worker is a datastructure that consumes tasks from the request queue,
Expand Down Expand Up @@ -64,7 +65,7 @@ func (w *worker) starting(_ context.Context) error {
func (w *worker) running(_ context.Context) error {
idx := queue.StartIndexWithLocalQueue

p := newProcessor(w.id, w.cfg.queryConcurrency, w.store, w.logger, w.metrics)
p := newProcessor(w.id, w.cfg.queryConcurrency, w.cfg.async, w.store, w.logger, w.metrics)

for st := w.State(); st == services.Running || st == services.Stopping; {
taskCtx := context.Background()
Expand Down

0 comments on commit f3f0184

Please sign in to comment.