diff --git a/cdc/kv/client.go b/cdc/kv/client.go
index 0a726bd1f86..b006e2bd52c 100644
--- a/cdc/kv/client.go
+++ b/cdc/kv/client.go
@@ -132,21 +132,6 @@ func newSingleRegionInfo(verID tikv.RegionVerID, span regionspan.ComparableSpan,
 	}
 }
 
-// partialClone clones part fields of singleRegionInfo, this is used when error
-// happens, kv client needs to recover region request from singleRegionInfo
-func (s *singleRegionInfo) partialClone() singleRegionInfo {
-	sri := singleRegionInfo{
-		verID:  s.verID,
-		span:   s.span.Clone(),
-		ts:     s.ts,
-		rpcCtx: &tikv.RPCContext{},
-	}
-	if s.rpcCtx != nil {
-		sri.rpcCtx.Addr = s.rpcCtx.Addr
-	}
-	return sri
-}
-
 type regionErrorInfo struct {
 	singleRegionInfo
 	err error
@@ -358,10 +343,6 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 		}
 		err = version.CheckStoreVersion(ctx, c.pd, storeID)
 		if err != nil {
-			// TODO: we don't close gPRC conn here, let it goes into TransientFailure
-			// state. If the store recovers, the gPRC conn can be reused. But if
-			// store goes away forever, the conn will be leaked, we need a better
-			// connection pool.
 			log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
 			return
 		}
@@ -369,10 +350,6 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 		var streamClient cdcpb.ChangeData_EventFeedClient
 		streamClient, err = client.EventFeed(ctx)
 		if err != nil {
-			// TODO: we don't close gPRC conn here, let it goes into TransientFailure
-			// state. If the store recovers, the gPRC conn can be reused. But if
-			// store goes away forever, the conn will be leaked, we need a better
-			// connection pool.
 			err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
 			log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
 			return
@@ -383,7 +360,7 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
 		}
 		log.Debug("created stream to store", zap.String("addr", addr))
 		return nil
-	}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(8), retry.WithIsRetryableErr(cerror.IsRetryableError))
+	}, retry.WithBackoffBaseDelay(500), retry.WithMaxTries(2), retry.WithIsRetryableErr(cerror.IsRetryableError))
 	return
 }
 
@@ -1025,7 +1002,6 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
 		innerErr := eerr.err
 		if notLeader := innerErr.GetNotLeader(); notLeader != nil {
 			metricFeedNotLeaderCounter.Inc()
-			// TODO: Handle the case that notleader.GetLeader() is nil.
 			s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
 		} else if innerErr.GetEpochNotMatch() != nil {
 			// TODO: If only confver is updated, we don't need to reload the region from region cache.
@@ -1064,10 +1040,12 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
 	case *sendRequestToStoreErr:
 		metricStoreSendRequestErr.Inc()
 	default:
+		// TODO: move all OnSendFail logic here.
+		// We expect an unknown error to trigger the RegionCache to recheck the store's state and
+		// switch the leader to another peer, since a peer may tell us where the new leader is.
+		// RegionCache.OnSendFail is internally thread-safe.
 		bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
-		if errInfo.rpcCtx.Meta != nil {
-			s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
-		}
+		s.regionCache.OnSendFail(bo, errInfo.rpcCtx, regionScheduleReload, err)
 	}
 
 	failpoint.Inject("kvClientRegionReentrantErrorDelay", nil)
@@ -1165,6 +1143,9 @@ func (s *eventFeedSession) receiveFromStream(
 				zap.Uint64("storeID", storeID),
 				zap.Error(err),
 			)
+			// Note that PD needs at least 10s+ to mark a KV node as disconnected after it goes down,
+			// and TiKV Raft needs to wait (raft-base-tick-interval * raft-election-timeout-ticks),
+			// about 10s, before it starts a new leader election.
 		}
 
 		// Use the same delay mechanism as `stream.Send` error handling, since
diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go
index 5917dbfb17c..d64b22d14b9 100644
--- a/cdc/kv/client_test.go
+++ b/cdc/kv/client_test.go
@@ -50,6 +50,7 @@ import (
 	"go.etcd.io/etcd/embed"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 func Test(t *testing.T) {
@@ -294,7 +295,18 @@ func newMockServiceSpecificAddr(
 	lis, err := lc.Listen(ctx, "tcp", listenAddr)
 	c.Assert(err, check.IsNil)
 	addr = lis.Addr().String()
-	grpcServer = grpc.NewServer()
+	kaep := keepalive.EnforcementPolicy{
+		MinTime:             60 * time.Second,
+		PermitWithoutStream: true,
+	}
+	kasp := keepalive.ServerParameters{
+		MaxConnectionIdle:     60 * time.Second, // If a client is idle for 60 seconds, send a GOAWAY
+		MaxConnectionAge:      60 * time.Second, // If any connection is alive for more than 60 seconds, send a GOAWAY
+		MaxConnectionAgeGrace: 5 * time.Second,  // Allow 5 seconds for pending RPCs to complete before forcibly closing connections
+		Time:                  5 * time.Second,  // Ping the client if it is idle for 5 seconds to ensure the connection is still active
+		Timeout:               1 * time.Second,  // Wait 1 second for the ping ack before assuming the connection is dead
+	}
+	grpcServer = grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp))
 	cdcpb.RegisterChangeDataServer(grpcServer, srv)
 	wg.Add(1)
 	go func() {
@@ -1682,10 +1694,7 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
 	var genLock sync.Mutex
 	nextVer := -1
 	call := int32(0)
-	// 20 here not too much, since check version itself has 3 time retry, and
-	// region cache could also call get store API, which will trigger version
-	// generator too.
-	versionGenCallBoundary := int32(20)
+	versionGenCallBoundary := int32(8)
 	gen := func() string {
 		genLock.Lock()
 		defer genLock.Unlock()
@@ -1739,8 +1748,9 @@ func (s *clientSuite) TestIncompatibleTiKV(c *check.C) {
 	isPullInit := &mockPullerInit{}
 	grpcPool := NewGrpcPoolImpl(ctx, &security.Credential{})
 	defer grpcPool.Close()
 	cdcClient := NewCDCClient(ctx, pdClient, kvStorage, grpcPool)
-	eventCh := make(chan model.RegionFeedEvent, 10)
+	// NOTICE: eventCh may block the main logic of EventFeed
+	eventCh := make(chan model.RegionFeedEvent, 128)
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -2119,7 +2136,6 @@ func (s *clientSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Chan
 		ch1 <- event
 	}
 	clientWg.Wait()
-
 	cancel()
 }
 
@@ -2603,23 +2619,6 @@ func (s *clientSuite) TestOutOfRegionRangeEvent(c *check.C) {
 	cancel()
 }
 
-func (s *clientSuite) TestSingleRegionInfoClone(c *check.C) {
-	defer testleak.AfterTest(c)()
-	defer s.TearDownTest(c)
-	sri := newSingleRegionInfo(
-		tikv.RegionVerID{},
-		regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
-		1000, &tikv.RPCContext{})
-	sri2 := sri.partialClone()
-	sri2.ts = 2000
-	sri2.span.End[0] = 'b'
-	c.Assert(sri.ts, check.Equals, uint64(1000))
-	c.Assert(sri.span.String(), check.Equals, "[61, 63)")
-	c.Assert(sri2.ts, check.Equals, uint64(2000))
-	c.Assert(sri2.span.String(), check.Equals, "[61, 62)")
-	c.Assert(sri2.rpcCtx, check.DeepEquals, &tikv.RPCContext{})
-}
-
 // TestResolveLockNoCandidate tests the resolved ts manager can work normally
 // when no region exceeds reslove lock interval, that is what candidate means.
 func (s *clientSuite) TestResolveLockNoCandidate(c *check.C) {
@@ -2941,6 +2940,7 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) {
 			server1Stopped <- struct{}{}
 		}()
 		for {
+			// Currently, receiving no message for more than 60s causes the server to send a GOAWAY and close the connection
 			_, err := server.Recv()
 			if err != nil {
 				log.Error("mock server error", zap.Error(err))
@@ -2989,6 +2989,7 @@ func (s *clientSuite) testKVClientForceReconnect(c *check.C) {
 	initialized := mockInitializedEvent(regionID3, currentRequestID())
 	ch1 <- initialized
 
+	// The connection is closed due to the keepalive timeout
 	<-server1Stopped
 
 	var requestIds sync.Map
diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go
index b92d06dff07..fa79e59e434 100644
--- a/cdc/kv/region_worker.go
+++ b/cdc/kv/region_worker.go
@@ -784,9 +784,8 @@ func (w *regionWorker) evictAllRegions() error {
 			}
 			state.markStopped()
 			w.delRegionState(state.sri.verID.GetID())
-			singleRegionInfo := state.sri.partialClone()
-			if state.lastResolvedTs > singleRegionInfo.ts {
-				singleRegionInfo.ts = state.lastResolvedTs
+			if state.lastResolvedTs > state.sri.ts {
+				state.sri.ts = state.lastResolvedTs
 			}
 			revokeToken := !state.initialized
 			state.lock.Unlock()
@@ -794,7 +793,7 @@ func (w *regionWorker) evictAllRegions() error {
 			// region worker exits, we must use the parent context to prevent
 			// regionErrorInfo loss.
 			err = w.session.onRegionFail(w.parentCtx, regionErrorInfo{
-				singleRegionInfo: singleRegionInfo,
+				singleRegionInfo: state.sri,
 				err:              cerror.ErrEventFeedAborted.FastGenByArgs(),
 			}, revokeToken)
 			return err == nil
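
For context on the keepalive settings added to newMockServiceSpecificAddr above: the server-side EnforcementPolicy (MinTime: 60s, PermitWithoutStream: true) rejects clients that ping more often than once per 60 seconds, while MaxConnectionIdle/MaxConnectionAge of 60s is what makes the mock server send a GOAWAY and force the reconnect that testKVClientForceReconnect waits for. Below is a minimal client-side sketch that stays within that enforcement policy; it is not part of this patch, and dialWithKeepalive, the address, and the duration values are illustrative assumptions.

```go
package main

import (
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// dialWithKeepalive is a hypothetical helper, not part of the patch. It dials a
// gRPC server whose keepalive.EnforcementPolicy has MinTime: 60s and
// PermitWithoutStream: true, like the mock server above.
func dialWithKeepalive(addr string) (*grpc.ClientConn, error) {
	kacp := keepalive.ClientParameters{
		// Ping no more often than every 70s, so the server's MinTime of 60s is never violated.
		Time: 70 * time.Second,
		// Wait 3s for a ping ack before treating the connection as dead.
		Timeout: 3 * time.Second,
		// Allowed here because the server sets PermitWithoutStream: true.
		PermitWithoutStream: true,
	}
	return grpc.Dial(addr,
		grpc.WithInsecure(), // the mock server in the tests is plaintext
		grpc.WithKeepaliveParams(kacp),
	)
}

func main() {
	conn, err := dialWithKeepalive("127.0.0.1:20160")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```

Keeping the client ping interval above the server's MinTime avoids the GOAWAY with "too_many_pings" that gRPC sends when a client pings more aggressively than the enforcement policy permits.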