
ringhash: Remove TODO comment #8096


Merged
arjan-bal merged 1 commit into grpc:master on Feb 19, 2025

Conversation

arjan-bal
Contributor

Fixes: #8085

A61 was updated in grpc/proposal#475. Go already follows the updated proposal, so no behavior changes are required. Tests for the two scenarios described in the proposal PR are present, and are reproduced below.
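
For context, the behavior both tests exercise is the recovery rule from A61: while the aggregated connectivity state is TRANSIENT_FAILURE, the ring_hash policy keeps at least one endpoint attempting to connect, so the channel can recover without waiting for a new RPC pick. A minimal sketch of that rule, using hypothetical endpoint/state types (an illustration only, not the actual ringhash implementation):

import "google.golang.org/grpc/connectivity"

// endpoint is a hypothetical stand-in for a ringhash child endpoint; the
// real policy tracks considerably more state than this.
type endpoint struct {
    state    connectivity.State
    exitIdle func() // triggers a connection attempt on this endpoint
}

// maybeRecover sketches the A61 recovery rule: while the aggregated state
// is TRANSIENT_FAILURE, make sure at least one endpoint is attempting to
// connect by kicking an IDLE endpoint whenever no endpoint is CONNECTING.
func maybeRecover(aggregated connectivity.State, endpoints []*endpoint) {
    if aggregated != connectivity.TransientFailure {
        return
    }
    for _, ep := range endpoints {
        if ep.state == connectivity.Connecting {
            return // a connection attempt is already in flight
        }
    }
    for _, ep := range endpoints {
        if ep.state == connectivity.Idle {
            ep.exitIdle() // dial the next endpoint so the channel can recover
            return
        }
    }
}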

// Tests that ringhash is able to recover automatically in situations when a
// READY endpoint enters IDLE, making the aggregated state TRANSIENT_FAILURE.
// The test creates 4 endpoints in the following connectivity states: [TF, TF,
// READY, IDLE]. The test fails the READY backend and verifies that the last
// IDLE endpoint is dialed and the channel enters READY.
func (s) TestRingHash_RecoverWhenEndpointEntersIdle(t *testing.T) {
    const backendsCount = 4
    backends := startTestServiceBackends(t, backendsCount)
    backendAddrs := backendAddrs(backends)

    const clusterName = "cluster"
    endpoints := endpointResource(t, clusterName, backendAddrs)
    cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
        ClusterName: clusterName,
        ServiceName: clusterName,
        Policy:      e2e.LoadBalancingPolicyRingHash,
    })
    route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
    listener := e2e.DefaultClientListener(virtualHostName, route.Name)

    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    defer cancel()
    xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
    if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
        t.Fatalf("Failed to update xDS resources: %v", err)
    }

    dialer := testutils.NewBlockingDialer()
    dialOpts := []grpc.DialOption{
        grpc.WithResolvers(xdsResolver),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithContextDialer(dialer.DialContext),
        grpc.WithConnectParams(fastConnectParams),
    }
    conn, err := grpc.NewClient("xds:///test.server", dialOpts...)
    if err != nil {
        t.Fatalf("Failed to create client: %v", err)
    }
    defer conn.Close()

    // Create holds for each backend address to delay a successful connection
    // until the end of the test.
    holds := make([]*testutils.Hold, backendsCount)
    for i := 0; i < len(backendAddrs); i++ {
        holds[i] = dialer.Hold(backendAddrs[i])
    }

    client := testgrpc.NewTestServiceClient(conn)
    rpcCtx, rpcCancel := context.WithCancel(ctx)
    errCh := make(chan error, 1)
    go func() {
        rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", backendAddrs[0]+"_0"))
        _, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
        if status.Code(err) == codes.Canceled {
            errCh <- nil
            return
        }
        errCh <- err
    }()

    // Wait for the RPC to trigger a connection attempt to the first address,
    // then cancel the RPC. No other connection attempts should be started yet.
    if !holds[0].Wait(ctx) {
        t.Fatal("Timeout waiting for connection attempt to backend 0")
    }
    rpcCancel()
    if err := <-errCh; err != nil {
        t.Fatalf("Expected RPC to be canceled, got error: %v", err)
    }

    // The number of dialed backends increases by 1 in every iteration of the
    // loop as ringhash tries to exit TRANSIENT_FAILURE. Run the loop twice to
    // get two endpoints in TRANSIENT_FAILURE.
    activeAddrs := map[string]bool{}
    for wantFailingBackendCount := 1; wantFailingBackendCount <= 2; wantFailingBackendCount++ {
        newAddrIdx := -1
        for ; ctx.Err() == nil && len(activeAddrs) < wantFailingBackendCount; <-time.After(time.Millisecond) {
            for i, hold := range holds {
                if !hold.IsStarted() {
                    continue
                }
                if _, ok := activeAddrs[backendAddrs[i]]; ok {
                    continue
                }
                activeAddrs[backendAddrs[i]] = true
                newAddrIdx = i
            }
        }
        if ctx.Err() != nil {
            t.Fatal("Context timed out waiting for new backends to be dialed.")
        }
        if len(activeAddrs) > wantFailingBackendCount {
            t.Fatalf("More backends dialed than expected: got %d, want %d", len(activeAddrs), wantFailingBackendCount)
        }
        // Create a new hold for the address dialed in this iteration and fail
        // the existing hold.
        hold := holds[newAddrIdx]
        holds[newAddrIdx] = dialer.Hold(backendAddrs[newAddrIdx])
        hold.Fail(errors.New("test error"))
    }

    // Current state of endpoints: [TF, TF, IDLE, IDLE].
    // Two endpoints failing should cause the channel to enter
    // TRANSIENT_FAILURE.
    testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)

    // Allow the connection attempt to the backend dialed next to succeed.
    readyBackendIdx := -1
    for ; ctx.Err() == nil && readyBackendIdx == -1; <-time.After(time.Millisecond) {
        for i, addr := range backendAddrs {
            if _, ok := activeAddrs[addr]; ok || !holds[i].IsStarted() {
                continue
            }
            readyBackendIdx = i
            activeAddrs[addr] = true
            holds[i].Resume()
            break
        }
    }
    if ctx.Err() != nil {
        t.Fatal("Context timed out waiting for the next backend to be contacted.")
    }

    // Wait for the channel to become READY without any pending RPC.
    testutils.AwaitState(ctx, t, conn, connectivity.Ready)

    // Current state of endpoints: [TF, TF, READY, IDLE].
    // Stopping the READY backend should cause the channel to re-enter
    // TRANSIENT_FAILURE.
    backends[readyBackendIdx].Stop()
    testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)

    // To recover from TRANSIENT_FAILURE, ringhash should automatically try to
    // connect to the final endpoint.
    readyBackendIdx = -1
    for ; ctx.Err() == nil && readyBackendIdx == -1; <-time.After(time.Millisecond) {
        for i, addr := range backendAddrs {
            if _, ok := activeAddrs[addr]; ok || !holds[i].IsStarted() {
                continue
            }
            readyBackendIdx = i
            activeAddrs[addr] = true
            holds[i].Resume()
            break
        }
    }
    if ctx.Err() != nil {
        t.Fatal("Context timed out waiting for the next backend to be contacted.")
    }

    // Wait for the channel to become READY without any pending RPC.
    testutils.AwaitState(ctx, t, conn, connectivity.Ready)
}
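
The second test mirrors the first: it builds the same [TF, TF, READY, IDLE] endpoint states, but triggers re-entry into TRANSIENT_FAILURE by removing the READY endpoint from the EDS resource instead of stopping the backend, covering the resolver-update scenario from the proposal.
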
// Tests that ringhash is able to recover automatically in situations when a
// READY endpoint is removed by the resolver, making the aggregated state
// TRANSIENT_FAILURE. The test creates 4 endpoints in the following
// connectivity states: [TF, TF, READY, IDLE]. The test removes the
// READY endpoint and verifies that the last IDLE endpoint is dialed and the
// channel enters READY.
func (s) TestRingHash_RecoverWhenResolverRemovesEndpoint(t *testing.T) {
    const backendsCount = 4
    backends := startTestServiceBackends(t, backendsCount)
    backendAddrs := backendAddrs(backends)

    const clusterName = "cluster"
    endpoints := endpointResource(t, clusterName, backendAddrs)
    cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
        ClusterName: clusterName,
        ServiceName: clusterName,
        Policy:      e2e.LoadBalancingPolicyRingHash,
    })
    route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
    listener := e2e.DefaultClientListener(virtualHostName, route.Name)

    ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    defer cancel()
    xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
    if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
        t.Fatalf("Failed to update xDS resources: %v", err)
    }

    dialer := testutils.NewBlockingDialer()
    dialOpts := []grpc.DialOption{
        grpc.WithResolvers(xdsResolver),
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithContextDialer(dialer.DialContext),
        grpc.WithConnectParams(fastConnectParams),
    }
    conn, err := grpc.NewClient("xds:///test.server", dialOpts...)
    if err != nil {
        t.Fatalf("Failed to create client: %v", err)
    }
    defer conn.Close()

    // Create holds for each backend address to delay a successful connection
    // until the end of the test.
    holds := make([]*testutils.Hold, backendsCount)
    for i := 0; i < len(backendAddrs); i++ {
        holds[i] = dialer.Hold(backendAddrs[i])
    }

    client := testgrpc.NewTestServiceClient(conn)
    rpcCtx, rpcCancel := context.WithCancel(ctx)
    errCh := make(chan error, 1)
    go func() {
        rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", backendAddrs[0]+"_0"))
        _, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
        if status.Code(err) == codes.Canceled {
            errCh <- nil
            return
        }
        errCh <- err
    }()

    // Wait for the RPC to trigger a connection attempt to the first address,
    // then cancel the RPC. No other connection attempts should be started yet.
    if !holds[0].Wait(ctx) {
        t.Fatal("Timeout waiting for connection attempt to backend 0")
    }
    rpcCancel()
    if err := <-errCh; err != nil {
        t.Fatalf("Expected RPC to be canceled, got error: %v", err)
    }

    // The number of dialed backends increases by 1 in every iteration of the
    // loop as ringhash tries to exit TRANSIENT_FAILURE. Run the loop twice to
    // get two endpoints in TRANSIENT_FAILURE.
    activeAddrs := map[string]bool{}
    for wantFailingBackendCount := 1; wantFailingBackendCount <= 2; wantFailingBackendCount++ {
        newAddrIdx := -1
        for ; ctx.Err() == nil && len(activeAddrs) < wantFailingBackendCount; <-time.After(time.Millisecond) {
            for i, hold := range holds {
                if !hold.IsStarted() {
                    continue
                }
                if _, ok := activeAddrs[backendAddrs[i]]; ok {
                    continue
                }
                activeAddrs[backendAddrs[i]] = true
                newAddrIdx = i
            }
        }
        if ctx.Err() != nil {
            t.Fatal("Context timed out waiting for new backends to be dialed.")
        }
        if len(activeAddrs) > wantFailingBackendCount {
            t.Fatalf("More backends dialed than expected: got %d, want %d", len(activeAddrs), wantFailingBackendCount)
        }
        // Create a new hold for the address dialed in this iteration and fail
        // the existing hold.
        hold := holds[newAddrIdx]
        holds[newAddrIdx] = dialer.Hold(backendAddrs[newAddrIdx])
        hold.Fail(errors.New("test error"))
    }

    // Current state of endpoints: [TF, TF, IDLE, IDLE].
    // Two endpoints failing should cause the channel to enter
    // TRANSIENT_FAILURE.
    testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)

    // Allow the connection attempt to the backend dialed next to succeed.
    readyBackendIdx := -1
    for ; ctx.Err() == nil && readyBackendIdx == -1; <-time.After(time.Millisecond) {
        for i, addr := range backendAddrs {
            if _, ok := activeAddrs[addr]; ok || !holds[i].IsStarted() {
                continue
            }
            readyBackendIdx = i
            activeAddrs[addr] = true
            holds[i].Resume()
            break
        }
    }
    if ctx.Err() != nil {
        t.Fatal("Context timed out waiting for the next backend to be contacted.")
    }

    // Wait for the channel to become READY without any pending RPC.
    testutils.AwaitState(ctx, t, conn, connectivity.Ready)

    // Current state of endpoints: [TF, TF, READY, IDLE].
    // Removing the READY backend should cause the channel to re-enter
    // TRANSIENT_FAILURE.
    updatedAddrs := append([]string{}, backendAddrs[:readyBackendIdx]...)
    updatedAddrs = append(updatedAddrs, backendAddrs[readyBackendIdx+1:]...)
    updatedEndpoints := endpointResource(t, clusterName, updatedAddrs)
    if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, updatedEndpoints, cluster, route, listener)); err != nil {
        t.Fatalf("Failed to update xDS resources: %v", err)
    }
    testutils.AwaitState(ctx, t, conn, connectivity.TransientFailure)

    // To recover from TRANSIENT_FAILURE, ringhash should automatically try to
    // connect to the final endpoint.
    readyBackendIdx = -1
    for ; ctx.Err() == nil && readyBackendIdx == -1; <-time.After(time.Millisecond) {
        for i, addr := range backendAddrs {
            if _, ok := activeAddrs[addr]; ok || !holds[i].IsStarted() {
                continue
            }
            readyBackendIdx = i
            activeAddrs[addr] = true
            holds[i].Resume()
            break
        }
    }
    if ctx.Err() != nil {
        t.Fatal("Context timed out waiting for the next backend to be contacted.")
    }

    // Wait for the channel to become READY without any pending RPC.
    testutils.AwaitState(ctx, t, conn, connectivity.Ready)
}
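
Both tests control connectivity deterministically through the blocking dialer from grpc-go's internal testutils package (imported above): a Hold intercepts the dial to one address, Wait/IsStarted report when the channel actually dials it, and Resume or Fail decide whether that connection attempt succeeds. A condensed sketch of the pattern, with an illustrative address and using the same imports as the tests above:

func exampleHold(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    dialer := testutils.NewBlockingDialer()
    hold := dialer.Hold("localhost:50051") // intercept dials to this address

    conn, err := grpc.NewClient("localhost:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithContextDialer(dialer.DialContext))
    if err != nil {
        t.Fatalf("Failed to create client: %v", err)
    }
    defer conn.Close()
    conn.Connect() // kick the channel out of IDLE so it dials

    if !hold.Wait(ctx) { // block until the channel dials the held address
        t.Fatal("Timed out waiting for the dial attempt")
    }
    hold.Fail(errors.New("test error")) // or hold.Resume() to let the dial succeed
}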

RELEASE NOTES: N/A

@arjan-bal added the Type: Internal Cleanup (Refactors, etc.) label on Feb 17, 2025
@arjan-bal added this to the 1.71 Release milestone on Feb 17, 2025
@arjan-bal requested a review from easwars on February 17, 2025 04:38
codecov bot commented on Feb 17, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 82.33%. Comparing base (ae2a04f) to head (939be62).
Report is 3 commits behind head on master.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8096      +/-   ##
==========================================
+ Coverage   82.29%   82.33%   +0.04%     
==========================================
  Files         387      387              
  Lines       38967    38956      -11     
==========================================
+ Hits        32066    32074       +8     
+ Misses       5586     5576      -10     
+ Partials     1315     1306       -9     
Files with missing lines                     Coverage Δ
xds/internal/balancer/ringhash/ringhash.go   93.92% <ø> (-0.30%) ⬇️

... and 22 files with indirect coverage changes

@easwars assigned arjan-bal and unassigned easwars on Feb 18, 2025
@arjan-bal merged commit 05bdd66 into grpc:master on Feb 19, 2025
15 checks passed
janardhanvissa pushed a commit to janardhanvissa/grpc-go that referenced this pull request Mar 24, 2025
Successfully merging this pull request may close these issues:

ringhash: Update ringhash to handle conditions under which to make child balancers exit idle state (#8085)