Skip to content

Commit

Permalink
fix connection
Browse files Browse the repository at this point in the history
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
  • Loading branch information
CabinfeverB committed Nov 27, 2023
1 parent 56ba830 commit d926df2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,16 +732,18 @@ func (c *client) checkLeaderHealth(ctx context.Context) {
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
healthCli := healthpb.NewHealthClient(client)
resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
rpcErr, ok := status.FromError(err)
failpoint.Inject("unreachableNetwork1", func() {
resp = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
rpcErr, ok := status.FromError(err)
if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0))
}
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
}
}

Expand Down
44 changes: 42 additions & 2 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ func TestGetTsoByFollowerForwarding2(t *testing.T) {
}

// case 3: network partition between client and follower A -> transfer leader to follower A -> normal
func TestGetTsoByFollowerForwarding3(t *testing.T) {
func TestGetTsoAndRegionByFollowerForwarding(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -622,6 +622,29 @@ func TestGetTsoByFollowerForwarding3(t *testing.T) {
endpoints := runServer(re, cluster)
re.NotEmpty(cluster.WaitLeader())
leader := cluster.GetLeaderServer()
grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
testutil.Eventually(re, func() bool {
regionHeartbeat, err := grpcPDClient.RegionHeartbeat(ctx)
re.NoError(err)
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(leader.GetServer()),
Region: region,
Leader: peers[0],
}
err = regionHeartbeat.Send(req)
re.NoError(err)
_, err = regionHeartbeat.Recv()
return err == nil
})
follower := cluster.GetServer(cluster.GetFollower())
re.NoError(failpoint.Enable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr())))

Expand All @@ -637,7 +660,9 @@ func TestGetTsoByFollowerForwarding3(t *testing.T) {
return false
})
lastTS = checkTS(re, cli, lastTS)

r, err := cli.GetRegion(context.Background(), []byte("a"))
re.NoError(err)
re.NotNil(r)
leader.GetServer().GetMember().ResignEtcdLeader(leader.GetServer().Context(),
leader.GetServer().Name(), follower.GetServer().Name())
re.NotEmpty(cluster.WaitLeader())
Expand All @@ -651,6 +676,14 @@ func TestGetTsoByFollowerForwarding3(t *testing.T) {
return false
})
lastTS = checkTS(re, cli, lastTS)
testutil.Eventually(re, func() bool {
r, err = cli.GetRegion(context.Background(), []byte("a"))
if err == nil && r != nil {
return true
}
return false
})

re.NoError(failpoint.Disable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2"))
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
Expand All @@ -662,6 +695,13 @@ func TestGetTsoByFollowerForwarding3(t *testing.T) {
return false
})
lastTS = checkTS(re, cli, lastTS)
testutil.Eventually(re, func() bool {
r, err = cli.GetRegion(context.Background(), []byte("a"))
if err == nil && r != nil {
return true
}
return false
})
}

func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 {
Expand Down

0 comments on commit d926df2

Please sign in to comment.