Skip to content

Commit 41bd6be

Browse files
committed
ring/util: allow waiting for ring stability with good state transitions
In some use cases, such as the store gateway, it is enough for the user to know that the token distribution didn't change and it is not important to know if some member changed between allowed states. This patch implements the new WaitRingTokensStability function to be able to wait for tokens stability and ignore allowed state changes. The original motivation was that store gateway tests took a minute longer then strictly necessary. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
1 parent 2cdfa2f commit 41bd6be

File tree

4 files changed

+86
-66
lines changed

4 files changed

+86
-66
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,4 @@
2424
* [ENHANCEMENT] flagext: for cases such as `DeprecatedFlag()` that need a logger, add RegisterFlagsWithLogger. #80
2525
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
2626
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
27+
* [ENHANCEMENT] Add WaitRingTokensStability function to ring, to be able to wait on ring stability excluding allowed state transitions. #95

ring/replication_set.go

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -130,35 +130,24 @@ func (r ReplicationSet) GetAddressesWithout(exclude string) []string {
130130
// HasReplicationSetChanged returns true if two replications sets are the same (with possibly different timestamps),
131131
// false if they differ in any way (number of instances, instance states, tokens, zones, ...).
132132
func HasReplicationSetChanged(before, after ReplicationSet) bool {
133-
beforeInstances := before.Instances
134-
afterInstances := after.Instances
135-
136-
if len(beforeInstances) != len(afterInstances) {
137-
return true
138-
}
139-
140-
sort.Sort(ByAddr(beforeInstances))
141-
sort.Sort(ByAddr(afterInstances))
142-
143-
for i := 0; i < len(beforeInstances); i++ {
144-
b := beforeInstances[i]
145-
a := afterInstances[i]
146-
147-
// Exclude the heartbeat timestamp from the comparison.
148-
b.Timestamp = 0
149-
a.Timestamp = 0
150-
151-
if !b.Equal(a) {
152-
return true
153-
}
154-
}
155-
156-
return false
133+
return hasReplicationSetChangedExcluding(before, after, func(i *InstanceDesc) {
134+
i.Timestamp = 0
135+
})
157136
}
158137

159-
// HasReplicationSetChanged returns true if two replications sets are the same (with possibly different timestamps),
160-
// false if they differ in any way (number of instances, instance states, tokens, zones, ...).
138+
// HasReplicationSetChangedWithoutState returns true if two replications sets
139+
// are the same (with possibly different timestamps and instance states),
140+
// false if they differ in any other way (number of instances, tokens, zones, ...).
161141
func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool {
142+
return hasReplicationSetChangedExcluding(before, after, func(i *InstanceDesc) {
143+
i.Timestamp = 0
144+
i.State = PENDING
145+
})
146+
}
147+
148+
// Do comparision of replicasets, but apply a function first
149+
// to be able to exclude (reset) some values
150+
func hasReplicationSetChangedExcluding(before, after ReplicationSet, exclude func(*InstanceDesc)) bool {
162151
beforeInstances := before.Instances
163152
afterInstances := after.Instances
164153

@@ -173,13 +162,8 @@ func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool {
173162
b := beforeInstances[i]
174163
a := afterInstances[i]
175164

176-
// Exclude the heartbeat timestamp from the comparison.
177-
b.Timestamp = 0
178-
a.Timestamp = 0
179-
180-
// Exclude the state
181-
b.State = PENDING
182-
a.State = PENDING
165+
exclude(&a)
166+
exclude(&b)
183167

184168
if !b.Equal(a) {
185169
return true

ring/replication_set_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,3 +241,60 @@ func TestReplicationSet_Do(t *testing.T) {
241241
})
242242
}
243243
}
244+
245+
func TestHasReplicationSetChangedWithoutStateIgnoresTimeStampAndState(t *testing.T) {
246+
// Only testing difference to underlying Equal function
247+
rs1 := ReplicationSet{
248+
Instances: []InstanceDesc{
249+
{Addr: "127.0.0.1"},
250+
{Addr: "127.0.0.2"},
251+
{Addr: "127.0.0.3"},
252+
},
253+
}
254+
tests := map[string]struct {
255+
rs ReplicationSet
256+
expectHasReplicationSetChanged bool
257+
expectHasReplicationSetChangedWithoutState bool
258+
}{
259+
"timestamp changed": {
260+
ReplicationSet{
261+
Instances: []InstanceDesc{
262+
{Addr: "127.0.0.1", Timestamp: time.Hour.Microseconds()},
263+
{Addr: "127.0.0.2"},
264+
{Addr: "127.0.0.3"},
265+
},
266+
},
267+
false,
268+
false,
269+
},
270+
"state changed": {
271+
ReplicationSet{
272+
Instances: []InstanceDesc{
273+
{Addr: "127.0.0.1", State: PENDING},
274+
{Addr: "127.0.0.2"},
275+
{Addr: "127.0.0.3"},
276+
},
277+
},
278+
true,
279+
false,
280+
},
281+
"more instances": {
282+
ReplicationSet{
283+
Instances: []InstanceDesc{
284+
{Addr: "127.0.0.1"},
285+
{Addr: "127.0.0.2"},
286+
{Addr: "127.0.0.3"},
287+
{Addr: "127.0.0.4"},
288+
},
289+
},
290+
true,
291+
true,
292+
},
293+
}
294+
for testName, testData := range tests {
295+
t.Run(testName, func(t *testing.T) {
296+
assert.Equal(t, testData.expectHasReplicationSetChanged, HasReplicationSetChanged(rs1, testData.rs), "HasReplicationSetChanged wrong result")
297+
assert.Equal(t, testData.expectHasReplicationSetChangedWithoutState, HasReplicationSetChangedWithoutState(rs1, testData.rs), "HasReplicationSetChangedWithoutState wrong result")
298+
})
299+
}
300+
}

ring/util.go

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -96,41 +96,19 @@ func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state
9696
// WaitRingStability monitors the ring topology for the provided operation and waits until it
9797
// keeps stable for at least minStability.
9898
func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error {
99-
// Configure the max waiting time as a context deadline.
100-
ctx, cancel := context.WithTimeout(ctx, maxWaiting)
101-
defer cancel()
102-
103-
// Get the initial ring state.
104-
ringLastState, _ := r.GetAllHealthy(op) // nolint:errcheck
105-
ringLastStateTs := time.Now()
106-
107-
const pollingFrequency = time.Second
108-
pollingTicker := time.NewTicker(pollingFrequency)
109-
defer pollingTicker.Stop()
110-
111-
for {
112-
select {
113-
case <-ctx.Done():
114-
return ctx.Err()
115-
case <-pollingTicker.C:
116-
// We ignore the error because in case of error it will return an empty
117-
// replication set which we use to compare with the previous state.
118-
currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck
119-
120-
if HasReplicationSetChanged(ringLastState, currRingState) {
121-
ringLastState = currRingState
122-
ringLastStateTs = time.Now()
123-
} else if time.Since(ringLastStateTs) >= minStability {
124-
return nil
125-
}
126-
}
127-
}
99+
return waitStability(HasReplicationSetChanged, ctx, r, op, minStability, maxWaiting)
128100
}
129101

130-
// WaitRingTokensStability waits for the Tokens in the Ring to be unchanged at
131-
// least for minStability time period. This can be used to avoid wasting
132-
// resources on moving data around due to multiple changes in the Ring.
102+
// WaitRingTokensStability waits for the Ring to be unchanged at
103+
// least for minStability time period, excluding transitioninig between
104+
// allowed states (e.g. JOINING->ACTIVE if allowed by op).
105+
// This can be used to avoid wasting resources on moving data around
106+
// due to multiple changes in the Ring.
133107
func WaitRingTokensStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error {
108+
return waitStability(HasReplicationSetChangedWithoutState, ctx, r, op, minStability, maxWaiting)
109+
}
110+
111+
func waitStability(isChanged func(ReplicationSet, ReplicationSet) bool, ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error {
134112
// Configure the max waiting time as a context deadline.
135113
ctx, cancel := context.WithTimeout(ctx, maxWaiting)
136114
defer cancel()
@@ -152,7 +130,7 @@ func WaitRingTokensStability(ctx context.Context, r *Ring, op Operation, minStab
152130
// replication set which we use to compare with the previous state.
153131
currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck
154132

155-
if HasReplicationSetChangedWithoutState(ringLastState, currRingState) {
133+
if isChanged(ringLastState, currRingState) {
156134
ringLastState = currRingState
157135
ringLastStateTs = time.Now()
158136
} else if time.Since(ringLastStateTs) >= minStability {

0 commit comments

Comments
 (0)