kv/client: fix force reconnect in client v2 #1682

Merged · 11 commits · Apr 28, 2021
6 changes: 3 additions & 3 deletions cdc/kv/client.go
@@ -68,11 +68,11 @@ const (
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region any more.
regionScheduleReload = false

// time interval to force kv client to terminate gRPC stream and reconnect
reconnectInterval = 15 * time.Minute
)

// time interval to force kv client to terminate gRPC stream and reconnect
var reconnectInterval = 15 * time.Minute

// hard-coded switch
// true: use kv client v2, which has a region worker for each stream
// false: use kv client v1, which runs a goroutine for every single region
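For context on the hunk above: reconnectInterval moves out of the const block and becomes a package-level var so that a failpoint can shrink it in tests. The sketch below only illustrates that pattern, assuming the pingcap/failpoint API and the kvClientReconnectInterval failpoint name used later in this diff; the readReconnectInterval helper is hypothetical and not part of the actual code.

package kv

import (
	"time"

	"github.com/pingcap/failpoint"
)

// reconnectInterval is a var rather than a const so tests can override it.
var reconnectInterval = 15 * time.Minute

// readReconnectInterval is a hypothetical helper; the real code runs the same
// failpoint.Inject call inline (see checkUnInitRegions in region_worker.go below).
func readReconnectInterval() time.Duration {
	// With failpoint.Enable(".../kvClientReconnectInterval", "return(3)") active,
	// val is the integer 3 and the interval becomes 3 seconds.
	failpoint.Inject("kvClientReconnectInterval", func(val failpoint.Value) {
		reconnectInterval = time.Duration(val.(int)) * time.Second
	})
	return reconnectInterval
}

The test below enables exactly that failpoint with failpoint.Enable and the "return(3)" expression, which is why the 15-minute default can be exercised within seconds.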
179 changes: 179 additions & 0 deletions cdc/kv/client_test.go
@@ -2841,3 +2841,182 @@ func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) {
enableKVClientV2 = true
s.testKVClientForceReconnect(c)
}

// TestKVClientForceReconnect2 tests that force reconnecting the gRPC stream works.
// This test overrides reconnectInterval via a failpoint and verifies that
// uninitialized regions can be reconnected.
func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) {
defer testleak.AfterTest(c)()
defer s.TearDownTest(c)

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}

server1Stopped := make(chan struct{})
ch1 := make(chan *cdcpb.ChangeDataEvent, 10)
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)
srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
defer func() {
close(ch1)
server1.Stop()
server1Stopped <- struct{}{}
}()
for {
_, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
break
}
}
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("")
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)
kvStorage := newStorageWithCurVersionCache(tiStore, addr1)
defer kvStorage.Close() //nolint:errcheck

regionID3 := uint64(3)
cluster.AddStore(1, addr1)
cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4)

err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientReconnectInterval", "return(3)")
c.Assert(err, check.IsNil)
err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval", "return(3)")
c.Assert(err, check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientReconnectInterval")
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval")
}()
lockresolver := txnutil.NewLockerResolver(kvStorage)
isPullInit := &mockPullerInit{}
cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{})
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
cdcClient.Close() //nolint:errcheck
wg.Done()
}()

baseAllocatedID := currentRequestID()
waitRequestID(c, baseAllocatedID+1)
committed := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: 3,
RequestId: currentRequestID(),
Event: &cdcpb.Event_Entries_{
Entries: &cdcpb.Event_Entries{
Entries: []*cdcpb.Event_Row{{
Type: cdcpb.Event_COMMITTED,
OpType: cdcpb.Event_Row_PUT,
Key: []byte("a"),
Value: []byte("b"),
StartTs: 105,
CommitTs: 115,
}},
},
},
},
}}
ch1 <- committed

<-server1Stopped

var requestIds sync.Map
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
srv2 := newMockChangeDataService(c, ch2)
srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) {
for {
req, err := server.Recv()
if err != nil {
log.Error("mock server error", zap.Error(err))
return
}
requestIds.Store(req.RegionId, req.RequestId)
}
}
// Reuse the same listen address as server 1 to simulate TiKV handling the
// gRPC stream termination and reconnection.
server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg)
defer func() {
close(ch2)
server2.Stop()
wg.Wait()
}()

// The second TiKV server could start up slowly, which causes the kv client to
// retry more than once, so we can't determine the correct requestID here; we
// must use the real request ID received by the TiKV server.
err = retry.Run(time.Millisecond*300, 10, func() error {
_, ok := requestIds.Load(regionID3)
if ok {
return nil
}
return errors.New("waiting for kv client requests received by server")
})
c.Assert(err, check.IsNil)
requestID, _ := requestIds.Load(regionID3)

initialized := mockInitializedEvent(regionID3, requestID.(uint64))
ch2 <- initialized

resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{
{
RegionId: regionID3,
RequestId: requestID.(uint64),
Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135},
},
}}
ch2 <- resolved

expected := []*model.RegionFeedEvent{
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Val: &model.RawKVEntry{
OpType: model.OpTypePut,
Key: []byte("a"),
Value: []byte("b"),
StartTs: 105,
CRTs: 115,
RegionID: 3,
},
RegionID: 3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 100,
},
RegionID: regionID3,
},
{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")},
ResolvedTs: 135,
},
RegionID: regionID3,
},
}

for _, expectedEv := range expected {
select {
case event := <-eventCh:
c.Assert(event, check.DeepEquals, expectedEv)
case <-time.After(time.Second):
c.Errorf("expected event %v not received", expectedEv)
}
}

cancel()
}
6 changes: 5 additions & 1 deletion cdc/kv/client_v2.go
@@ -86,8 +86,12 @@ func (s *eventFeedSession) sendRegionChangeEventV2(
}

state.start()
// Then spawn the goroutine to process messages of this region.
worker.setRegionState(event.RegionId, state)
// TODO: If a region doesn't receive any event from TiKV, this region
// can't be reconnected since the region state is not initialized.
worker.uninitRegions.Lock()
worker.uninitRegions.m[event.RegionId] = time.Now()
worker.uninitRegions.Unlock()

// send resolved event when starting a single event feed
select {
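The heart of the fix, as seen in the region_worker.go changes below, is bookkeeping for regions that have not yet received their INITIALIZED event: each one is recorded with the time of its last event, and checkUnInitRegions forces a reconnect when any entry grows older than reconnectInterval. The following is a stripped-down sketch of that data structure with hypothetical method names (touch, markInitialized, stale); the actual code performs these operations inline on worker.uninitRegions.

package kv

import (
	"sync"
	"time"
)

// uninitTracker is a simplified, hypothetical stand-in for the uninitRegions
// field that this PR adds to regionWorker.
type uninitTracker struct {
	sync.Mutex
	m map[uint64]time.Time
}

func newUninitTracker() *uninitTracker {
	return &uninitTracker{m: make(map[uint64]time.Time)}
}

// touch records that an event arrived for a region that is still uninitialized.
func (t *uninitTracker) touch(regionID uint64) {
	t.Lock()
	t.m[regionID] = time.Now()
	t.Unlock()
}

// markInitialized drops a region once its INITIALIZED event has been handled.
func (t *uninitTracker) markInitialized(regionID uint64) {
	t.Lock()
	delete(t.m, regionID)
	t.Unlock()
}

// stale reports whether any region has stayed uninitialized longer than limit;
// this is the condition under which checkUnInitRegions returns errReconnect.
func (t *uninitTracker) stale(limit time.Duration) bool {
	t.Lock()
	defer t.Unlock()
	for _, last := range t.m {
		if time.Since(last) > limit {
			return true
		}
	}
	return false
}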
115 changes: 91 additions & 24 deletions cdc/kv/region_worker.go
@@ -108,19 +108,30 @@ type regionWorker struct {
rtsManager *resolvedTsManager
rtsUpdateCh chan *regionResolvedTs

uninitRegions struct {
sync.Mutex
m map[uint64]time.Time
}

enableOldValue bool
storeAddr string
}

func newRegionWorker(s *eventFeedSession, limiter *rate.Limiter, addr string) *regionWorker {
worker := &regionWorker{
session: s,
limiter: limiter,
inputCh: make(chan *regionStatefulEvent, 1024),
outputCh: s.eventCh,
statesManager: newRegionStateManager(-1),
rtsManager: newResolvedTsManager(),
rtsUpdateCh: make(chan *regionResolvedTs, 1024),
session: s,
limiter: limiter,
inputCh: make(chan *regionStatefulEvent, 1024),
outputCh: s.eventCh,
statesManager: newRegionStateManager(-1),
rtsManager: newResolvedTsManager(),
rtsUpdateCh: make(chan *regionResolvedTs, 1024),
uninitRegions: struct {
sync.Mutex
m map[uint64]time.Time
}{
m: make(map[uint64]time.Time),
},
enableOldValue: s.enableOldValue,
storeAddr: addr,
}
@@ -180,6 +191,39 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
})
}

func (w *regionWorker) checkUnInitRegions(ctx context.Context) error {
checkInterval := time.Minute

failpoint.Inject("kvClientReconnectInterval", func(val failpoint.Value) {
reconnectInterval = time.Duration(val.(int)) * time.Second
})
failpoint.Inject("kvClientCheckUnInitRegionInterval", func(val failpoint.Value) {
checkInterval = time.Duration(val.(int)) * time.Second
})

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
w.uninitRegions.Lock()
for regionID, lastReceivedEventTime := range w.uninitRegions.m {
sinceLastEvent := time.Since(lastReceivedEventTime)
if sinceLastEvent > reconnectInterval {
log.Warn("kv client reconnect triggered",
zap.Duration("duration", sinceLastEvent), zap.Uint64("region", regionID))
w.uninitRegions.Unlock()
return errReconnect
}
}
w.uninitRegions.Unlock()
}
}
}

func (w *regionWorker) resolveLock(ctx context.Context) error {
resolveLockInterval := 20 * time.Second
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
@@ -234,9 +278,10 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
state.lock.RLock()
// recheck resolved ts from region state, which may be larger than that in resolved ts heap
sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(state.lastResolvedTs))
if sinceLastResolvedTs >= resolveLockInterval && state.initialized {
if sinceLastResolvedTs >= resolveLockInterval {
if sinceLastResolvedTs > reconnectInterval {
log.Warn("kv client reconnect triggered", zap.Duration("duration", sinceLastResolvedTs))
state.lock.RUnlock()
return errReconnect
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
@@ -289,6 +334,11 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
event.state.lock.Lock()
if event.changeEvent != nil {
metricEventSize.Observe(float64(event.changeEvent.Event.Size()))
if !event.state.initialized {
w.uninitRegions.Lock()
w.uninitRegions.m[event.state.sri.verID.GetID()] = time.Now()
w.uninitRegions.Unlock()
}
switch x := event.changeEvent.Event.(type) {
case *cdcpb.Event_Entries_:
err = w.handleEventEntry(
@@ -332,25 +382,31 @@ func (w *regionWorker) eventHandler(ctx context.Context) error {
}
}

func (w *regionWorker) checkErrorReconnect(err error) error {
if errors.Cause(err) == errReconnect {
cancel, ok := w.session.getStreamCancel(w.storeAddr)
if ok {
// cancel the stream to trigger stream.Recv with a context cancel error.
// Note: context cancel is the only way to terminate a gRPC stream.
cancel()
// Failover in stream.Recv has a 0-100ms delay; onRegionFail
// should be called after the stream has been deleted. Add a delay here
// to avoid rebuilding regions too frequently.
time.Sleep(time.Second)
}
// if stream is already deleted, just ignore errReconnect
return nil
}
return err
}

func (w *regionWorker) run(ctx context.Context) error {
wg, ctx := errgroup.WithContext(ctx)
wg.Go(func() error {
err := w.resolveLock(ctx)
if errors.Cause(err) == errReconnect {
cancel, ok := w.session.getStreamCancel(w.storeAddr)
if ok {
// cancel the stream to trigger stream.Recv with a context cancel error.
// Note: context cancel is the only way to terminate a gRPC stream.
cancel()
// Failover in stream.Recv has a 0-100ms delay; onRegionFail
// should be called after the stream has been deleted. Add a delay here
// to avoid rebuilding regions too frequently.
time.Sleep(time.Second)
}
// if stream is already deleted, just ignore errReconnect
return nil
}
return err
return w.checkErrorReconnect(w.resolveLock(ctx))
})
wg.Go(func() error {
return w.checkErrorReconnect(w.checkUnInitRegions(ctx))
})
wg.Go(func() error {
return w.eventHandler(ctx)
@@ -390,6 +446,17 @@ func (w *regionWorker) handleEventEntry(
}
metricPullEventInitializedCounter.Inc()
state.initialized = true
select {
case w.rtsUpdateCh <- &regionResolvedTs{regionID: regionID, resolvedTs: state.sri.ts}:
default:
// rtsUpdateCh blocking often means too many regions are suffering from
// lock resolution; the kv client status is not very healthy.
log.Warn("region is not upsert into rts manager", zap.Uint64("region-id", regionID))
Review comment (Member): What is "upsert"?

Reply (Contributor, Author): It is special syntax in some RDBMSs meaning "insert or update"; ref: https://en.wikipedia.org/wiki/Merge_(SQL)

}
w.uninitRegions.Lock()
delete(w.uninitRegions.m, regionID)
w.uninitRegions.Unlock()

cachedEvents := state.matcher.matchCachedRow()
for _, cachedEvent := range cachedEvents {
revent, err := assembleRowEvent(regionID, cachedEvent, w.enableOldValue)