kv/client: fix force reconnect in client v2 (pingcap#1682)
amyangfei committed Apr 30, 2021
1 parent 8158b03 commit bc26818
Showing 6 changed files with 453 additions and 96 deletions.
31 changes: 24 additions & 7 deletions cdc/kv/client.go
@@ -68,11 +68,11 @@ const (
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region any more.
regionScheduleReload = false

// time interval to force kv client to terminate gRPC stream and reconnect
reconnectInterval = 15 * time.Minute
)

// time interval to force kv client to terminate gRPC stream and reconnect
var reconnectInterval = 15 * time.Minute

// hard code switch
// true: use kv client v2, which has a region worker for each stream
// false: use kv client v1, which runs a goroutine for every single region
@@ -168,6 +168,24 @@ func (s *regionFeedState) isStopped() bool {
return atomic.LoadInt32(&s.stopped) > 0
}

func (s *regionFeedState) isInitialized() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.initialized
}

func (s *regionFeedState) getLastResolvedTs() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
return s.lastResolvedTs
}

func (s *regionFeedState) getRegionSpan() regionspan.ComparableSpan {
s.lock.RLock()
defer s.lock.RUnlock()
return s.sri.span
}

type syncRegionFeedStateMap struct {
mu *sync.Mutex
regionInfoMap map[uint64]*regionFeedState
@@ -1385,6 +1403,9 @@ func (s *eventFeedSession) singleEventFeed(
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
resolveLockInterval = time.Duration(val.(int)) * time.Second
})
failpoint.Inject("kvClientReconnectInterval", func(val failpoint.Value) {
reconnectInterval = time.Duration(val.(int)) * time.Second
})

for {
var event *regionEvent
@@ -1393,10 +1414,6 @@
case <-ctx.Done():
return lastResolvedTs, ctx.Err()
case <-advanceCheckTicker.C:
failpoint.Inject("kvClientForceReconnect", func() {
log.Warn("kv client reconnect triggered by failpoint")
failpoint.Return(lastResolvedTs, errReconnect)
})
if time.Since(startFeedTime) < resolveLockInterval {
continue
}
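The reason reconnectInterval is changed from a const into a package-level var above is so that the kvClientReconnectInterval failpoint can shrink it in tests. Below is a minimal sketch of that pattern, assuming the standard github.com/pingcap/failpoint API; the helper name applyReconnectIntervalFailpoint is hypothetical and not part of the repository:

package kv

import (
	"time"

	"github.com/pingcap/failpoint"
)

// reconnectInterval is a var rather than a const so tests can shorten it.
var reconnectInterval = 15 * time.Minute

// applyReconnectIntervalFailpoint illustrates the pattern used in this commit:
// when the failpoint is enabled with `return(N)`, the package-level interval
// is rewritten to N seconds.
func applyReconnectIntervalFailpoint() {
	failpoint.Inject("kvClientReconnectInterval", func(val failpoint.Value) {
		reconnectInterval = time.Duration(val.(int)) * time.Second
	})
}

Tests enable the failpoint with failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientReconnectInterval", "return(3)") and disable it in a deferred call, as the updated testKVClientForceReconnect below does.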
189 changes: 187 additions & 2 deletions cdc/kv/client_test.go
@@ -2775,10 +2775,13 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect", "return(true)")
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientResolveLockInterval", "return(1)")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientReconnectInterval", "return(3)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientResolveLockInterval")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientReconnectInterval")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
@@ -2898,3 +2901,185 @@ func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) {
enableKVClientV2 = true
s.testKVClientForceReconnect(c)
}

// TestKVClientForceReconnect2 tests that force reconnecting the gRPC stream works.
// This test mocks the reconnectInterval variable and simulates that uninitialized
// regions can also be reconnected.
func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

server1Stopped := make(chan struct{})
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)
srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
defer func() {
close(ch1)
server1.Stop()
server1Stopped <- struct{}{}
}()
for {
_, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
break
}
}
}

cluster := mocktikv.NewCluster()
mvccStore := mocktikv.MustNewMVCCStore()
rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, mvccStore, "")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

regionID3 := uint64(3)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientReconnectInterval", "return(3)")
c.Assert(err, check.IsNil)
// the check interval is less than the reconnect interval, so we can test both the hit and the miss case
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval", "return(1)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientReconnectInterval")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage.(tikv.Storage))
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage.(tikv.Storage), &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

baseAllocatedID := currentRequestID()
waitRequestID(c, baseAllocatedID+1)
committed := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: 3,
RequestId: currentRequestID(),
Event: &cdcpb.Event_Entries_{
Entries: &cdcpb.Event_Entries{
Entries: []*cdcpb.Event_Row{{
Type: cdcpb.Event_COMMITTED,
OpType: cdcpb.Event_Row_PUT,
Key: []byte("a"),
Value: []byte("b"),
StartTs: 105,
CommitTs: 115,
}},
},
},
},
}}
ch1 <- committed

<-server1Stopped

var requestIds sync.Map
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
srv2 := newMockChangeDataService(c, ch2)
srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
for {
req, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
return
}
requestIds.Store(req.RegionId, req.RequestId)
}
}
// Reuse the same listen address as server 1 to simulate TiKV handling the
// gRPC stream termination and reconnection.
server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg)
defer func() {
close(ch2)
server2.Stop()
wg.Wait()
}()

// The second TiKV server could start up slowly, which causes the kv client to
// retry the request more than once, so we can't predict the correct requestID
// here; we must use the real request ID received by the TiKV server.
err = retry.Run(time.Millisecond*300, 10, func() error {
_, ok := requestIds.Load(regionID3)
if ok {
return nil
}
return errors.New("waiting for kv client requests received by server")
})
c.Assert(err, check.IsNil)
requestID, _ := requestIds.Load(regionID3)

initialized := mockInitializedEvent(regionID3, requestID.(uint64))
ch2 <- initialized

resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: regionID3,
RequestId: requestID.(uint64),
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135},
},
}}
ch2 <- resolved

expected := []*model.RegionFeedEvent{
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Val: &model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte("a"),
Value: []byte("b"),
StartTs: 105,
CRTs: 115,
RegionID: 3,
},
RegionID: 3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 135,
},
RegionID: regionID3,
},
}

for _, expectedEv := range expected {
select {
case event := <-eventCh:
c.Assert(event, check.DeepEquals, expectedEv)
case <-time.After(time.Second):
c.Errorf("expected event %v not received", expectedEv)
}
}

cancel()
}
8 changes: 7 additions & 1 deletion cdc/kv/client_v2.go
@@ -86,8 +86,10 @@ func (s *eventFeedSession) sendRegionChangeEventV2(
}

state.start()
// Then spawn the goroutine to process messages of this region.
worker.setRegionState(event.RegionId, state)
// TODO: If a region doesn't receive any event from TiKV, this region
// can't be reconnected since the region state is not initialized.
worker.notifyEvTimeUpdate(event.RegionId, false /* isDelete */)

// send resolved event when starting a single event feed
select {
@@ -198,6 +200,10 @@ func (s *eventFeedSession) receiveFromStreamV2(
s.workers[addr] = worker
s.workersLock.Unlock()

failpoint.Inject("kvClientReconnectInterval", func(val failpoint.Value) {
reconnectInterval = time.Duration(val.(int)) * time.Second
})

g.Go(func() error {
return worker.run(ctx)
})
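For context on how these pieces fit together, here is a rough sketch under the assumption that a per-stream region worker tracks the last event time of every region fed to it via notifyEvTimeUpdate and terminates the stream once any region has been silent for longer than reconnectInterval. Only reconnectInterval, notifyEvTimeUpdate, and errReconnect appear in the real code; every other name is hypothetical, since the actual worker implementation is not shown in this diff:

package kv

import (
	"context"
	"errors"
	"sync"
	"time"
)

var (
	// redeclared here to keep the sketch self-contained; in client.go this is 15 minutes
	reconnectInterval = 15 * time.Minute
	errReconnect      = errors.New("internal error: reconnect gRPC stream")
)

type sketchRegionWorker struct {
	mu         sync.Mutex
	lastEvTime map[uint64]time.Time // regionID -> time of the last received event
}

// notifyEvTimeUpdate records (or deletes) the last event time of a region, so the
// worker can later tell which regions have gone silent on this stream.
func (w *sketchRegionWorker) notifyEvTimeUpdate(regionID uint64, isDelete bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if isDelete {
		delete(w.lastEvTime, regionID)
		return
	}
	w.lastEvTime[regionID] = time.Now()
}

// run periodically checks whether any region has been silent for longer than
// reconnectInterval and, if so, returns errReconnect.
func (w *sketchRegionWorker) run(ctx context.Context) error {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			w.mu.Lock()
			stale := false
			for _, t := range w.lastEvTime {
				if time.Since(t) > reconnectInterval {
					stale = true
					break
				}
			}
			w.mu.Unlock()
			if stale {
				return errReconnect
			}
		}
	}
}

In this sketch, returning errReconnect is what would let the caller tear down the gRPC stream and re-request the stale regions, which is the reconnect path the new tests above exercise.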