From d93f48e52d44810903a1b13f00a493961e535e58 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Mon, 20 Dec 2021 12:27:46 +0800
Subject: [PATCH] kvclient(ticdc): fix kvclient takes too long time to recover
 (#3612) (#3662)

---
 cdc/kv/client.go        | 37 +++++++++----------------------
 cdc/kv/client_test.go   | 41 ++++++++++++++++++-----------------------
 cdc/kv/region_worker.go |  7 +++----
 3 files changed, 30 insertions(+), 55 deletions(-)

diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index e91008ae546..ccba12e638b 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -132,21 +132,6 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
 	}
 }
 
-// partialClone clones part fields of singleRegionInfo, this is used when error
-// happens, kv client needs to recover region request from singleRegionInfo
-func (s *singleRegionInfo) partialClone() singleRegionInfo {
-	sri := singleRegionInfo{
-		verID:  s.verID,
-		span:   s.span.Clone(),
-		ts:     s.ts,
-		rpcCtx: &tikv.RPCContext{},
-	}
-	if s.rpcCtx != nil {
-		sri.rpcCtx.Addr = s.rpcCtx.Addr
-	}
-	return sri
-}
-
 type regionErrorInfo struct {
 	singleRegionInfo
 	err error
@@ -358,10 +343,6 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 		}
 		err = version.CheckStoreVersion(ctx, c.pd, storeID)
 		if err != nil {
-			// TODO: we don't close gPRC conn here, let it goes into TransientFailure
-			// state. If the store recovers, the gPRC conn can be reused. But if
-			// store goes away forever, the conn will be leaked, we need a better
-			// connection pool.
 			log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
 			return
 		}
@@ -369,10 +350,6 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 		var streamClient cdcpb.ChangeData_EventFeedClient
 		streamClient, err = client.EventFeed(ctx)
 		if err != nil {
-			// TODO: we don't close gPRC conn here, let it goes into TransientFailure
-			// state. If the store recovers, the gPRC conn can be reused. But if
-			// store goes away forever, the conn will be leaked, we need a better
-			// connection pool.
 			err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
 			log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
 			return
@@ -383,7 +360,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 		}
 		log.Debug("created stream to store", zap.String("addr", addr))
 		return nil
-	}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(8), retry.WithIsRetryableErr(cerror.IsRetryableError))
+	}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError))
 	return
 }
@@ -1025,7 +1002,6 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
 		innerErr := eerr.err
 		if notLeader := innerErr.GetNotLeader(); notLeader != nil {
 			metricFeedNotLeaderCounter.Inc()
-			// TODO: Handle the case that notleader.GetLeader() is nil.
 			s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
 		} else if innerErr.GetEpochNotMatch() != nil {
 			// TODO: If only confver is updated, we don't need to reload the region from region cache.
@@ -1059,10 +1035,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
 	case *sendRequestToStoreErr:
 		metricStoreSendRequestErr.Inc()
 	default:
+		// TODO: move all OnSendFail logic here.
+		// We expect an unknown error to trigger RegionCache to recheck its store state and switch the
+		// leader to a peer for detection (a peer may tell us where the new leader is).
+		// RegionCache.OnSendFail is internally thread-safe.
 		bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
-		if errInfo.rpcCtx.Meta != nil {
-			s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
-		}
+		s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
 	}
 
 	failpoint.Inject("kvClientRegionReentrantErrorDelay", nil)
@@ -1160,6 +1138,9 @@ func (s *eventFeedSession) receiveFromStream(
 				zap.Uint64("storeID", storeID),
 				zap.Error(err),
 			)
+			// Note that PD needs at least 10s to mark a KV node as disconnected after it goes down,
+			// and TiKV Raft needs (raft-base-tick-interval * raft-election-timeout-ticks), roughly
+			// 10s, to start a new election.
 		}
 
 		// Use the same delay mechanism as `stream.Send` error handling, since
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index 8ed69ce9a1b..e61310ec110 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -48,6 +48,7 @@ import (
 	"github.com/tikv/client-go/v2/tikv"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 func Test(t *testing.T) {
@@ -278,7 +279,18 @@ func newMockServiceSpecificAddr(
 	lis, err := lc.Listen(ctx, "tcp", listenAddr)
 	c.Assert(err, check.IsNil)
 	addr = lis.Addr().String()
-	grpcServer = grpc.NewServer()
+	kaep := keepalive.EnforcementPolicy{
+		MinTime:             60 * time.Second,
+		PermitWithoutStream: true,
+	}
+	kasp := keepalive.ServerParameters{
+		MaxConnectionIdle:     60 * time.Second, // If a client is idle for 60 seconds, send a GOAWAY
+		MaxConnectionAge:      60 * time.Second, // If any connection is alive for more than 60 seconds, send a GOAWAY
+		MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
+		Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
+		Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
+	}
+	grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
 	cdcpb.RegisterChangeDataServer(grpcServer, srv)
 	wg.Add(1)
 	go func() {
@@ -1594,10 +1606,7 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
 	var genLock sync.Mutex
 	nextVer := -1
 	call := int32(0)
-	// 20 here not too much, since check version itself has 3 time retry, and
-	// region cache could also call get store API, which will trigger version
-	// generator too.
-	versionGenCallBoundary := int32(20)
+	versionGenCallBoundary := int32(8)
 	gen := func() string {
 		genLock.Lock()
 		defer genLock.Unlock()
@@ -1652,7 +1661,8 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
 	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
-	eventCh := make(chan model.RegionFeedEvent, 10)
+	// NOTICE: a full eventCh may block the main logic of EventFeed
+	eventCh := make(chan model.RegionFeedEvent, 128)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -2031,7 +2041,6 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
 		ch1 <- event
 	}
 	clientWg.Wait()
-
 	cancel()
 }
 
@@ -2514,22 +2523,6 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) {
 	cancel()
 }
 
-func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
-	defer testleak.AfterTest(c)()
-	sri := newSingleRegionInfo(
-		tikv.RegionVerID{},
-		regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
-		1000, &tikv.RPCContext{})
-	sri2 := sri.partialClone()
-	sri2.ts = 2000
-	sri2.span.End[0] = 'b'
-	c.Assert(sri.ts, check.Equals, uint64(1000))
-	c.Assert(sri.span.String(), check.Equals, "[61, 63)")
-	c.Assert(sri2.ts, check.Equals, uint64(2000))
-	c.Assert(sri2.span.String(), check.Equals, "[61, 62)")
-	c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
-}
-
 // TestResolveLockNoCandidate tests the resolved ts manager can work normally
 // when no region exceeds reslove lock interval, that is what candidate means.
 func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) {
@@ -2851,6 +2844,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
 		server1Stopped <- struct{}{}
 	}()
 	for {
+		// Currently, no message for more than 60s causes the server to send a GOAWAY and close the connection
 		_, err := server.Recv()
 		if err != nil {
 			log.Error("mock server error", zap.Error(err))
@@ -2899,6 +2893,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
 	initialized := mockInitializedEvent(regionID3, currentRequestID())
 	ch1 <- initialized
 
+	// The connection is closed due to the keepalive timeout
 	<-server1Stopped
 
 	var requestIds sync.Map
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index dead08c2626..ad37a3eee5b 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -784,9 +784,8 @@ func (w *regionWorker) evictAllRegions() error {
 			}
 			state.markStopped()
 			w.delRegionState(state.sri.verID.GetID())
-			singleRegionInfo := state.sri.partialClone()
-			if state.lastResolvedTs > singleRegionInfo.ts {
-				singleRegionInfo.ts = state.lastResolvedTs
+			if state.lastResolvedTs > state.sri.ts {
+				state.sri.ts = state.lastResolvedTs
 			}
 			revokeToken := !state.initialized
 			state.lock.Unlock()
@@ -794,7 +793,7 @@ func (w *regionWorker) evictAllRegions() error {
 			// region worker exits, we must use the parent context to prevent
 			// regionErrorInfo loss.
 			err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
-				singleRegionInfo: singleRegionInfo,
+				singleRegionInfo: state.sri,
 				err:              cerror.ErrEventFeedAborted.FastGenByArgs(),
 			}, revokeToken)
 			return err == nil
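
For reference, the retry options changed in newStream above behave roughly like the following minimal model: lowering retry.WithMaxTries from 8 to 2 bounds how long the kv client can sit in the reconnect loop when a store is gone for good. Here retryDo is a hypothetical stand-in for retry.Do with WithBackoffBaseDelay and WithMaxTries, assuming a simple doubling backoff; the real pkg/retry implementation may cap or jitter its delays.

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryDo is a hypothetical stand-in for a backoff retry helper: it retries
// fn up to maxTries times, doubling the wait between attempts from baseDelay.
func retryDo(fn func() error, baseDelay time.Duration, maxTries int) error {
	delay := baseDelay
	var err error
	for i := 0; i < maxTries; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < maxTries-1 { // no sleep after the final attempt
			time.Sleep(delay)
			delay *= 2
		}
	}
	return err
}

func main() {
	attempts := 0
	err := retryDo(func() error {
		attempts++
		return errors.New("store unavailable") // always fails, to show the bound
	}, 500*time.Millisecond, 2)
	// With maxTries=2 the worst case is a single 500ms backoff; with the old
	// maxTries=8, a doubling backoff could sleep ~500ms*(1+2+...+64) ≈ 63.5s
	// before giving up, which is why recovery took so long.
	fmt.Println(attempts, err)
}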
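Likewise, the mock server's keepalive settings added in client_test.go only work as intended if the client respects the server's EnforcementPolicy: a client that pings more often than MinTime (60s here) is answered with GOAWAY (ENHANCE_YOUR_CALM) and disconnected. A compatible client configuration could look roughly like the sketch below; the address is a placeholder and these dial options are an illustration, not what CDCClient actually passes.

package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func main() {
	// Client keepalive must ping no more often than the server's
	// EnforcementPolicy.MinTime (60s in the mock server above), otherwise the
	// server replies with GOAWAY (ENHANCE_YOUR_CALM) and drops the connection.
	kacp := keepalive.ClientParameters{
		Time:                60 * time.Second, // ping after 60s of inactivity
		Timeout:             1 * time.Second,  // wait 1s for the ping ack before closing
		PermitWithoutStream: true,             // allowed because the server sets PermitWithoutStream
	}
	// "127.0.0.1:20160" is a placeholder address for illustration only.
	conn, err := grpc.Dial("127.0.0.1:20160",
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(kacp))
	if err != nil {
		panic(err)
	}
	defer conn.Close()
}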