client: Fix the issue that TiKV reconnecting may make CDC stuck for a while (pingcap#531)
MyonKeminta authored May 8, 2020
1 parent fa54e00 commit f8ba9c8
Showing 2 changed files with 145 additions and 13 deletions.
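
The substance of the fix is in `newStream`: the retry budget for establishing a gRPC stream to a store drops from 20 attempts to 3 (at a 50 ms interval) and the failure paths gain log messages, so a store that is down no longer stalls the event feed for on the order of a second or more before the region is relocated; the new TestConnectOfflineTiKV test exercises that reconnection path. Below is a minimal, self-contained sketch of the bounded-retry behaviour the call site relies on. `runWithRetry` is a hypothetical stand-in for `retry.Run`, not the actual ticdc retry helper, which may back off differently.

package main

import (
	"errors"
	"fmt"
	"time"
)

// runWithRetry calls fn up to maxTries times, sleeping interval between
// failed attempts, and returns the last error if every attempt fails.
// It is only an illustration of the semantics assumed at the call site.
func runWithRetry(interval time.Duration, maxTries int, fn func() error) error {
	var err error
	for i := 0; i < maxTries; i++ {
		if err = fn(); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return err
}

func main() {
	start := time.Now()
	// Simulate dialing a store that is down: every attempt fails.
	err := runWithRetry(50*time.Millisecond, 3, func() error {
		return errors.New("connection refused")
	})
	// With 3 attempts the caller gives up after roughly 150 ms instead of
	// the second or more that 20 attempts allowed, so the region can be
	// rescheduled to another store much sooner.
	fmt.Println(err, "after", time.Since(start).Round(10*time.Millisecond))
}
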
30 changes: 17 additions & 13 deletions cdc/kv/client.go
@@ -89,18 +89,18 @@ type regionErrorInfo struct {
}

type regionFeedState struct {
-sri singleRegionInfo
-requestID uint64
-eventCh chan *cdcpb.Event
-stopped int32
+sri singleRegionInfo
+requestID uint64
+regionEventCh chan *cdcpb.Event
+stopped int32
}

func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState {
return &regionFeedState{
-sri: sri,
-requestID: requestID,
-eventCh: make(chan *cdcpb.Event, 16),
-stopped: 0,
+sri: sri,
+requestID: requestID,
+regionEventCh: make(chan *cdcpb.Event, 16),
+stopped: 0,
}
}

@@ -290,14 +290,16 @@ func (c *CDCClient) getConn(ctx context.Context, addr string) (*grpc.ClientConn,
}

func (c *CDCClient) newStream(ctx context.Context, addr string) (stream cdcpb.ChangeData_EventFeedClient, err error) {
-err = retry.Run(50*time.Millisecond, 20, func() error {
+err = retry.Run(50*time.Millisecond, 3, func() error {
conn, err := c.getConn(ctx, addr)
if err != nil {
+log.Info("get connection to store failed, retry later", zap.String("addr", addr), zap.Error(err))
return errors.Trace(err)
}
client := cdcpb.NewChangeDataClient(conn)
stream, err = client.EventFeed(ctx)
if err != nil {
+log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
return errors.Trace(err)
}
log.Debug("created stream to store", zap.String("addr", addr))
@@ -617,6 +619,8 @@ MainLoop:
stream, ok := streams[rpcCtx.Addr]
// Establish the stream if it has not been connected yet.
if !ok {
+log.Info("creating new stream to store to send request",
+zap.Uint64("regionID", sri.verID.GetID()), zap.Uint64("requestID", requestID), zap.String("addr", rpcCtx.Addr))
stream, err = s.client.newStream(ctx, rpcCtx.Addr)
if err != nil {
// if get stream failed, maybe the store is down permanently, we should try to relocate the active store
@@ -705,7 +709,7 @@ func (s *eventFeedSession) partialRegionFeed(
ctx context.Context,
state *regionFeedState,
) error {
-receiver := state.eventCh
+receiver := state.regionEventCh
defer func() {
state.markStopped()
// Workaround to avoid remaining messages in the channel blocks the receiver thread.
@@ -913,7 +917,7 @@ func (s *eventFeedSession) receiveFromStream(
// TODO: Should we have better way to handle the errors?
if err == io.EOF {
for _, state := range regionStates {
-close(state.eventCh)
+close(state.regionEventCh)
}
return nil
}
@@ -926,7 +930,7 @@

for _, state := range regionStates {
select {
-case state.eventCh <- nil:
+case state.regionEventCh <- nil:
case <-ctx.Done():
return ctx.Err()
}
@@ -991,7 +995,7 @@ func (s *eventFeedSession) receiveFromStream(
}

select {
-case state.eventCh <- event:
+case state.regionEventCh <- event:
case <-ctx.Done():
return ctx.Err()
}
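The remaining client.go changes rename eventCh to regionEventCh and keep the existing shutdown protocol: when the stream hits io.EOF the per-region channel is closed, and on any other stream error a nil event is broadcast to every region, which the per-region worker treats as a signal to stop waiting and retry (that handling is outside these hunks). A small sketch of that close-or-nil-sentinel pattern, using toy types rather than the cdcpb ones:

package main

import "fmt"

// receive plays the role of a per-region consumer: a closed channel or a
// nil event both mean "the stream ended, re-establish this region".
func receive(events <-chan *string) {
	for {
		ev, ok := <-events
		if !ok || ev == nil {
			fmt.Println("stream ended, reconnect this region")
			return
		}
		fmt.Println("event:", *ev)
	}
}

func main() {
	ch := make(chan *string, 4)
	msg := "resolved ts advanced"
	ch <- &msg
	ch <- nil // a stream error is broadcast to each region as a nil event
	receive(ch)
}
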
128 changes: 128 additions & 0 deletions cdc/kv/client_test.go
@@ -2,12 +2,21 @@ package kv

import (
"context"
"fmt"
"math/rand"
"net"
"sync"
"testing"
"time"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv"
"google.golang.org/grpc"
)

func Test(t *testing.T) { check.TestingT(t) }
@@ -61,6 +70,125 @@ func (s *clientSuite) TestUpdateCheckpointTS(c *check.C) {
c.Assert(checkpointTS, check.Equals, maxValue)
}

type mockChangeDataService struct {
c *check.C
ch chan *cdcpb.ChangeDataEvent
}

func (s *mockChangeDataService) EventFeed(server cdcpb.ChangeData_EventFeedServer) error {
req, err := server.Recv()
s.c.Assert(err, check.IsNil)
initialized := &cdcpb.ChangeDataEvent{
Events: []*cdcpb.Event{
{
RegionId: req.RegionId,
RequestId: req.RequestId,
Event: &cdcpb.Event_Entries_{
Entries: &cdcpb.Event_Entries{
Entries: []*cdcpb.Event_Row{
{
Type: cdcpb.Event_INITIALIZED,
},
},
},
},
},
},
}
err = server.Send(initialized)
s.c.Assert(err, check.IsNil)
for e := range s.ch {
for _, event := range e.Events {
event.RequestId = req.RequestId
}
err := server.Send(e)
s.c.Assert(err, check.IsNil)
}
return nil
}

func newMockService(c *check.C, port int, ch chan *cdcpb.ChangeDataEvent, wg *sync.WaitGroup) *grpc.Server {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
c.Assert(err, check.IsNil)
grpcServer := grpc.NewServer()
mockService := &mockChangeDataService{c: c, ch: ch}
cdcpb.RegisterChangeDataServer(grpcServer, mockService)
wg.Add(1)
go func() {
err := grpcServer.Serve(lis)
c.Assert(err, check.IsNil)
wg.Done()
}()
return grpcServer
}

// Use etcdSuite to workaround the race. See comments of `TestConnArray`.
func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
wg := &sync.WaitGroup{}
ch2 := make(chan *cdcpb.ChangeDataEvent, 10)
server2 := newMockService(c, 23376, ch2, wg)
defer func() {
close(ch2)
server2.Stop()
wg.Wait()
}()

cluster := mocktikv.NewCluster()
rpcClient, pdClient, err := mocktikv.NewTiKVAndPDClient(cluster, nil, "")
c.Assert(err, check.IsNil)
kvStorage, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
c.Assert(err, check.IsNil)

cluster.AddStore(1, "localhost:23375")
cluster.AddStore(2, "localhost:23376")
cluster.Bootstrap(3, []uint64{1, 2}, []uint64{4, 5}, 4)

cdcClient, err := NewCDCClient(pdClient, kvStorage.(tikv.Storage))
c.Assert(err, check.IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventCh := make(chan *model.RegionFeedEvent, 10)
wg.Add(1)
go func() {
err := cdcClient.EventFeed(ctx, util.Span{Start: []byte("a"), End: []byte("b")}, 1, eventCh)
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
wg.Done()
}()

makeEvent := func(ts uint64) *cdcpb.ChangeDataEvent {
return &cdcpb.ChangeDataEvent{
Events: []*cdcpb.Event{
{
RegionId: 3,
Event: &cdcpb.Event_ResolvedTs{
ResolvedTs: ts,
},
},
},
}
}

checkEvent := func(event *model.RegionFeedEvent, ts uint64) {
c.Assert(event.Resolved.ResolvedTs, check.Equals, ts)
}

time.Sleep(time.Millisecond * 10)
cluster.ChangeLeader(3, 5)

ts, err := kvStorage.CurrentVersion()
c.Assert(err, check.IsNil)
ch2 <- makeEvent(ts.Ver)
var event *model.RegionFeedEvent
select {
case event = <-eventCh:
case <-time.After(time.Second):
c.Fatalf("reconnection not succeed in 1 second")
}

checkEvent(event, ts.Ver)
cancel()
}

// Use etcdSuite for some special reasons, the embed etcd uses zap as the only candidate
// logger and in the logger initializtion it also initializes the grpclog/loggerv2, which
// is not a thread-safe operation and it must be called before any gRPC functions
