Skip to content

Commit

Permalink
kv/client: fix unstable unit test, where request id is not accurate (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei authored Apr 16, 2021
1 parent 2277431 commit 7f5a772
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func newMockServiceSpecificAddr(

// waitRequestID waits request ID larger than the given allocated ID
func waitRequestID(c *check.C, allocatedID uint64) {
err := retry.Run(time.Millisecond*20, 10, func() error {
err := retry.Run(time.Millisecond*10, 20, func() error {
if currentRequestID() > allocatedID {
return nil
}
Expand Down Expand Up @@ -2741,8 +2741,20 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
ch1 <- initialized

<-server1Stopped

var requestIds sync.Map
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
srv2 := newMockChangeDataService(c, ch2)
srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
for {
req, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
return
}
requestIds.Store(req.RegionId, req.RequestId)
}
}
// Reuse the same listen addresss as server 1 to simulate TiKV handles the
// gRPC stream terminate and reconnect.
server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg)
Expand All @@ -2752,14 +2764,26 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
wg.Wait()
}()

waitRequestID(c, baseAllocatedID+2)
initialized = mockInitializedEvent(regionID3, currentRequestID())
// The second TiKV could start up slowly, which causes the kv client retries
// to TiKV for more than one time, so we can't determine the correct requestID
// here, we must use the real request ID received by TiKV server
err = retry.Run(time.Millisecond*300, 10, func() error {
_, ok := requestIds.Load(regionID3)
if ok {
return nil
}
return errors.New("waiting for kv client requests received by server")
})
c.Assert(err, check.IsNil)
requestID, _ := requestIds.Load(regionID3)

initialized = mockInitializedEvent(regionID3, requestID.(uint64))
ch2 <- initialized

resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: regionID3,
RequestId: currentRequestID(),
RequestId: requestID.(uint64),
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135},
},
}}
Expand Down

0 comments on commit 7f5a772

Please sign in to comment.