[DNM] skip evict leader for v7.5.1 #8614

Open: wants to merge 35 commits into base: master
Changes from 1 commit

Commits (35)
63eb0cb
placement: add rule/group count metrics (#7232) (#7243)
ti-chi-bot Oct 25, 2023
8b64ecf
rule_checker: fix the issue of not being able to achieve the better R…
ti-chi-bot Oct 25, 2023
7b3611a
*: check whether region is nil (#7263) (#7267)
ti-chi-bot Oct 26, 2023
a54621a
api: fix cannot dump trace (#7255) (#7265)
ti-chi-bot Oct 26, 2023
595d5b0
dashboard: update hotfix version (#7303) (#7307)
ti-chi-bot Nov 2, 2023
710ffcd
replication mode: fix wrong available store list (#7222) (#7328)
ti-chi-bot Nov 8, 2023
da1e92d
core: batch get region size (#7252) (#7332)
ti-chi-bot Nov 8, 2023
a22710c
checker: reduces the probability of deleting normal peers when the st…
lhy1024 Nov 8, 2023
7c65b8d
chore(dashboard): update tidb dashboard verstion to v2023.11.08.1 (#7…
ti-chi-bot Nov 9, 2023
d09a4f5
mcs/resourcemanager: delete expire tokenSlot (#7344) (#7350)
ti-chi-bot Nov 10, 2023
d0a17ca
etcdutil, leadership: avoid redundant created watch channel (#7352) (…
ti-chi-bot Nov 10, 2023
ef6ba85
resourcemanager: return resource-group priority in OnRequestWait (#73…
ti-chi-bot Nov 16, 2023
a5b9d66
go.mod: upgrade gin version from v1.8.1 to v1.9.1 (#7451) (#7514)
ti-chi-bot Dec 11, 2023
3d7f65e
resource_control: improve trace logs, ctl and metrics (#7510) (#7524)
ti-chi-bot Dec 12, 2023
d2074a9
resource_control: fix data race in controller (#7520) (#7526)
ti-chi-bot Dec 13, 2023
c9c9979
errs: remove redundant `FastGenWithCause` in `ZapError` (#7497) (#7545)
ti-chi-bot Dec 22, 2023
8ea0f6f
client: update the leader even if the connection creation fails (#744…
ti-chi-bot Dec 25, 2023
7ce5860
resource_mananger: deep clone resource group (#7623) (#7625)
ti-chi-bot Jan 2, 2024
511b094
resource_control: unify label name to group_name (#7547) (#7656)
ti-chi-bot Jan 3, 2024
a276843
resource_group: don't accumulate tokens when burstlimit less than 0 (…
ti-chi-bot Jan 4, 2024
0794b5e
memory: support cgroup with systemd (#7627) (#7666)
ti-chi-bot Jan 10, 2024
25071dd
scheduler: add aduit log for scheduler config API and add resp msg fo…
ti-chi-bot Jan 16, 2024
1be15d7
check: remove orphan peer only when the peers is greater than the rul…
ti-chi-bot Feb 1, 2024
6978558
client: return total wait duration in resource interceptor OnRequestW…
ti-chi-bot Feb 2, 2024
ae19047
member: avoid frequent campaign times (#7301) (#7790)
ti-chi-bot Feb 2, 2024
85e1a27
*: cherry-pick the etcd client health checker improvements (#7793)
JmPotato Feb 4, 2024
318a3fd
mcs: fix metrics cleanup (#7652) (#7659)
ti-chi-bot Feb 5, 2024
83f290a
*: fix context usage when watch etcd (#7806) (#7811)
ti-chi-bot Feb 7, 2024
decd310
schedule: fix panic when switching placement rules (#7415) (#7425)
ti-chi-bot Feb 7, 2024
ae9db49
api: fix panic when region doesn't have a leader (#7629) (#7650)
ti-chi-bot Feb 9, 2024
b8feb2b
prepare_check: remove redundant check (#7217) (#7818)
ti-chi-bot Feb 10, 2024
3488a65
*: fix region stats check (#7748) (#7812)
ti-chi-bot Feb 10, 2024
7294ff9
chore(dashboard): update TiDB Dashboard to v7.5.1-43fe8dac [release-7…
baurine Feb 20, 2024
d71a1a3
core: fix datarace in MergeLabels (#7537) (#7830)
ti-chi-bot Feb 20, 2024
463297b
scheduler: skip evict-leader-scheduler when setting schedule deny lab…
okJiang Jun 24, 2024
client: update the leader even if the connection creation fails (#7443) (#7476)

close #7416

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Cabinfever_B <cabinfeveroier@gmail.com>
ti-chi-bot and CabinfeverB authored Dec 25, 2023
commit 8ea0f6f26fa03ed7317bd0f129319bcbb967d943
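Note: a rough sketch of the mechanism this commit leans on (the names below are simplified stand-ins, not the real client API). A periodic health check flips an atomic "leader unreachable" flag, and follower forwarding is driven by that flag rather than by whether the initial dial succeeded, so keeping the new leader address even when the dial fails lets the client recover on its own once the network heals.

```go
// Minimal sketch, assuming a flag-based health check similar to the
// client.go hunk below; types and names here are hypothetical.
package main

import (
	"fmt"
	"sync/atomic"
)

type client struct {
	leaderNetworkFailure int32 // 1 when the leader looks unreachable
}

// checkLeaderHealth mirrors the shape of the first client.go hunk: any
// health-check error or non-SERVING status flips the failure flag.
func (c *client) checkLeaderHealth(healthy bool, err error) {
	if err != nil || !healthy {
		atomic.StoreInt32(&c.leaderNetworkFailure, 1)
	} else {
		atomic.StoreInt32(&c.leaderNetworkFailure, 0)
	}
}

func main() {
	c := &client{}
	c.checkLeaderHealth(false, fmt.Errorf("unavailable"))
	fmt.Println("forward via follower:", atomic.LoadInt32(&c.leaderNetworkFailure) == 1)
	c.checkLeaderHealth(true, nil) // the network has recovered
	fmt.Println("forward via follower:", atomic.LoadInt32(&c.leaderNetworkFailure) == 1)
}
```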
2 changes: 1 addition & 1 deletion Makefile
@@ -151,7 +151,7 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)

install-tools:
@mkdir -p $(GO_TOOLS_BIN_PATH)
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.51.2
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.55.2
@grep '_' tools.go | sed 's/"//g' | awk '{print $$2}' | xargs go install

.PHONY: install-tools
8 changes: 6 additions & 2 deletions client/client.go
@@ -732,16 +732,18 @@ func (c *client) checkLeaderHealth(ctx context.Context) {
if client := c.pdSvcDiscovery.GetServingEndpointClientConn(); client != nil {
healthCli := healthpb.NewHealthClient(client)
resp, err := healthCli.Check(ctx, &healthpb.HealthCheckRequest{Service: ""})
rpcErr, ok := status.FromError(err)
failpoint.Inject("unreachableNetwork1", func() {
resp = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
rpcErr, ok := status.FromError(err)
if (ok && isNetworkError(rpcErr.Code())) || resp.GetStatus() != healthpb.HealthCheckResponse_SERVING {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(0))
}
} else {
atomic.StoreInt32(&(c.leaderNetworkFailure), int32(1))
}
}

@@ -1062,7 +1064,9 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
defer span.Finish()
}
start := time.Now()
defer cmdDurationScanRegions.Observe(time.Since(start).Seconds())
defer func() {
cmdDurationScanRegions.Observe(time.Since(start).Seconds())
}()

var cancel context.CancelFunc
scanCtx := ctx
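Two things change in this file: `status.FromError` is moved below the failpoint injection so an injected error is actually parsed, and the `ScanRegions` metric observation is wrapped in a deferred closure. The second fix matters because the arguments of a plain deferred call are evaluated at the moment `defer` runs, not when the function returns. A self-contained illustration of that gotcha (not PD code):

```go
// Minimal illustration of the defer gotcha fixed in the ScanRegions hunk:
// deferred-call arguments are evaluated immediately, so the duration must be
// computed inside a deferred closure to be measured at return time.
package main

import (
	"fmt"
	"time"
)

func observe(label string, d time.Duration) { fmt.Printf("%s: %v\n", label, d) }

func wrong() {
	start := time.Now()
	// time.Since(start) is evaluated right here, before the sleep.
	defer observe("wrong (~0s)", time.Since(start))
	time.Sleep(50 * time.Millisecond)
}

func right() {
	start := time.Now()
	// The closure runs at return time, so the full duration is recorded.
	defer func() { observe("right (~50ms)", time.Since(start)) }()
	time.Sleep(50 * time.Millisecond)
}

func main() {
	wrong()
	right()
}
```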
8 changes: 8 additions & 0 deletions client/grpcutil/grpcutil.go
@@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/tlsutil"
@@ -88,6 +90,12 @@ func GetOrCreateGRPCConn(ctx context.Context, clientConns *sync.Map, addr string
dCtx, cancel := context.WithTimeout(ctx, dialTimeout)
defer cancel()
cc, err := GetClientConn(dCtx, addr, tlsConfig, opt...)
failpoint.Inject("unreachableNetwork2", func(val failpoint.Value) {
if val, ok := val.(string); ok && val == addr {
cc = nil
err = errors.Errorf("unreachable network")
}
})
if err != nil {
return nil, err
}
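The `failpoint.Inject` call added here is a test-only hook: the new integration test below enables `unreachableNetwork2` with the follower's address via `failpoint.Enable`, forcing connection creation to fail for that one endpoint. Conceptually it behaves like a hand-rolled test hook of the following shape; every name in this sketch is hypothetical:

```go
// Rough, hand-rolled equivalent of the unreachableNetwork2 failpoint above
// (hypothetical names): a hook that makes connection creation fail for one
// specific address, so tests can simulate a partitioned follower.
package main

import (
	"errors"
	"fmt"
)

// unreachableAddr is set by tests; production code leaves it empty.
var unreachableAddr string

func getOrCreateConn(addr string) (string, error) {
	conn := "conn-to-" + addr // stand-in for a *grpc.ClientConn
	if unreachableAddr != "" && unreachableAddr == addr {
		return "", errors.New("unreachable network")
	}
	return conn, nil
}

func main() {
	unreachableAddr = "http://pd-follower:2379" // simulate the failpoint being enabled
	if _, err := getOrCreateConn("http://pd-follower:2379"); err != nil {
		fmt.Println("injected failure:", err)
	}
	if conn, err := getOrCreateConn("http://pd-leader:2379"); err == nil {
		fmt.Println("other addresses still connect:", conn)
	}
}
```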
1 change: 0 additions & 1 deletion client/pd_service_discovery.go
@@ -616,7 +616,6 @@ func (c *pdServiceDiscovery) switchLeader(addrs []string) error {

if _, err := c.GetOrCreateGRPCConn(addr); err != nil {
log.Warn("[pd] failed to connect leader", zap.String("leader", addr), errs.ZapError(err))
return err
}
// Set PD leader and Global TSO Allocator (which is also the PD leader)
c.leader.Store(addr)
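The only change here is removing the early return: a dial failure no longer aborts `switchLeader`, so the new leader address (and with it the Global TSO Allocator) is still recorded and later retries target the right node. A minimal sketch of the resulting behavior, using stand-in names rather than the real discovery code:

```go
// Sketch of the idea behind this hunk, assuming a dial that can fail: the
// failure is only logged, and the leader address is stored regardless.
package main

import (
	"fmt"
	"sync/atomic"
)

type leaderTracker struct {
	leader atomic.Value // stores the current leader address as a string
}

func (t *leaderTracker) switchLeader(addr string, dial func(string) error) {
	if err := dial(addr); err != nil {
		// Only log the failure; the address is still recorded so the health
		// check and later retries keep targeting the new leader.
		fmt.Printf("[pd] failed to connect leader %s: %v\n", addr, err)
	}
	t.leader.Store(addr)
}

func main() {
	t := &leaderTracker{}
	failingDial := func(addr string) error { return fmt.Errorf("unreachable network") }
	t.switchLeader("http://pd-2:2379", failingDial)
	fmt.Println("current leader:", t.leader.Load()) // updated despite the dial error
}
```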
3 changes: 2 additions & 1 deletion client/tso_client.go
@@ -171,9 +171,10 @@ func (c *tsoClient) GetTSOAllocatorClientConnByDCLocation(dcLocation string) (*g
if !ok {
panic(fmt.Sprintf("the allocator leader in %s should exist", dcLocation))
}
// todo: if we support local tso forward, we should get or create client conns.
cc, ok := c.svcDiscovery.GetClientConns().Load(url)
if !ok {
panic(fmt.Sprintf("the client connection of %s in %s should exist", url, dcLocation))
return nil, url.(string)
}
return cc.(*grpc.ClientConn), url.(string)
}
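`GetTSOAllocatorClientConnByDCLocation` used to panic when the connection was missing; now that `switchLeader` can succeed without a live connection, the lookup returns a nil connection plus the URL, and callers (see tso_dispatcher.go below) must check for nil. A tiny sketch of the new contract, with made-up types:

```go
// Minimal sketch (hypothetical names): the lookup returns nil instead of
// panicking, and the caller decides to retry rather than crash.
package main

import "fmt"

type fakeConn struct{ target string }

type connRegistry struct {
	conns map[string]*fakeConn
}

// connFor returns the cached connection for url, or nil if it has not been
// created yet (for example because the dial failed and was only logged).
func (r *connRegistry) connFor(url string) (*fakeConn, string) {
	if cc, ok := r.conns[url]; ok {
		return cc, url
	}
	return nil, url
}

func main() {
	r := &connRegistry{conns: map[string]*fakeConn{}}
	cc, url := r.connFor("http://pd-1:2379")
	if cc == nil {
		fmt.Println("no connection yet for", url, "- caller retries later instead of panicking")
		return
	}
	fmt.Println("using connection to", cc.target)
}
```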
82 changes: 45 additions & 37 deletions client/tso_dispatcher.go
@@ -254,7 +254,7 @@ func (c *tsoClient) checkAllocator(
requestForwarded.WithLabelValues(forwardedHostTrim, addrTrim).Set(0)
}()
cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
healthCli := healthpb.NewHealthClient(cc)
var healthCli healthpb.HealthClient
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
@@ -263,20 +263,25 @@
log.Info("[tso] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u))
return
}
healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout)
resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
failpoint.Inject("unreachableNetwork", func() {
resp.Status = healthpb.HealthCheckResponse_UNKNOWN
})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
// create a stream of the original allocator
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
if err == nil && stream != nil {
log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url))
updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
return
if healthCli == nil && cc != nil {
healthCli = healthpb.NewHealthClient(cc)
}
if healthCli != nil {
healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.option.timeout)
resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
failpoint.Inject("unreachableNetwork", func() {
resp.Status = healthpb.HealthCheckResponse_UNKNOWN
})
healthCancel()
if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
// create a stream of the original allocator
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
if err == nil && stream != nil {
log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("dc", dc), zap.String("url", url))
updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
return
}
}
}
select {
@@ -285,7 +290,7 @@
case <-ticker.C:
// To ensure we can get the latest allocator leader
// and once the leader is changed, we can exit this function.
_, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
cc, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
}
}
}
@@ -597,29 +602,32 @@ func (c *tsoClient) tryConnectToTSO(
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
failpoint.Inject("unreachableNetwork", func() {
stream = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
return nil
}

if err != nil && c.option.enableForwarding {
// The reason we need to judge if the error code is equal to "Canceled" here is that
// when we create a stream we use a goroutine to manually control the timeout of the connection.
// There is no need to wait for the transport layer timeout which can reduce the time of unavailability.
// But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error.
// And actually the `Canceled` error can be regarded as a kind of network error in some way.
if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) {
networkErrNum++
if cc != nil {
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout)
failpoint.Inject("unreachableNetwork", func() {
stream = nil
err = status.New(codes.Unavailable, "unavailable").Err()
})
if stream != nil && err == nil {
updateAndClear(url, &tsoConnectionContext{url, stream, cctx, cancel})
return nil
}
}

cancel()
if err != nil && c.option.enableForwarding {
// The reason we need to judge if the error code is equal to "Canceled" here is that
// when we create a stream we use a goroutine to manually control the timeout of the connection.
// There is no need to wait for the transport layer timeout which can reduce the time of unavailability.
// But it conflicts with the retry mechanism since we use the error code to decide if it is caused by network error.
// And actually the `Canceled` error can be regarded as a kind of network error in some way.
if rpcErr, ok := status.FromError(err); ok && (isNetworkError(rpcErr.Code()) || rpcErr.Code() == codes.Canceled) {
networkErrNum++
}
}
cancel()
} else {
networkErrNum++
}
select {
case <-dispatcherCtx.Done():
return err
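Both hunks in this file follow from the "connection may be nil" contract: `checkAllocator` builds the health client lazily and re-fetches the connection on every tick, and `tryConnectToTSO` only builds a stream when a connection exists, counting a missing connection as a network error. A simplified sketch of the lazy pattern, with stand-in types rather than the real gRPC/health API:

```go
// Sketch of the lazy-initialization loop in checkAllocator: the health
// client is created only once a connection exists, and the connection is
// re-fetched on every tick so a late-created conn is eventually picked up.
package main

import (
	"fmt"
	"time"
)

type conn struct{ addr string }

type healthClient struct{ cc *conn }

func (h *healthClient) check() bool { return true } // stand-in for a healthpb Check call

func main() {
	attempts := 0
	// getConn fails twice, then succeeds, imitating a connection that only
	// becomes available after a later dial finally works.
	getConn := func() *conn {
		attempts++
		if attempts < 3 {
			return nil
		}
		return &conn{addr: "http://pd-1:2379"}
	}

	var cc *conn
	var health *healthClient
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if health == nil && cc != nil {
			health = &healthClient{cc: cc} // lazily create once a connection exists
		}
		if health != nil && health.check() {
			fmt.Println("allocator healthy via", cc.addr)
			return
		}
		cc = getConn() // re-fetch the connection on every tick
	}
}
```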
101 changes: 98 additions & 3 deletions tests/integrations/client/client_test.go
@@ -514,7 +514,7 @@ func TestCustomTimeout(t *testing.T) {
re.Less(time.Since(start), 2*time.Second)
}

func TestGetRegionFromFollowerClient(t *testing.T) {
func TestGetRegionByFollowerForwarding(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -540,7 +540,7 @@ func TestGetRegionFromFollowerClient(t *testing.T) {
}

// case 1: unreachable -> normal
func TestGetTsoFromFollowerClient1(t *testing.T) {
func TestGetTsoByFollowerForwarding1(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -571,7 +571,7 @@ func TestGetTsoFromFollowerClient1(t *testing.T) {
}

// case 2: unreachable -> leader transfer -> normal
func TestGetTsoFromFollowerClient2(t *testing.T) {
func TestGetTsoByFollowerForwarding2(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -605,6 +605,101 @@ func TestGetTsoFromFollowerClient2(t *testing.T) {
checkTS(re, cli, lastTS)
}

// case 3: network partition between client and follower A -> transfer leader to follower A -> normal
func TestGetTsoAndRegionByFollowerForwarding(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pd.LeaderHealthCheckInterval = 100 * time.Millisecond
cluster, err := tests.NewTestCluster(ctx, 3)
re.NoError(err)
defer cluster.Destroy()

endpoints := runServer(re, cluster)
re.NotEmpty(cluster.WaitLeader())
leader := cluster.GetLeaderServer()
grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
testutil.Eventually(re, func() bool {
regionHeartbeat, err := grpcPDClient.RegionHeartbeat(ctx)
re.NoError(err)
regionID := regionIDAllocator.alloc()
region := &metapb.Region{
Id: regionID,
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
Peers: peers,
}
req := &pdpb.RegionHeartbeatRequest{
Header: newHeader(leader.GetServer()),
Region: region,
Leader: peers[0],
}
err = regionHeartbeat.Send(req)
re.NoError(err)
_, err = regionHeartbeat.Recv()
return err == nil
})
follower := cluster.GetServer(cluster.GetFollower())
re.NoError(failpoint.Enable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr())))

cli := setupCli(re, ctx, endpoints, pd.WithForwardingOption(true))
var lastTS uint64
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
return true
}
t.Log(err)
return false
})
lastTS = checkTS(re, cli, lastTS)
r, err := cli.GetRegion(context.Background(), []byte("a"))
re.NoError(err)
re.NotNil(r)
leader.GetServer().GetMember().ResignEtcdLeader(leader.GetServer().Context(),
leader.GetServer().Name(), follower.GetServer().Name())
re.NotEmpty(cluster.WaitLeader())
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
return true
}
t.Log(err)
return false
})
lastTS = checkTS(re, cli, lastTS)
testutil.Eventually(re, func() bool {
r, err = cli.GetRegion(context.Background(), []byte("a"))
if err == nil && r != nil {
return true
}
return false
})

re.NoError(failpoint.Disable("github.com/tikv/pd/client/grpcutil/unreachableNetwork2"))
testutil.Eventually(re, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
return true
}
t.Log(err)
return false
})
lastTS = checkTS(re, cli, lastTS)
testutil.Eventually(re, func() bool {
r, err = cli.GetRegion(context.Background(), []byte("a"))
if err == nil && r != nil {
return true
}
return false
})
}

func checkTS(re *require.Assertions, cli pd.Client, lastTS uint64) uint64 {
for i := 0; i < tsoRequestRound; i++ {
physical, logical, err := cli.GetTS(context.TODO())
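The new `TestGetTsoAndRegionByFollowerForwarding` case drives the scenario end to end: partition the follower with the failpoint, fetch TS and regions through forwarding, transfer the leader to the partitioned node, then lift the failpoint. It leans on `testutil.Eventually` and the `checkTS` helper; the underlying pattern is roughly the following self-contained sketch (simplified, not the real testutil API):

```go
// Stand-alone sketch of the polling pattern used by these tests: retry an
// operation until it succeeds or a deadline expires, and assert that
// returned timestamps never go backwards.
package main

import (
	"errors"
	"fmt"
	"time"
)

// eventually polls fn every interval until it returns true or timeout elapses.
func eventually(timeout, interval time.Duration, fn func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if fn() {
			return true
		}
		time.Sleep(interval)
	}
	return false
}

func main() {
	var lastTS uint64
	calls := 0
	// getTS fails a few times first, imitating the window where the client is
	// still switching leaders or forwarding through a follower.
	getTS := func() (uint64, error) {
		calls++
		if calls < 3 {
			return 0, errors.New("not ready")
		}
		return uint64(calls), nil
	}

	ok := eventually(time.Second, 10*time.Millisecond, func() bool {
		ts, err := getTS()
		if err != nil {
			return false
		}
		if ts < lastTS {
			panic("TS went backwards") // mirrors the monotonicity check in checkTS
		}
		lastTS = ts
		return true
	})
	fmt.Println("eventually succeeded:", ok, "lastTS:", lastTS)
}
```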