From 7f5a772f9785f76730f9c8ce118c6d9c9c51f0bd Mon Sep 17 00:00:00 2001 From: amyangfei Date: Fri, 16 Apr 2021 18:57:51 +0800 Subject: [PATCH] kv/client: fix unstable unit test, where request id is not accurate (#1674) --- cdc/kv/client_test.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 71a66e651c3..213cb6eb878 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -278,7 +278,7 @@ func newMockServiceSpecificAddr( // waitRequestID waits request ID larger than the given allocated ID func waitRequestID(c *check.C, allocatedID uint64) { - err := retry.Run(time.Millisecond*20, 10, func() error { + err := retry.Run(time.Millisecond*10, 20, func() error { if currentRequestID() > allocatedID { return nil } @@ -2741,8 +2741,20 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) { ch1 <- initialized <-server1Stopped + + var requestIds sync.Map ch2 := make(chan *cdcpb.ChangeDataEvent, 10) srv2 := newMockChangeDataService(c, ch2) + srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { + for { + req, err := server.Recv() + if err != nil { + log.Error("mock server error", zap.Error(err)) + return + } + requestIds.Store(req.RegionId, req.RequestId) + } + } // Reuse the same listen addresss as server 1 to simulate TiKV handles the // gRPC stream terminate and reconnect. server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg) @@ -2752,14 +2764,26 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) { wg.Wait() }() - waitRequestID(c, baseAllocatedID+2) - initialized = mockInitializedEvent(regionID3, currentRequestID()) + // The second TiKV could start up slowly, which causes the kv client retries + // to TiKV for more than one time, so we can't determine the correct requestID + // here, we must use the real request ID received by TiKV server + err = retry.Run(time.Millisecond*300, 10, func() error { + _, ok := requestIds.Load(regionID3) + if ok { + return nil + } + return errors.New("waiting for kv client requests received by server") + }) + c.Assert(err, check.IsNil) + requestID, _ := requestIds.Load(regionID3) + + initialized = mockInitializedEvent(regionID3, requestID.(uint64)) ch2 <- initialized resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID3, - RequestId: currentRequestID(), + RequestId: requestID.(uint64), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135}, }, }}