Skip to content

Commit

Permalink
do not initialize tsdb index store when using index gateway client fo…
Browse files Browse the repository at this point in the history
…r queries (#6305)

* do not initialize tsdb index store when using index gateway client for queries

* fix test

* lint
  • Loading branch information
sandeepsukhani authored Jun 3, 2022
1 parent 7158785 commit 1ad75d0
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,8 @@ func (t *Loki) initStore() (_ services.Service, err error) {
asyncStore = true
case t.Cfg.isModuleEnabled(IndexGateway):
// we want to use the actual storage when running the index-gateway, so we remove the Addr from the config
// TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig
t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
t.Cfg.StorageConfig.TSDBShipperConfig.IndexGatewayClientConfig.Disabled = true
case t.Cfg.isModuleEnabled(All):
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
Expand Down
36 changes: 26 additions & 10 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var (
indexTypeStats = usagestats.NewString("store_index_type")
objectTypeStats = usagestats.NewString("store_object_type")
schemaStats = usagestats.NewString("store_schema")

errWritingChunkUnsupported = errors.New("writing chunks is not supported while running store in read-only mode")
)

// Store is the Loki chunk store to retrieve and save chunks.
Expand Down Expand Up @@ -204,26 +206,30 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client,
prometheus.Labels{"component": "index-store-" + p.From.String()}, s.registerer)

if p.IndexType == config.TSDBType {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
return nil, nil, nil, err
}
idx := series.NewIndexGatewayClientStore(gw, nil)

return failingChunkWriter{}, idx, func() {
f.Stop()
gw.Stop()
}, nil
}

objectClient, err := NewObjectClient(s.cfg.TSDBShipperConfig.SharedStoreType, s.cfg, s.clientMetrics)
if err != nil {
return nil, nil, nil, err
}

// ToDo(Sandeep): Avoid initializing writer when in read only mode
writer, idx, err := tsdb.NewStore(s.cfg.TSDBShipperConfig, p, f, objectClient, s.limits, indexClientReg)
if err != nil {
return nil, nil, nil, err
}

if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.logger)
if err != nil {
return nil, nil, nil, err
}
idx = series.NewIndexGatewayClientStore(gw, idx)
}

return writer, idx,
func() {
f.Stop()
Expand Down Expand Up @@ -492,3 +498,13 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
}
return filtered
}

type failingChunkWriter struct{}

func (f failingChunkWriter) Put(_ context.Context, _ []chunk.Chunk) error {
return errWritingChunkUnsupported
}

func (f failingChunkWriter) PutOne(_ context.Context, _, _ model.Time, _ chunk.Chunk) error {
return errWritingChunkUnsupported
}
36 changes: 24 additions & 12 deletions pkg/storage/stores/series/series_index_gateway_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb"
)

type IndexGatewayClientStore struct {
client IndexGatewayClient
IndexStore
// fallbackStore is used only to keep index gateways backwards compatible.
// Previously index gateways would only serve index rows from boltdb-shipper files.
// tsdb also supports configuring index gateways but there is no concept of serving index rows so
// the fallbackStore could be nil and should be checked before use
fallbackStore IndexStore
}

type IndexGatewayClient interface {
Expand All @@ -26,10 +31,10 @@ type IndexGatewayClient interface {
LabelValuesForMetricName(ctx context.Context, in *indexgatewaypb.LabelValuesForMetricNameRequest, opts ...grpc.CallOption) (*indexgatewaypb.LabelResponse, error)
}

func NewIndexGatewayClientStore(client IndexGatewayClient, index IndexStore) IndexStore {
func NewIndexGatewayClientStore(client IndexGatewayClient, fallbackStore IndexStore) IndexStore {
return &IndexGatewayClientStore{
client: client,
IndexStore: index,
client: client,
fallbackStore: fallbackStore,
}
}

Expand All @@ -40,9 +45,9 @@ func (c *IndexGatewayClientStore) GetChunkRefs(ctx context.Context, userID strin
Matchers: (&syntax.MatchersExpr{Mts: allMatchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
if isUnimplementedCallError(err) && c.fallbackStore != nil {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
return c.fallbackStore.GetChunkRefs(ctx, userID, from, through, allMatchers...)
}
return nil, err
}
Expand All @@ -61,9 +66,9 @@ func (c *IndexGatewayClientStore) GetSeries(ctx context.Context, userID string,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if err != nil {
if isUnimplementedCallError(err) {
if isUnimplementedCallError(err) && c.fallbackStore != nil {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.GetSeries(ctx, userID, from, through, matchers...)
return c.fallbackStore.GetSeries(ctx, userID, from, through, matchers...)
}
return nil, err
}
Expand All @@ -83,9 +88,9 @@ func (c *IndexGatewayClientStore) LabelNamesForMetricName(ctx context.Context, u
From: from,
Through: through,
})
if isUnimplementedCallError(err) {
if isUnimplementedCallError(err) && c.fallbackStore != nil {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
return c.fallbackStore.LabelNamesForMetricName(ctx, userID, from, through, metricName)
}
if err != nil {
return nil, err
Expand All @@ -101,16 +106,23 @@ func (c *IndexGatewayClientStore) LabelValuesForMetricName(ctx context.Context,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
})
if isUnimplementedCallError(err) {
if isUnimplementedCallError(err) && c.fallbackStore != nil {
// Handle communication with older index gateways gracefully, by falling back to the index store calls.
return c.IndexStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
return c.fallbackStore.LabelValuesForMetricName(ctx, userID, from, through, metricName, labelName, matchers...)
}
if err != nil {
return nil, err
}
return resp.Values, nil
}

func (c *IndexGatewayClientStore) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
// if there is no fallback store, we can't set the chunk filterer and index gateway would take care of filtering out data
if c.fallbackStore != nil {
c.fallbackStore.SetChunkFilterer(chunkFilter)
}
}

// isUnimplementedCallError tells if the GRPC error is a gRPC error with code Unimplemented.
func isUnimplementedCallError(err error) bool {
if err == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (fakeClient) GetSeries(ctx context.Context, in *indexgatewaypb.GetSeriesReq
func Test_IndexGatewayClient(t *testing.T) {
idx := IndexGatewayClientStore{
client: fakeClient{},
IndexStore: &indexStore{
fallbackStore: &indexStore{
chunkBatchSize: 1,
},
}
Expand Down

0 comments on commit 1ad75d0

Please sign in to comment.