kv-client(cdc): correct conditions of canceling grpc streams (#10237) (
ti-chi-bot authored Dec 4, 2023
1 parent 011f81a commit 0050e31
Showing 12 changed files with 98 additions and 46 deletions.
11 changes: 6 additions & 5 deletions cdc/kv/client.go
@@ -378,9 +378,10 @@ func newEventFeedSession(
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
id := allocID()
idStr := strconv.FormatUint(id, 10)
rangeLock := regionlock.NewRegionRangeLock(
totalSpan.StartKey, totalSpan.EndKey, startTs,
id, totalSpan.StartKey, totalSpan.EndKey, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)
return &eventFeedSession{
client: client,
@@ -393,7 +394,7 @@ func newEventFeedSession(
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
id: id,
id: idStr,
regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
errChSizeGauge: clientChannelSize.WithLabelValues("err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
@@ -1015,7 +1016,7 @@ func (s *eventFeedSession) receiveFromStream(

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outer code logic
worker := newRegionWorker(parentCtx, s.changefeed, s, addr)
worker := newRegionWorker(parentCtx, s.changefeed, s, addr, pendingRegions)
defer worker.evictAllRegions()

ctx, cancel := context.WithCancel(parentCtx)
@@ -1061,7 +1062,7 @@ func (s *eventFeedSession) receiveFromStream(
})
if err != nil {
if status.Code(errors.Cause(err)) == codes.Canceled {
log.Debug(
log.Info(
"receive from stream canceled",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
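The client.go hunks above change how the event feed session gets its identifier: instead of formatting a fresh ID straight into a string, the session keeps the numeric value, passes it into regionlock.NewRegionRangeLock, and stores only the string form for log fields, so the session and its range lock report the same ID. Below is a minimal, self-contained sketch of that pattern; the trimmed-down structs are stand-ins for illustration, not the real tiflow types.

package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// Process-wide counter; equivalent in spirit to the allocID that this commit
// moves from the regionlock package into the kv client (see the hunks below).
var currentID uint64

func allocID() uint64 { return atomic.AddUint64(&currentID, 1) }

// Stand-ins trimmed to the fields that matter here.
type regionRangeLock struct{ id uint64 }

type eventFeedSession struct {
	id        string // string form kept only for logging
	rangeLock *regionRangeLock
}

func newEventFeedSession() *eventFeedSession {
	id := allocID()                     // one numeric ID per session
	idStr := strconv.FormatUint(id, 10) // its string form
	return &eventFeedSession{
		id:        idStr,
		rangeLock: &regionRangeLock{id: id}, // the same ID is injected into the lock
	}
}

func main() {
	s := newEventFeedSession()
	fmt.Println(s.id, s.rangeLock.id) // "1 1": session and lock agree
}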
5 changes: 5 additions & 0 deletions cdc/kv/metrics.go
@@ -145,6 +145,11 @@ var (
[]string{"namespace", "changefeed"})
)

// GetGlobalGrpcMetrics gets the global grpc metrics.
func GetGlobalGrpcMetrics() *grpc_prometheus.ClientMetrics {
return grpcMetrics
}

// InitMetrics registers all metrics in the kv package
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(eventFeedErrorCounter)
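The new GetGlobalGrpcMetrics accessor exposes the kv package's grpc_prometheus.ClientMetrics so other packages (notably the shared connection pool changed later in this commit) can attach the same interceptors when dialing. A small sketch of how a caller might wire it up, assuming only the public go-grpc-prometheus and prometheus APIs; the pool constructor mentioned in the comment refers to the NewConnAndClientPool change further down.

package main

import (
	grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/prometheus/client_golang/prometheus"
)

// Package-level client metrics plus the accessor this hunk adds (shape only).
var grpcMetrics = grpcprom.NewClientMetrics()

func GetGlobalGrpcMetrics() *grpcprom.ClientMetrics { return grpcMetrics }

func main() {
	// Register the collector once with a registry ...
	registry := prometheus.NewRegistry()
	registry.MustRegister(GetGlobalGrpcMetrics())
	// ... then hand the same object to whatever dials gRPC connections, e.g.
	// sharedconn.NewConnAndClientPool(credential, kv.GetGlobalGrpcMetrics()).
}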
7 changes: 6 additions & 1 deletion cdc/kv/region_worker.go
@@ -113,6 +113,8 @@ type regionWorker struct {

// how many pending input events
inputPending int32

pendingRegions *syncRegionFeedStateMap
}

func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetrics {
@@ -146,6 +148,7 @@ func newRegionWorkerMetrics(changefeedID model.ChangeFeedID) *regionWorkerMetric

func newRegionWorker(
ctx context.Context, changefeedID model.ChangeFeedID, s *eventFeedSession, addr string,
pendingRegions *syncRegionFeedStateMap,
) *regionWorker {
return &regionWorker{
parentCtx: ctx,
@@ -160,6 +163,8 @@
concurrency: int(s.client.config.KVClient.WorkerConcurrent),
metrics: newRegionWorkerMetrics(changefeedID),
inputPending: 0,

pendingRegions: pendingRegions,
}
}

@@ -195,7 +200,7 @@ func (w *regionWorker) checkShouldExit() error {
empty := w.checkRegionStateEmpty()
// If there is no region maintained by this region worker, exit it and
// cancel the gRPC stream.
if empty {
if empty && w.pendingRegions.len() == 0 {
w.cancelStream(time.Duration(0))
return cerror.ErrRegionWorkerExit.GenWithStackByArgs()
}
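This is the heart of the fix named in the commit title. Previously checkShouldExit canceled the gRPC stream as soon as the worker's own region states were empty, even though regions that had been requested on the same stream but had not yet delivered their first event were tracked only in the pending-regions map; canceling at that moment tore the stream down underneath them. The extra w.pendingRegions.len() == 0 guard defers cancellation until nothing is pending. A self-contained sketch of the corrected condition, using simplified stand-ins for syncRegionFeedStateMap and the worker:

package main

import (
	"fmt"
	"sync"
)

// Stand-in for syncRegionFeedStateMap: regions requested on a stream that have
// not produced their first event yet.
type pendingRegionMap struct {
	mu sync.Mutex
	m  map[uint64]struct{}
}

func (p *pendingRegionMap) len() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.m)
}

type regionWorker struct {
	states         map[uint64]struct{} // regions currently maintained by this worker
	pendingRegions *pendingRegionMap   // shared with the stream receiver
}

// checkShouldExit mirrors the corrected condition: cancel the gRPC stream only
// when the worker holds no regions AND nothing on the stream is still pending.
func (w *regionWorker) checkShouldExit() bool {
	return len(w.states) == 0 && w.pendingRegions.len() == 0
}

func main() {
	w := &regionWorker{
		states:         map[uint64]struct{}{},
		pendingRegions: &pendingRegionMap{m: map[uint64]struct{}{42: {}}},
	}
	fmt.Println(w.checkShouldExit()) // false: region 42 was requested but not yet initialized
}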
4 changes: 2 additions & 2 deletions cdc/kv/region_worker_test.go
@@ -159,7 +159,7 @@ func TestRegionWokerHandleEventEntryEventOutOfOrder(t *testing.T) {
&tikv.RPCContext{}), 0)
state.sri.lockedRange = &regionlock.LockedRange{}
state.start()
worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "")
worker := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())
require.Equal(t, 2, cap(worker.outputCh))

// Receive prewrite2 with empty value.
@@ -322,7 +322,7 @@ func TestRegionWorkerHandleEventsBeforeStartTs(t *testing.T) {
s1.sri.lockedRange = &regionlock.LockedRange{}
s1.sri.lockedRange.CheckpointTs.Store(9)
s1.start()
w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "")
w := newRegionWorker(ctx, model.ChangeFeedID{}, s, "", newSyncRegionFeedStateMap())

err := w.handleResolvedTs(ctx, &resolvedTsEvent{
resolvedTs: 5,
9 changes: 2 additions & 7 deletions cdc/kv/regionlock/region_range_lock.go
@@ -140,12 +140,6 @@ func (e *rangeLockEntry) String() string {
len(e.waiters))
}

var currentID uint64 = 0

func allocID() uint64 {
return atomic.AddUint64(&currentID, 1)
}

// RegionRangeLock is specifically used for kv client to manage exclusive region ranges. Acquiring lock will be blocked
// if part of its range is already locked. It also manages checkpoint ts of all ranges. The ranges are marked by a
// version number, which should comes from the Region's Epoch version. The version is used to compare which range is
@@ -166,10 +160,11 @@ type RegionRangeLock struct {

// NewRegionRangeLock creates a new RegionRangeLock.
func NewRegionRangeLock(
id uint64,
startKey, endKey []byte, startTs uint64, changefeedLogInfo string,
) *RegionRangeLock {
return &RegionRangeLock{
id: allocID(),
id: id,
totalSpan: tablepb.Span{StartKey: startKey, EndKey: endKey},
changefeedLogInfo: changefeedLogInfo,
rangeCheckpointTs: newRangeTsMap(startKey, endKey, startTs),
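With allocID and its counter removed from the regionlock package, NewRegionRangeLock takes the ID from its caller, which lets the kv client decide what the value means: a per-session counter in client.go above, and the subscription ID in shared_client.go below. A stripped-down sketch of the injected-ID constructor follows; the struct keeps only the fields this hunk touches, and the values in main are made up.

package main

import "fmt"

// Stand-in for RegionRangeLock, reduced to the fields relevant to this hunk.
type RegionRangeLock struct {
	id                uint64
	changefeedLogInfo string
}

// NewRegionRangeLock now receives its id instead of calling a package-private
// allocID(), so the caller controls how lock IDs relate to its own identifiers.
func NewRegionRangeLock(id uint64, changefeedLogInfo string) *RegionRangeLock {
	return &RegionRangeLock{id: id, changefeedLogInfo: changefeedLogInfo}
}

func main() {
	l := NewRegionRangeLock(7, "default.test-changefeed")
	fmt.Println(l.id, l.changefeedLogInfo) // 7 default.test-changefeed
}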
8 changes: 4 additions & 4 deletions cdc/kv/regionlock/region_range_lock_test.go
@@ -90,7 +90,7 @@ func TestRegionRangeLock(t *testing.T) {
t.Parallel()

ctx := context.TODO()
l := NewRegionRangeLock([]byte("a"), []byte("h"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("h"), math.MaxUint64, "")
mustLockRangeSuccess(ctx, t, l, "a", "e", 1, 1, math.MaxUint64)
unlockRange(l, "a", "e", 1, 1, 100)

@@ -107,7 +107,7 @@
func TestRegionRangeLockStale(t *testing.T) {
t.Parallel()

l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
ctx := context.TODO()
mustLockRangeSuccess(ctx, t, l, "c", "g", 1, 10, math.MaxUint64)
mustLockRangeSuccess(ctx, t, l, "j", "n", 2, 8, math.MaxUint64)
@@ -130,7 +130,7 @@ func TestRegionRangeLockLockingRegionID(t *testing.T) {
t.Parallel()

ctx := context.TODO()
l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
mustLockRangeSuccess(ctx, t, l, "c", "d", 1, 10, math.MaxUint64)

mustLockRangeStale(ctx, t, l, "e", "f", 1, 5, "e", "f")
@@ -166,7 +166,7 @@ func TestRegionRangeLockCanBeCancelled(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
l := NewRegionRangeLock([]byte("a"), []byte("z"), math.MaxUint64, "")
l := NewRegionRangeLock(1, []byte("a"), []byte("z"), math.MaxUint64, "")
mustLockRangeSuccess(ctx, t, l, "g", "h", 1, 10, math.MaxUint64)
wait := mustLockRangeWait(ctx, t, l, "g", "h", 1, 12)
cancel()
6 changes: 3 additions & 3 deletions cdc/kv/shared_client.go
@@ -410,7 +410,7 @@ func (s *SharedClient) createRegionRequest(sri singleRegionInfo) *cdcpb.ChangeDa

func (s *SharedClient) appendRequest(r *requestedStore, sri singleRegionInfo) {
offset := r.nextStream.Add(1) % uint32(len(r.streams))
log.Debug("event feed will request a region",
log.Info("event feed will request a region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("streamID", r.streams[offset].streamID),
@@ -578,7 +578,7 @@ func (s *SharedClient) handleError(ctx context.Context, errInfo regionErrorInfo)
switch eerr := err.(type) {
case *eventError:
innerErr := eerr.err
log.Debug("cdc error",
log.Info("cdc region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Any("subscriptionID", errInfo.requestedTable.subscriptionID),
@@ -746,7 +746,7 @@ func (s *SharedClient) newRequestedTable(
eventCh chan<- MultiplexingEvent,
) *requestedTable {
cfName := s.changefeed.String()
rangeLock := regionlock.NewRegionRangeLock(span.StartKey, span.EndKey, startTs, cfName)
rangeLock := regionlock.NewRegionRangeLock(uint64(subID), span.StartKey, span.EndKey, startTs, cfName)

rt := &requestedTable{
subscriptionID: subID,
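In shared_client.go the two messages are promoted from Debug to Info so that region requests and region errors show up at the default log level, and the table's subscription ID (cast to uint64) is reused as the range lock ID instead of allocating a separate number. A tiny hedged sketch of what such a correlated log line looks like with zap; the namespace, changefeed, and ID values here are made up for illustration.

package main

import "go.uber.org/zap"

func main() {
	log := zap.NewExample()
	subID := uint64(1024) // stand-in subscription ID, also used as the range lock ID

	// After this commit the message below is emitted at Info rather than Debug,
	// and the same number identifies both the subscription and its range lock,
	// so the two can be matched up when reading the changefeed log.
	log.Info("event feed will request a region",
		zap.String("namespace", "default"),
		zap.String("changefeed", "test"),
		zap.Uint64("subscriptionID", subID))
}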
2 changes: 1 addition & 1 deletion cdc/kv/shared_client_test.go
@@ -113,7 +113,7 @@ func TestConnectToOfflineOrFailedTiKV(t *testing.T) {

pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}

grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{})
grpcPool := sharedconn.NewConnAndClientPool(&security.Credential{}, nil)

regionCache := tikv.NewRegionCache(pdClient)

43 changes: 27 additions & 16 deletions cdc/kv/sharedconn/conn_and_client.go
@@ -44,6 +44,7 @@ func StatusIsEOF(status *grpcstatus.Status) bool {
// ConnAndClientPool is a pool of ConnAndClient.
type ConnAndClientPool struct {
credential *security.Credential
grpcMetrics *grpc_prometheus.ClientMetrics
maxStreamsPerConn int

sync.Mutex
@@ -74,14 +75,23 @@ type connArray struct {
}

// NewConnAndClientPool creates a new ConnAndClientPool.
func NewConnAndClientPool(credential *security.Credential, maxStreamsPerConn ...int) *ConnAndClientPool {
return newConnAndClientPool(credential, 1000)
func NewConnAndClientPool(
credential *security.Credential,
grpcMetrics *grpc_prometheus.ClientMetrics,
maxStreamsPerConn ...int,
) *ConnAndClientPool {
return newConnAndClientPool(credential, grpcMetrics, 1000)
}

func newConnAndClientPool(credential *security.Credential, maxStreamsPerConn int) *ConnAndClientPool {
func newConnAndClientPool(
credential *security.Credential,
grpcMetrics *grpc_prometheus.ClientMetrics,
maxStreamsPerConn int,
) *ConnAndClientPool {
stores := make(map[string]*connArray, 64)
return &ConnAndClientPool{
credential: credential,
grpcMetrics: grpcMetrics,
maxStreamsPerConn: maxStreamsPerConn,
stores: stores,
}
@@ -105,7 +115,7 @@ func (c *ConnAndClientPool) Connect(ctx context.Context, addr string) (cc *ConnA

conns.Unlock()
var conn *Conn
if conn, err = conns.connect(ctx, c.credential); err != nil {
if conn, err = conns.connect(ctx); err != nil {
return
}
if conn != nil {
@@ -162,11 +172,11 @@ func (c *ConnAndClient) Release() {
}
}

func (c *connArray) connect(ctx context.Context, credential *security.Credential) (conn *Conn, err error) {
func (c *connArray) connect(ctx context.Context) (conn *Conn, err error) {
if c.inConnecting.CompareAndSwap(false, true) {
defer c.inConnecting.Store(false)
var clientConn *grpc.ClientConn
if clientConn, err = connect(ctx, credential, c.addr); err != nil {
if clientConn, err = c.pool.connect(ctx, c.addr); err != nil {
return
}

@@ -240,21 +250,17 @@ func (c *connArray) sort(locked bool) {
})
}

func connect(ctx context.Context, credential *security.Credential, target string) (*grpc.ClientConn, error) {
grpcTLSOption, err := credential.ToGRPCDialOption()
func (c *ConnAndClientPool) connect(ctx context.Context, target string) (*grpc.ClientConn, error) {
grpcTLSOption, err := c.credential.ToGRPCDialOption()
if err != nil {
return nil, err
}

return grpc.DialContext(
ctx,
target,
dialOptions := []grpc.DialOption{
grpcTLSOption,
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcMaxCallRecvMsgSize)),
grpc.WithUnaryInterceptor(grpcMetrics.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(grpcMetrics.StreamClientInterceptor()),
grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: time.Second,
@@ -269,7 +275,14 @@ func connect(ctx context.Context, credential *security.Credential, target string
Timeout: 3 * time.Second,
PermitWithoutStream: true,
}),
)
}

if c.grpcMetrics != nil {
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(c.grpcMetrics.UnaryClientInterceptor()))
dialOptions = append(dialOptions, grpc.WithStreamInterceptor(c.grpcMetrics.StreamClientInterceptor()))
}

return grpc.DialContext(ctx, target, dialOptions...)
}

const (
@@ -290,8 +303,6 @@ const (
rpcMetaFeatureStreamMultiplexing string = "stream-multiplexing"
)

var grpcMetrics = grpc_prometheus.NewClientMetrics()

func getContextFromFeatures(ctx context.Context, features []string) context.Context {
return metadata.NewOutgoingContext(
ctx,
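The conn_and_client.go hunks replace the package-level grpcMetrics variable and the free connect function with a method on ConnAndClientPool that builds its dial options as a slice and appends the prometheus interceptors only when a ClientMetrics was supplied, so tests (and any caller that does not care about metrics) can pass nil. The sketch below shows that conditional-dial-option pattern using the public grpc-go and go-grpc-prometheus APIs; TLS handling, window sizes, and parts of the backoff configuration are simplified here.

package main

import (
	"context"
	"time"

	grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
)

type pool struct {
	grpcMetrics *grpcprom.ClientMetrics // nil means "dial without metrics interceptors"
}

func (p *pool) connect(ctx context.Context, target string) (*grpc.ClientConn, error) {
	dialOptions := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()), // TLS elided in this sketch
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  time.Second,
				Multiplier: 1.1,
				Jitter:     0.1,
				MaxDelay:   60 * time.Second,
			},
		}),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second,
			Timeout:             3 * time.Second,
			PermitWithoutStream: true,
		}),
	}
	// The interceptors are optional now; they are attached only if metrics were injected.
	if p.grpcMetrics != nil {
		dialOptions = append(dialOptions,
			grpc.WithUnaryInterceptor(p.grpcMetrics.UnaryClientInterceptor()),
			grpc.WithStreamInterceptor(p.grpcMetrics.StreamClientInterceptor()))
	}
	return grpc.DialContext(ctx, target, dialOptions...)
}

func main() {
	p := &pool{grpcMetrics: grpcprom.NewClientMetrics()}
	if conn, err := p.connect(context.Background(), "127.0.0.1:20160"); err == nil {
		_ = conn.Close()
	}
}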
45 changes: 40 additions & 5 deletions cdc/kv/sharedconn/conn_and_client_test.go
@@ -47,7 +47,7 @@ func TestConnAndClientPool(t *testing.T) {
require.NotNil(t, svc)
defer svc.GracefulStop()

pool := newConnAndClientPool(&security.Credential{}, 2)
pool := newConnAndClientPool(&security.Credential{}, nil, 2)
cc1, err := pool.Connect(context.Background(), addr)
require.Nil(t, err)
require.NotNil(t, cc1)
@@ -95,7 +95,7 @@ func TestConnAndClientPoolForV2(t *testing.T) {
require.NotNil(t, svc)
defer svc.GracefulStop()

pool := newConnAndClientPool(&security.Credential{}, 2)
pool := newConnAndClientPool(&security.Credential{}, nil, 2)
cc1, err := pool.Connect(context.Background(), addr)
require.Nil(t, err)
require.NotNil(t, cc1)
@@ -106,11 +106,12 @@
}

func TestConnectToUnavailable(t *testing.T) {
pool := newConnAndClientPool(&security.Credential{}, nil, 1)

targets := []string{"127.0.0.1:9999", "2.2.2.2:9999"}
for _, target := range targets {
ctx := context.Background()

conn, err := connect(ctx, &security.Credential{}, target)
conn, err := pool.connect(ctx, target)
require.NotNil(t, conn)
require.Nil(t, err)

@@ -136,7 +137,7 @@
require.NotNil(t, svc)
defer svc.GracefulStop()

conn, err := connect(context.Background(), &security.Credential{}, addr)
conn, err := pool.connect(context.Background(), addr)
require.NotNil(t, conn)
require.Nil(t, err)

@@ -151,6 +152,40 @@
require.Nil(t, conn.Close())
}

func TestCancelStream(t *testing.T) {
service := make(chan *grpc.Server, 1)
var addr string
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
require.Nil(t, runGrpcService(&srv{}, &addr, service))
}()

svc := <-service
require.NotNil(t, svc)
defer svc.GracefulStop()

connCtx, connCancel := context.WithCancel(context.Background())
defer connCancel()

pool := newConnAndClientPool(&security.Credential{}, nil, 1)
conn, err := pool.connect(connCtx, addr)
require.NotNil(t, conn)
require.Nil(t, err)

rpcCtx, rpcCancel := context.WithCancel(context.Background())
rpc := cdcpb.NewChangeDataClient(conn)
client, err := rpc.EventFeed(rpcCtx)
require.Nil(t, err)

rpcCancel()
_, err = client.Recv()
require.Equal(t, grpccodes.Canceled, grpcstatus.Code(err))
require.Nil(t, conn.Close())
}

func runGrpcService(srv cdcpb.ChangeDataServer, addr *string, service chan<- *grpc.Server) error {
defer close(service)
lis, err := net.Listen("tcp", "127.0.0.1:0")
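The new TestCancelStream pins down the behavior the rest of the commit relies on: when the local context of an EventFeed RPC is canceled, the client side of the stream fails with codes.Canceled, which receiveFromStream treats as an orderly shutdown (now logged at Info rather than Debug). Below is a minimal sketch of classifying such an error with the grpc status package; the helper name isStreamCanceled is invented here for illustration.

package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	grpcstatus "google.golang.org/grpc/status"
)

// isStreamCanceled mirrors the check in receiveFromStream: a Recv error whose
// status code is Canceled means our own context was canceled, not that the
// remote side failed.
func isStreamCanceled(err error) bool {
	return grpcstatus.Code(err) == codes.Canceled
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	// A canceled context surfaces on gRPC client streams as a Canceled status;
	// status.FromContextError performs the same mapping for ctx.Err() directly.
	err := grpcstatus.FromContextError(ctx.Err()).Err()
	fmt.Println(isStreamCanceled(err)) // true
}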
2 of the 12 changed files are not shown above.
