Skip to content

Randomly distribute queries across store-gateways #3824

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* `cortex_ingester_tsdb_symbol_table_size_bytes`
* `cortex_ingester_tsdb_storage_blocks_bytes`
* `cortex_ingester_tsdb_time_retentions_total`
* [ENHANCEMENT] Querier: distribute workload across `-store-gateway.sharding-ring.replication-factor` store-gateway replicas when querying blocks and `-store-gateway.sharding-enabled=true`. #3824
* [ENHANCEMENT] Distributor / HA Tracker: added cleanup of unused elected HA replicas from KV store. Added following metrics to monitor this process: #3809
* `cortex_ha_tracker_replicas_cleanup_started_total`
* `cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total`
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
reg.MustRegister(storesRing)
}

stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, limits, querierCfg.StoreGatewayClient, logger, reg)
stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg)
if err != nil {
return nil, errors.Wrap(err, "failed to create store set")
}
Expand Down
40 changes: 29 additions & 11 deletions pkg/querier/blocks_store_replicated_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"fmt"
"math/rand"

"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
Expand All @@ -17,15 +18,23 @@ import (
"github.com/cortexproject/cortex/pkg/util/services"
)

// loadBalancingStrategy controls how a store-gateway instance is picked
// among the replication set owning a given block.
type loadBalancingStrategy int

const (
	// noLoadBalancing picks the first eligible instance in the set.
	noLoadBalancing loadBalancingStrategy = iota

	// randomLoadBalancing shuffles the set before picking an eligible instance.
	randomLoadBalancing
)

// BlocksStoreSet implementation used when the blocks are sharded and replicated across
// a set of store-gateway instances.
type blocksStoreReplicationSet struct {
services.Service

storesRing *ring.Ring
clientsPool *client.Pool
shardingStrategy string
limits BlocksStoreLimits
storesRing *ring.Ring
clientsPool *client.Pool
shardingStrategy string
balancingStrategy loadBalancingStrategy
limits BlocksStoreLimits

// Subservices manager.
subservices *services.Manager
Expand All @@ -35,16 +44,18 @@ type blocksStoreReplicationSet struct {
func newBlocksStoreReplicationSet(
storesRing *ring.Ring,
shardingStrategy string,
balancingStrategy loadBalancingStrategy,
limits BlocksStoreLimits,
clientConfig ClientConfig,
logger log.Logger,
reg prometheus.Registerer,
) (*blocksStoreReplicationSet, error) {
s := &blocksStoreReplicationSet{
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
limits: limits,
storesRing: storesRing,
clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg),
shardingStrategy: shardingStrategy,
balancingStrategy: balancingStrategy,
limits: limits,
}

var err error
Expand Down Expand Up @@ -106,8 +117,8 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", blockID.String())
}

// Pick the first non excluded store-gateway instance.
addr := getFirstNonExcludedInstanceAddr(set, exclude[blockID])
// Pick a non excluded store-gateway instance.
addr := getNonExcludedInstanceAddr(set, exclude[blockID], s.balancingStrategy)
if addr == "" {
return nil, fmt.Errorf("no store-gateway instance left after checking exclude for block %s", blockID.String())
}
Expand All @@ -130,7 +141,14 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
return clients, nil
}

func getFirstNonExcludedInstanceAddr(set ring.ReplicationSet, exclude []string) string {
func getNonExcludedInstanceAddr(set ring.ReplicationSet, exclude []string, balancingStrategy loadBalancingStrategy) string {
if balancingStrategy == randomLoadBalancing {
// Randomize the list of instances to not always query the same one.
rand.Shuffle(len(set.Ingesters), func(i, j int) {
set.Ingesters[i], set.Ingesters[j] = set.Ingesters[j], set.Ingesters[i]
})
}

for _, instance := range set.Ingesters {
if !util.StringsContain(exclude, instance.Addr) {
return instance.Addr
Expand Down
66 changes: 65 additions & 1 deletion pkg/querier/blocks_store_replicated_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
}

reg := prometheus.NewPedanticRegistry()
s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, limits, ClientConfig{}, log.NewNopLogger(), reg)
s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck
Expand All @@ -360,6 +360,70 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
}
}

func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancingStrategy(t *testing.T) {
	const (
		numRuns      = 1000
		numInstances = 3
	)

	ctx := context.Background()
	tenantID := "user-A"
	now := time.Now()
	blockID := ulid.MustNew(1, nil)

	// Build a ring holding numInstances store-gateways, each registered
	// with a single token.
	kvStore := consul.NewInMemoryClient(ring.GetCodec())
	require.NoError(t, kvStore.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) {
		desc := ring.NewDesc()
		for i := 1; i <= numInstances; i++ {
			desc.AddIngester(fmt.Sprintf("instance-%d", i), fmt.Sprintf("127.0.0.%d", i), "", []uint32{uint32(i)}, ring.ACTIVE, now)
		}
		return desc, true, nil
	}))

	// With the replication factor equal to the number of instances, every
	// store-gateway owns every block, so any instance may serve the query.
	ringCfg := ring.Config{}
	flagext.DefaultValues(&ringCfg)
	ringCfg.ReplicationFactor = numInstances

	r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", kvStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy())
	require.NoError(t, err)

	limits := &blocksStoreLimitsMock{}
	reg := prometheus.NewPedanticRegistry()
	s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(ctx, s))
	defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck

	// Wait until the ring client has initialised the state.
	test.Poll(t, time.Second, true, func() interface{} {
		all, err := r.GetAllHealthy(ring.Read)
		return err == nil && len(all.Ingesters) > 0
	})

	// Query the same block over and over, counting how often each
	// store-gateway address is picked.
	hits := map[string]int{}

	for run := 0; run < numRuns; run++ {
		clients, err := s.GetClientsFor(tenantID, []ulid.ULID{blockID}, nil)
		require.NoError(t, err)
		require.Len(t, clients, 1)

		for addr := range getStoreGatewayClientAddrs(clients) {
			hits[addr]++
		}
	}

	// Every instance must have been selected, and each one at least 80% as
	// often as a perfectly even distribution would give it.
	assert.Len(t, hits, numInstances)
	minExpected := (float64(numRuns) / float64(numInstances)) * 0.8
	for addr, count := range hits {
		assert.Greaterf(t, float64(count), minExpected, "store-gateway address: %s", addr)
	}
}

func getStoreGatewayClientAddrs(clients map[BlocksStoreClient][]ulid.ULID) map[string][]ulid.ULID {
addrs := map[string][]ulid.ULID{}
for c, blockIDs := range clients {
Expand Down