Move ring operations to packages where they are used. #3675

Merged: 4 commits, Jan 12, 2021
5 changes: 3 additions & 2 deletions pkg/compactor/compactor.go
@@ -32,6 +32,7 @@ import (

var (
errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
RingOp = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, nil)
)

// Config holds the Compactor config.
@@ -328,7 +329,7 @@ func (c *Compactor) starting(ctx context.Context) error {
maxWaiting := c.compactorCfg.ShardingRing.WaitStabilityMaxDuration

level.Info(c.logger).Log("msg", "waiting until compactor ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
if err := ring.WaitRingStability(ctx, c.ring, ring.Compactor, minWaiting, maxWaiting); err != nil {
if err := ring.WaitRingStability(ctx, c.ring, RingOp, minWaiting, maxWaiting); err != nil {
level.Warn(c.logger).Log("msg", "compactor ring topology is not stable after the max waiting time, proceeding anyway")
} else {
level.Info(c.logger).Log("msg", "compactor ring topology is stable")
@@ -624,7 +625,7 @@ func (c *Compactor) ownUser(userID string) (bool, error) {
userHash := hasher.Sum32()

// Check whether this compactor instance owns the user.
rs, err := c.ring.Get(userHash, ring.Compactor, nil, nil, nil)
rs, err := c.ring.Get(userHash, RingOp, nil, nil, nil)
if err != nil {
return false, err
}
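
The compactor change above shows the pattern this PR introduces: each ring consumer defines its own Operation with ring.NewOp instead of depending on a shared constant in pkg/ring. A minimal sketch of a hypothetical consumer following the same pattern (package, variable, and function names are illustrative, not part of this diff):

package example // hypothetical ring consumer, not part of this PR

import "github.com/cortexproject/cortex/pkg/ring"

// ExampleOp treats only ACTIVE instances as healthy and never extends the
// replica set (the second argument to NewOp is the optional extension predicate).
var ExampleOp = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, nil)

// ownsToken reports whether the instance at addr is among the replicas for token,
// using the package-local operation rather than a constant from pkg/ring.
func ownsToken(r ring.ReadRing, token uint32, addr string) (bool, error) {
	// nil buffers follow the compactor usage above; callers on a hot path can
	// reuse buffers from ring.MakeBuffersForGet instead.
	rs, err := r.Get(token, ExampleOp, nil, nil, nil)
	if err != nil {
		return false, err
	}
	for _, instance := range rs.Ingesters {
		if instance.Addr == addr {
			return true, nil
		}
	}
	return false, nil
}
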
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -186,6 +186,7 @@ func (t *Cortex) initOverrides() (serv services.Service, err error) {
func (t *Cortex) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
t.Cfg.Distributor.ExtendWrites = t.Cfg.Ingester.LifecyclerConfig.RingConfig.ExtendWrites

// Check whether the distributor can join the distributors ring, which is
// whenever it's not running as an internal dependency (ie. querier or
10 changes: 9 additions & 1 deletion pkg/distributor/distributor.go
@@ -174,6 +174,9 @@ type Config struct {

// This config is dynamically injected because defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// Defined in ingester's lifecycler.
ExtendWrites bool `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -548,7 +551,12 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
keys := append(seriesKeys, metadataKeys...)
initialMetadataIndex := len(seriesKeys)

err = ring.DoBatch(ctx, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
op := ring.WriteNoExtend
if d.cfg.ExtendWrites {
op = ring.Write
}

err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
var metadata []*client.MetricMetadata

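
The op selected above decides whether a write may spill onto one extra instance: ring.Write extends the replica set whenever it meets a non-ACTIVE instance, while ring.WriteNoExtend never does. A small sketch (a hypothetical standalone program, using only identifiers from pkg/ring; the Write/WriteNoExtend operations and the ShouldExtendReplicaSetOnState helper are added later in this diff) printing the difference:

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	// Expected: ACTIVE false/false, JOINING true/false, LEAVING true/false.
	for _, state := range []ring.IngesterState{ring.ACTIVE, ring.JOINING, ring.LEAVING} {
		fmt.Println(state,
			"Write extends:", ring.Write.ShouldExtendReplicaSetOnState(state),
			"WriteNoExtend extends:", ring.WriteNoExtend.ShouldExtendReplicaSetOnState(state))
	}
}
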
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_replicated_set.go
@@ -102,7 +102,7 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid
// returned replication set.
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

set, err := userRing.Get(cortex_tsdb.HashBlockID(blockID), ring.BlocksRead, bufDescs, bufHosts, bufZones)
set, err := userRing.Get(cortex_tsdb.HashBlockID(blockID), storegateway.BlocksRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", blockID.String())
}
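
The store-gateway side of the move is not included in this diff. A plausible shape for the relocated operations, inferred from the per-state behaviour removed from pkg/ring/model.go later in this diff, would be:

package storegateway // the definitions below are a guess at the moved operations, not shown in this diff

import "github.com/cortexproject/cortex/pkg/ring"

var (
	// BlocksRead: queriers only read blocks from ACTIVE store-gateways,
	// matching the behaviour of the old ring.BlocksRead.
	BlocksRead = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, nil)

	// BlocksSync: store-gateways sync blocks while JOINING, ACTIVE or LEAVING,
	// matching the old ring.BlocksSync. Neither operation extends the replica set.
	BlocksSync = ring.NewOp([]ring.IngesterState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
)
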
4 changes: 2 additions & 2 deletions pkg/ring/batch.go
@@ -38,7 +38,7 @@ type itemTracker struct {
// to send to that ingester.
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
if r.IngesterCount() <= 0 {
return fmt.Errorf("DoBatch: IngesterCount <= 0")
}
@@ -52,7 +52,7 @@ func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(Inges
bufZones [GetBufferSize]string
)
for i, key := range keys {
replicationSet, err := r.Get(key, Write, bufDescs[:0], bufHosts[:0], bufZones[:0])
replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
if err != nil {
return err
}
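
DoBatch now threads the caller's Operation down to every per-key Get instead of hard-coding Write. A minimal sketch of an updated caller (the wrapper function and its parameters are illustrative, not part of this diff):

package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/ring"
)

// pushBatch fans keys out to instances using the non-extending write operation.
// How each batch is actually sent (sendTo) and cleaned up is left to the caller.
func pushBatch(ctx context.Context, r ring.ReadRing, keys []uint32,
	sendTo func(ring.IngesterDesc, []int) error, cleanup func()) error {
	return ring.DoBatch(ctx, ring.WriteNoExtend, r, keys, sendTo, cleanup)
}
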
26 changes: 1 addition & 25 deletions pkg/ring/model.go
@@ -133,32 +133,8 @@ func (i *IngesterDesc) GetRegisteredAt() time.Time {
return time.Unix(i.RegisteredTimestamp, 0)
}

// IsHealthy checks whether the ingester appears to be alive and heartbeating
func (i *IngesterDesc) IsHealthy(op Operation, heartbeatTimeout time.Duration, now time.Time) bool {
healthy := false

switch op {
case Write:
healthy = i.State == ACTIVE

case Read:
healthy = (i.State == ACTIVE) || (i.State == LEAVING) || (i.State == PENDING)

case Reporting:
healthy = true

case BlocksSync:
healthy = (i.State == JOINING) || (i.State == ACTIVE) || (i.State == LEAVING)

case BlocksRead:
healthy = i.State == ACTIVE

case Ruler:
healthy = i.State == ACTIVE

case Compactor:
healthy = i.State == ACTIVE
}
healthy := op.IsInstanceInStateHealthy(i.State)

return healthy && now.Unix()-i.Timestamp <= heartbeatTimeout.Milliseconds()/1000
}
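
The rewritten IsHealthy combines two independent checks: the instance state must be in the operation's healthy set, and the last heartbeat must be within the timeout. A minimal sketch (a hypothetical standalone program, using only types shown in this diff) of how the write and read operations judge a LEAVING instance with a fresh heartbeat:

package main

import (
	"fmt"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	// A LEAVING instance whose last heartbeat was 30 seconds ago.
	desc := ring.IngesterDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()}

	fmt.Println(desc.IsHealthy(ring.Read, time.Minute, time.Now()))  // true: Read treats LEAVING as healthy
	fmt.Println(desc.IsHealthy(ring.Write, time.Minute, time.Now())) // false: Write only accepts ACTIVE
}
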
54 changes: 0 additions & 54 deletions pkg/ring/model_test.go
@@ -63,60 +63,6 @@ func TestIngesterDesc_IsHealthy_ForIngesterOperations(t *testing.T) {
}
}

func TestIngesterDesc_IsHealthy_ForStoreGatewayOperations(t *testing.T) {
t.Parallel()

tests := map[string]struct {
instance *IngesterDesc
timeout time.Duration
syncExpected bool
queryExpected bool
}{
"ACTIVE instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: ACTIVE, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: true,
queryExpected: true,
},
"ACTIVE instance with last keepalive older than timeout": {
instance: &IngesterDesc{State: ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: false,
queryExpected: false,
},
"JOINING instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: JOINING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: true,
queryExpected: false,
},
"LEAVING instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: true,
queryExpected: false,
},
"PENDING instance with last keepalive newer than timeout": {
instance: &IngesterDesc{State: PENDING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
syncExpected: false,
queryExpected: false,
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
actual := testData.instance.IsHealthy(BlocksSync, testData.timeout, time.Now())
assert.Equal(t, testData.syncExpected, actual)

actual = testData.instance.IsHealthy(BlocksRead, testData.timeout, time.Now())
assert.Equal(t, testData.queryExpected, actual)
})
}
}

func TestIngesterDesc_GetRegisteredAt(t *testing.T) {
tests := map[string]struct {
desc *IngesterDesc
35 changes: 3 additions & 32 deletions pkg/ring/replication_strategy.go
@@ -10,21 +10,12 @@ type ReplicationStrategy interface {
// for an operation to succeed. Returns an error if there are not enough
// instances.
Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []IngesterDesc, maxFailures int, err error)

// ShouldExtendReplicaSet returns true if given an instance that's going to be
// added to the replica set, the replica set size should be extended by 1
// more instance for the given operation.
ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool
}

type defaultReplicationStrategy struct {
ExtendWrites bool
}
type defaultReplicationStrategy struct{}

func NewDefaultReplicationStrategy(extendWrites bool) ReplicationStrategy {
return &defaultReplicationStrategy{
ExtendWrites: extendWrites,
}
func NewDefaultReplicationStrategy() ReplicationStrategy {
return &defaultReplicationStrategy{}
}

// Filter decides, given the set of ingesters eligible for a key,
@@ -72,26 +63,6 @@ func (s *defaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati
return ingesters, len(ingesters) - minSuccess, nil
}

func (s *defaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool {
// We do not want to Write to Ingesters that are not ACTIVE, but we do want
// to write the extra replica somewhere. So we increase the size of the set
// of replicas for the key. This means we have to also increase the
// size of the replica set for read, but we can read from Leaving ingesters,
// so don't skip it in this case.
// NB dead ingester will be filtered later by defaultReplicationStrategy.Filter().
if op == Write {
if s.ExtendWrites {
return ingester.State != ACTIVE
}
return false
} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
return true
}

return false
}

// IsHealthy checks whether an ingester appears to be alive and heartbeating
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation, now time.Time) bool {
return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout, now)
}
5 changes: 2 additions & 3 deletions pkg/ring/replication_strategy_test.go
@@ -11,7 +11,6 @@ import (
func TestRingReplicationStrategy(t *testing.T) {
for i, tc := range []struct {
RF, LiveIngesters, DeadIngesters int
op Operation // Will default to READ
ExpectedMaxFailure int
ExpectedError string
}{
@@ -90,8 +89,8 @@
}

t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
strategy := NewDefaultReplicationStrategy(true)
liveIngesters, maxFailure, err := strategy.Filter(ingesters, tc.op, tc.RF, 100*time.Second, false)
strategy := NewDefaultReplicationStrategy()
liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.RF, 100*time.Second, false)
if tc.ExpectedError == "" {
assert.NoError(t, err)
assert.Equal(t, tc.LiveIngesters, len(liveIngesters))
78 changes: 59 additions & 19 deletions pkg/ring/ring.go
@@ -75,26 +75,27 @@ type ReadRing interface {
HasInstance(instanceID string) bool
}

// Operation can be Read or Write
type Operation int

// Values for Operation
const (
Read Operation = iota
Write
Reporting // Special value for inquiring about health

// BlocksSync is the operation run by the store-gateway to sync blocks.
BlocksSync
var (
// Write operation that also extends replica set, if ingester state is not ACTIVE.
Write = NewOp([]IngesterState{ACTIVE}, func(s IngesterState) bool {
// We do not want to Write to Ingesters that are not ACTIVE, but we do want
// to write the extra replica somewhere. So we increase the size of the set
// of replicas for the key.
// NB dead ingester will be filtered later by defaultReplicationStrategy.Filter().
return s != ACTIVE
})

// BlocksRead is the operation run by the querier to query blocks via the store-gateway.
BlocksRead
// WriteNoExtend is like Write, but with no replicaset extension.
WriteNoExtend = NewOp([]IngesterState{ACTIVE}, nil)

// Ruler is the operation used for distributing rule groups between rulers.
Ruler
Read = NewOp([]IngesterState{ACTIVE, PENDING, LEAVING}, func(s IngesterState) bool {
// To match Write with extended replica set we have to also increase the
// size of the replica set for Read, but we can read from LEAVING ingesters.
return s != ACTIVE && s != LEAVING
})

// Compactor is the operation used for distributing tenants/blocks across compactors.
Compactor
// Reporting is a special value for inquiring about health.
Reporting = allStatesRingOperation
)

var (
@@ -202,7 +203,7 @@ func New(cfg Config, name, key string, reg prometheus.Registerer) (*Ring, error)
return nil, err
}

return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(cfg.ExtendWrites))
return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy())
}

func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy) (*Ring, error) {
@@ -349,7 +350,7 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts,

// Check whether the replica set should be extended given we're including
// this instance.
if r.strategy.ShouldExtendReplicaSet(ingester, op) {
if op.ShouldExtendReplicaSetOnState(ingester.State) {
n++
}

@@ -803,3 +804,42 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}] = subring
}
}

// Operation describes which instances can be included in the replica set, based on their state.
//
// Implemented as bitmap, with upper 16-bits used for encoding extendReplicaSet, and lower 16-bits used for encoding healthy states.
type Operation uint32

// NewOp constructs new Operation with given "healthy" states for operation, and optional function to extend replica set.
// Result of calling shouldExtendReplicaSet is cached.
func NewOp(healthyStates []IngesterState, shouldExtendReplicaSet func(s IngesterState) bool) Operation {
op := Operation(0)
for _, s := range healthyStates {
op |= (1 << s)
}

if shouldExtendReplicaSet != nil {
for _, s := range []IngesterState{ACTIVE, LEAVING, PENDING, JOINING, LEAVING, LEFT} {
if shouldExtendReplicaSet(s) {
op |= (0x10000 << s)
}
}
}

return op
}

// IsInstanceInStateHealthy is used during "filtering" phase to remove undesired instances based on their state.
func (op Operation) IsInstanceInStateHealthy(s IngesterState) bool {
return op&(1<<s) > 0
}

// ShouldExtendReplicaSetOnState returns true if given a state of instance that's going to be
// added to the replica set, the replica set size should be extended by 1
// more instance for the given operation.
func (op Operation) ShouldExtendReplicaSetOnState(s IngesterState) bool {
return op&(0x10000<<s) > 0
}

// All states are healthy, no states extend replica set.
var allStatesRingOperation = Operation(0x0000ffff)
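
The bitmap layout means an Operation is just a uint32: for a state s, bit s of the low 16 bits marks that state as healthy and bit 16+s marks it as extending the replica set. Reporting, aliased to allStatesRingOperation above, sets all sixteen low bits, so every state is healthy and none extends the set. A minimal sketch (a hypothetical standalone program, reusing only identifiers added in this file) exercising the encoding:

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/ring"
)

func main() {
	// An operation equivalent to the new ring.Read: ACTIVE, PENDING and LEAVING
	// are healthy, and any other state pushes one extra instance into the replica set.
	op := ring.NewOp([]ring.IngesterState{ring.ACTIVE, ring.PENDING, ring.LEAVING},
		func(s ring.IngesterState) bool { return s != ring.ACTIVE && s != ring.LEAVING })

	fmt.Printf("encoded: %#08x\n", uint32(op)) // healthy bits in the low 16 bits, extension bits shifted up by 16

	fmt.Println("JOINING healthy:", op.IsInstanceInStateHealthy(ring.JOINING))      // false
	fmt.Println("LEAVING healthy:", op.IsInstanceInStateHealthy(ring.LEAVING))      // true
	fmt.Println("PENDING extends:", op.ShouldExtendReplicaSetOnState(ring.PENDING)) // true
	fmt.Println("ACTIVE extends:", op.ShouldExtendReplicaSetOnState(ring.ACTIVE))   // false
}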