kvclient(ticdc): fix kvclient takes too long time to recover (pingcap…
ti-chi-bot authored and 3AceShowHand committed Jan 13, 2022
1 parent 3c36fc0 commit d93f48e
Showing 3 changed files with 30 additions and 55 deletions.
37 changes: 9 additions & 28 deletions cdc/kv/client.go
@@ -132,21 +132,6 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
}
}

// partialClone clones part fields of singleRegionInfo, this is used when error
// happens, kv client needs to recover region request from singleRegionInfo
func (s *singleRegionInfo) partialClone() singleRegionInfo {
sri := singleRegionInfo{
verID: s.verID,
span: s.span.Clone(),
ts: s.ts,
rpcCtx: &tikv.RPCContext{},
}
if s.rpcCtx != nil {
sri.rpcCtx.Addr = s.rpcCtx.Addr
}
return sri
}

type regionErrorInfo struct {
singleRegionInfo
err error
@@ -358,21 +343,13 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
err = version.CheckStoreVersion(ctx, c.pd, storeID)
if err != nil {
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
return
}
client := cdcpb.NewChangeDataClient(conn.ClientConn)
var streamClient cdcpb.ChangeData_EventFeedClient
streamClient, err = client.EventFeed(ctx)
if err != nil {
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
return
@@ -383,7 +360,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
log.Debug("created stream to store", zap.String("addr", addr))
return nil
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(8), retry.WithIsRetryableErr(cerror.IsRetryableError))
}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError))
return
}
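
The change from retry.WithMaxTries(8) to retry.WithMaxTries(2) in the hunk above is what lets newStream give up on an unreachable store quickly instead of backing off for many seconds. The following is a minimal, self-contained sketch that only approximates the semantics of those retry options (it is not the cdc retry package itself): at most 2 attempts, exponential backoff starting at 500ms, and an early exit for non-retryable errors.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// doWithRetry approximates retry.Do with WithBackoffBaseDelay(500),
// WithMaxTries(2) and WithIsRetryableErr from the diff above.
func doWithRetry(ctx context.Context, fn func() error, baseDelayMs int64, maxTries int, isRetryable func(error) bool) error {
	var err error
	delay := time.Duration(baseDelayMs) * time.Millisecond
	for i := 0; i < maxTries; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err // give up immediately on non-retryable errors
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2 // exponential backoff between attempts
	}
	return err
}

func main() {
	err := doWithRetry(context.Background(), func() error {
		return errors.New("store unreachable")
	}, 500, 2, func(error) bool { return true })
	// With only 2 tries the failure surfaces after roughly 500ms instead of
	// several seconds of accumulated backoff, so the caller can reschedule sooner.
	fmt.Println(err)
}
```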

@@ -1025,7 +1002,6 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
innerErr := eerr.err
if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
// TODO: Handle the case that notleader.GetLeader() is nil.
s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
} else if innerErr.GetEpochNotMatch() != nil {
// TODO: If only confver is updated, we don't need to reload the region from region cache.
@@ -1059,10 +1035,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
case *sendRequestToStoreErr:
metricStoreSendRequestErr.Inc()
default:
//[TODO] Move all OnSendFail logic here
// We expect some unknown errors to trigger RegionCache to recheck its store state and switch the
// leader to another peer for detection (a peer may tell us where the new leader is).
// RegionCache.OnSendFail is internally thread-safe.
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
if errInfo.rpcCtx.Meta != nil {
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
}
s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
}

failpoint.Inject("kvClientRegionReentrantErrorDelay", nil)
@@ -1160,6 +1138,9 @@ func (s *eventFeedSession) receiveFromStream(
zap.Uint64("storeID", storeID),
zap.Error(err),
)
// Note that PD needs at least 10s+ to mark a TiKV node as disconnected after the node goes down,
// and TiKV raft needs to wait (raft-base-tick-interval * raft-election-timeout-ticks), about 10s,
// before starting a new election.
}
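
The comment above refers to TiKV's raft election timeout. As a rough illustration, assuming the common TiKV defaults of raft-base-tick-interval = 1s and raft-election-timeout-ticks = 10 (these default values are assumptions for illustration, not part of this commit), the earliest a new leader can be elected after a node failure is about 10s, so retrying the region much sooner mostly burns RPCs:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed TiKV defaults; check your cluster's tikv.toml.
	raftBaseTickInterval := 1 * time.Second // raft-base-tick-interval
	raftElectionTimeoutTicks := 10          // raft-election-timeout-ticks

	electionTimeout := raftBaseTickInterval * time.Duration(raftElectionTimeoutTicks)
	fmt.Println(electionTimeout) // 10s: the floor on how fast a new leader can appear
}
```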

// Use the same delay mechanism as `stream.Send` error handling, since
41 changes: 18 additions & 23 deletions cdc/kv/client_test.go
@@ -48,6 +48,7 @@ import (
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

func Test(t *testing.T) {
@@ -278,7 +279,18 @@ func newMockServiceSpecificAddr(
lis, err := lc.Listen(ctx, "tcp", listenAddr)
c.Assert(err, check.IsNil)
addr = lis.Addr().String()
grpcServer = grpc.NewServer()
kaep := keepalive.EnforcementPolicy{
MinTime: 60 * time.Second,
PermitWithoutStream: true,
}
kasp := keepalive.ServerParameters{
MaxConnectionIdle: 60 * time.Second, // If a client is idle for 60 seconds, send a GOAWAY
MaxConnectionAge: 60 * time.Second, // If any connection is alive for more than 60 seconds, send a GOAWAY
MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
}
grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
cdcpb.RegisterChangeDataServer(grpcServer, srv)
wg.Add(1)
go func() {
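
For context on the keepalive setup in the hunk above: the EnforcementPolicy means the mock server sends a GOAWAY to any client that pings more often than every 60s, and the ServerParameters make it close connections that stay idle or alive for 60s, which is what forces the kvclient to reconnect in these tests. A hypothetical client-side dial that is compatible with such a policy might look like the sketch below; this is an editor illustration, not the kvclient's actual gRPC pool configuration.

```go
package main

import (
	"context"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func dialWithKeepalive(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	kacp := keepalive.ClientParameters{
		Time:                60 * time.Second, // ping no more often than the server's MinTime to avoid a GOAWAY
		Timeout:             3 * time.Second,  // wait 3s for the ping ack before treating the connection as dead
		PermitWithoutStream: true,             // keep pinging even when no streams are active
	}
	return grpc.DialContext(ctx, addr,
		grpc.WithInsecure(), // test-only; production dials use TLS credentials
		grpc.WithKeepaliveParams(kacp),
	)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	conn, err := dialWithKeepalive(ctx, "127.0.0.1:20160") // hypothetical address
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}
```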
@@ -1594,10 +1606,7 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
var genLock sync.Mutex
nextVer := -1
call := int32(0)
// 20 here not too much, since check version itself has 3 time retry, and
// region cache could also call get store API, which will trigger version
// generator too.
versionGenCallBoundary := int32(20)
versionGenCallBoundary := int32(8)
gen := func() string {
genLock.Lock()
defer genLock.Unlock()
@@ -1652,7 +1661,8 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
defer grpcPool.Close()
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
eventCh := make(chan model.RegionFeedEvent, 10)
// NOTICE: eventCh may block the main logic of EventFeed
eventCh := make(chan model.RegionFeedEvent, 128)
wg.Add(1)
go func() {
defer wg.Done()
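
The NOTICE above matters because the event feed writes to eventCh synchronously: if the receiver stops draining, the feed's main loop stalls once the buffer fills, which is why the test buffer was bumped to 128. A minimal sketch of the intended consumption pattern follows; the event type and names are stand-ins, not the real model.RegionFeedEvent plumbing.

```go
package main

import (
	"fmt"
	"sync"
)

// regionFeedEvent is a stand-in for model.RegionFeedEvent.
type regionFeedEvent struct {
	resolvedTs uint64
}

func main() {
	// A buffered channel absorbs short bursts, but a consumer must still drain
	// it continuously, otherwise the producer blocks once the buffer is full.
	eventCh := make(chan regionFeedEvent, 128)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for ev := range eventCh {
			_ = ev // tests assert on events here; production forwards them downstream
		}
	}()

	for ts := uint64(1); ts <= 1000; ts++ {
		eventCh <- regionFeedEvent{resolvedTs: ts} // blocks only if the consumer falls 128 events behind
	}
	close(eventCh)
	wg.Wait()
	fmt.Println("all events drained")
}
```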
@@ -2031,7 +2041,6 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
ch1 <- event
}
clientWg.Wait()

cancel()
}

@@ -2514,22 +2523,6 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) {
cancel()
}

func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
defer testleak.AfterTest(c)()
sri := newSingleRegionInfo(
tikv.RegionVerID{},
regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
1000, &tikv.RPCContext{})
sri2 := sri.partialClone()
sri2.ts = 2000
sri2.span.End[0] = 'b'
c.Assert(sri.ts, check.Equals, uint64(1000))
c.Assert(sri.span.String(), check.Equals, "[61, 63)")
c.Assert(sri2.ts, check.Equals, uint64(2000))
c.Assert(sri2.span.String(), check.Equals, "[61, 62)")
c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
}

// TestResolveLockNoCandidate tests the resolved ts manager can work normally
// when no region exceeds the resolve lock interval, which is what candidate means.
func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) {
@@ -2851,6 +2844,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
server1Stopped <- struct{}{}
}()
for {
// Currently, receiving no message for more than 60s causes the server to send a GOAWAY and end the connection
_, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
Expand Down Expand Up @@ -2899,6 +2893,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
initialized := mockInitializedEvent(regionID3, currentRequestID())
ch1 <- initialized

// The connection is closed due to the idle timeout
<-server1Stopped

var requestIds sync.Map
7 changes: 3 additions & 4 deletions cdc/kv/region_worker.go
@@ -784,17 +784,16 @@ func (w *regionWorker) evictAllRegions() error {
}
state.markStopped()
w.delRegionState(state.sri.verID.GetID())
singleRegionInfo := state.sri.partialClone()
if state.lastResolvedTs > singleRegionInfo.ts {
singleRegionInfo.ts = state.lastResolvedTs
if state.lastResolvedTs > state.sri.ts {
state.sri.ts = state.lastResolvedTs
}
revokeToken := !state.initialized
state.lock.Unlock()
// since the context used in region worker will be cancelled after
// region worker exits, we must use the parent context to prevent
// regionErrorInfo loss.
err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
singleRegionInfo: singleRegionInfo,
singleRegionInfo: state.sri,
err: cerror.ErrEventFeedAborted.FastGenByArgs(),
}, revokeToken)
return err == nil
