Skip to content

Commit

Permalink
cdc: cherry picks pingcap#9982 and pingcap#10054 (pingcap#10165)
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu authored Nov 28, 2023
1 parent 8e71949 commit af9cb77
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 225 deletions.
147 changes: 108 additions & 39 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -40,6 +41,7 @@ import (
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
tidbkv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -69,6 +71,8 @@ const (
// failed region will be reloaded via `BatchLoadRegionsWithKeyRange` API. So we
// don't need to force reload region anymore.
regionScheduleReload = false

resolveLockMinInterval = 10 * time.Second
)

// time interval to force kv client to terminate gRPC stream and reconnect
Expand Down Expand Up @@ -248,7 +252,8 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
return nil
}
if c.config.Debug.EnableKVConnectBackOff {
newStreamErr = retry.Do(ctx, streamFunc, retry.WithBackoffBaseDelay(500),
newStreamErr = retry.Do(ctx, streamFunc,
retry.WithBackoffBaseDelay(100),
retry.WithMaxTries(2),
retry.WithIsRetryableErr(cerror.IsRetryableError),
)
Expand All @@ -268,7 +273,7 @@ func (c *CDCClient) EventFeed(
eventCh chan<- model.RegionFeedEvent,
) error {
s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
return s.eventFeed(ctx, ts)
return s.eventFeed(ctx)
}

// RegionCount returns the number of captured regions.
Expand Down Expand Up @@ -362,7 +367,6 @@ type eventFeedSession struct {

type rangeRequestTask struct {
span regionspan.ComparableSpan
ts uint64
}

func newEventFeedSession(
Expand All @@ -372,9 +376,10 @@ func newEventFeedSession(
startTs uint64,
eventCh chan<- model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
id := allocID()
idStr := strconv.FormatUint(id, 10)
rangeLock := regionspan.NewRegionRangeLock(
totalSpan.Start, totalSpan.End, startTs,
id, totalSpan.Start, totalSpan.End, startTs,
client.changefeed.Namespace+"."+client.changefeed.ID)
return &eventFeedSession{
client: client,
Expand All @@ -387,7 +392,7 @@ func newEventFeedSession(
eventCh: eventCh,
rangeLock: rangeLock,
lockResolver: lockResolver,
id: id,
id: idStr,
regionChSizeGauge: clientChannelSize.WithLabelValues("region"),
errChSizeGauge: clientChannelSize.WithLabelValues("err"),
rangeChSizeGauge: clientChannelSize.WithLabelValues("range"),
Expand All @@ -403,7 +408,7 @@ func newEventFeedSession(
}
}

func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
func (s *eventFeedSession) eventFeed(ctx context.Context) error {
s.requestRangeCh = chann.NewDrainableChann[rangeRequestTask]()
s.regionCh = chann.NewDrainableChann[singleRegionInfo]()
s.regionRouter = chann.NewDrainableChann[singleRegionInfo]()
Expand All @@ -420,13 +425,11 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
return s.dispatchRequest(ctx)
})
g.Go(func() error { return s.dispatchRequest(ctx) })

g.Go(func() error {
return s.requestRegionToStore(ctx, g)
})
g.Go(func() error { return s.requestRegionToStore(ctx, g) })

g.Go(func() error { return s.logSlowRegions(ctx) })

g.Go(func() error {
for {
Expand All @@ -444,7 +447,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
// Besides the count or frequency of range request is limited,
// we use ephemeral goroutine instead of permanent goroutine.
g.Go(func() error {
return s.divideAndSendEventFeedToRegions(ctx, task.span, task.ts)
return s.divideAndSendEventFeedToRegions(ctx, task.span)
})
}
}
Expand All @@ -465,24 +468,26 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
}
})

s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan, ts: ts}
s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan}
s.rangeChSizeGauge.Inc()

log.Info("event feed started",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("startTs", ts),
zap.Uint64("startTs", s.startTs),
zap.Stringer("span", s.totalSpan))

return g.Wait()
}

// scheduleDivideRegionAndRequest schedules a range to be divided by regions, and these regions will be then scheduled
// to send ChangeData requests.
func (s *eventFeedSession) scheduleDivideRegionAndRequest(ctx context.Context, span regionspan.ComparableSpan, ts uint64) {
task := rangeRequestTask{span: span, ts: ts}
// scheduleDivideRegionAndRequest schedules a range to be divided by regions,
// and these regions will be then scheduled to send ChangeData requests.
func (s *eventFeedSession) scheduleDivideRegionAndRequest(
ctx context.Context, span regionspan.ComparableSpan,
) {
task := rangeRequestTask{span: span}
select {
case s.requestRangeCh.In() <- task:
s.rangeChSizeGauge.Inc()
Expand All @@ -496,7 +501,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
handleResult := func(res regionspan.LockRangeResult) {
switch res.Status {
case regionspan.LockRangeStatusSuccess:
sri.resolvedTs = res.CheckpointTs
sri.lockedRange = res.LockedRange
select {
case s.regionCh.In() <- sri:
s.regionChSizeGauge.Inc()
Expand All @@ -508,12 +513,11 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
zap.String("changefeed", s.changefeed.ID),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span),
zap.Uint64("resolvedTs", sri.resolvedTs),
zap.Any("retrySpans", res.RetryRanges))
for _, r := range res.RetryRanges {
// This call is always blocking, otherwise if scheduling in a new
// goroutine, it won't block the caller of `schedulerRegionRequest`.
s.scheduleDivideRegionAndRequest(ctx, r, sri.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, r)
}
case regionspan.LockRangeStatusCancel:
return
Expand All @@ -527,7 +531,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// short sleep to wait region has split
time.Sleep(time.Second)
s.rangeLock.UnlockRange(sri.span.Start, sri.span.End,
sri.verID.GetID(), sri.verID.GetVer(), sri.resolvedTs)
sri.verID.GetID(), sri.verID.GetVer())
regionNum := val.(int)
retryRanges := make([]regionspan.ComparableSpan, 0, regionNum)
start := []byte("a")
Expand Down Expand Up @@ -556,7 +560,7 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
// CAUTION: Note that this should only be called in a context that the region has locked its range.
func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErrorInfo) {
s.rangeLock.UnlockRange(errorInfo.span.Start, errorInfo.span.End,
errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs)
errorInfo.verID.GetID(), errorInfo.verID.GetVer(), errorInfo.resolvedTs())
log.Info("region failed", zap.Stringer("span", &errorInfo.span),
zap.Any("regionId", errorInfo.verID.GetID()),
zap.Error(errorInfo.err))
Expand Down Expand Up @@ -606,7 +610,7 @@ func (s *eventFeedSession) requestRegionToStore(
RegionId: regionID,
RequestId: requestID,
RegionEpoch: regionEpoch,
CheckpointTs: sri.resolvedTs,
CheckpointTs: sri.resolvedTs(),
StartKey: sri.span.Start,
EndKey: sri.span.End,
ExtraOp: extraOp,
Expand Down Expand Up @@ -684,13 +688,13 @@ func (s *eventFeedSession) requestRegionToStore(
state := newRegionFeedState(sri, requestID)
pendingRegions.setByRequestID(requestID, state)

log.Debug("start new request",
log.Info("start new request",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("addr", storeAddr),
zap.Any("request", req))
zap.Uint64("regionID", sri.verID.GetID()),
zap.String("addr", storeAddr))

err = stream.client.Send(req)

Expand Down Expand Up @@ -777,7 +781,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
Region: sri.verID.GetID(),
},
},
ResolvedTs: sri.resolvedTs,
ResolvedTs: sri.resolvedTs(),
},
}
select {
Expand All @@ -799,7 +803,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
zap.String("tableName", s.tableName),
zap.Uint64("regionID", sri.verID.GetID()),
zap.Stringer("span", sri.span),
zap.Uint64("resolvedTs", sri.resolvedTs))
zap.Uint64("resolvedTs", sri.resolvedTs()))
errInfo := newRegionErrorInfo(sri, &rpcCtxUnavailableErr{verID: sri.verID})
s.onRegionFail(ctx, errInfo)
continue
Expand All @@ -813,7 +817,7 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
// to region boundaries. When region merging happens, it's possible that it
// will produce some overlapping spans.
func (s *eventFeedSession) divideAndSendEventFeedToRegions(
ctx context.Context, span regionspan.ComparableSpan, ts uint64,
ctx context.Context, span regionspan.ComparableSpan,
) error {
limit := 20
nextSpan := span
Expand All @@ -826,7 +830,8 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(
retryErr := retry.Do(ctx, func() error {
bo := tikv.NewBackoffer(ctx, tikvRequestMaxBackoff)
start := time.Now()
regions, err = s.client.regionCache.BatchLoadRegionsWithKeyRange(bo, nextSpan.Start, nextSpan.End, limit)
regions, err = s.client.regionCache.BatchLoadRegionsWithKeyRange(
bo, nextSpan.Start, nextSpan.End, limit)
scanRegionsDuration.Observe(time.Since(start).Seconds())
if err != nil {
return cerror.WrapError(cerror.ErrPDBatchLoadRegions, err)
Expand Down Expand Up @@ -855,15 +860,16 @@ func (s *eventFeedSession) divideAndSendEventFeedToRegions(

for _, tiRegion := range regions {
region := tiRegion.GetMeta()
partialSpan, err := regionspan.Intersect(s.totalSpan, regionspan.ComparableSpan{Start: region.StartKey, End: region.EndKey})
partialSpan, err := regionspan.Intersect(
s.totalSpan, regionspan.ComparableSpan{Start: region.StartKey, End: region.EndKey})
if err != nil {
return errors.Trace(err)
}
nextSpan.Start = region.EndKey
// the End key return by the PD API will be nil to represent the biggest key,
partialSpan = partialSpan.Hack()

sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, ts, nil)
sri := newSingleRegionInfo(tiRegion.VerID(), partialSpan, nil)
s.scheduleRegionRequest(ctx, sri)
// return if no more regions
if regionspan.EndCompare(nextSpan.Start, span.End) >= 0 {
Expand All @@ -882,17 +888,24 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
switch eerr := errors.Cause(err).(type) {
case *eventError:
innerErr := eerr.err
log.Info("cdc region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Stringer("error", innerErr))

if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
s.client.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
} else if innerErr.GetEpochNotMatch() != nil {
// TODO: If only confver is updated, we don't need to reload the region from region cache.
metricFeedEpochNotMatchCounter.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
} else if innerErr.GetRegionNotFound() != nil {
metricFeedRegionNotFoundCounter.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
} else if duplicatedRequest := innerErr.GetDuplicateRequest(); duplicatedRequest != nil {
metricFeedDuplicateRequestCounter.Inc()
Expand Down Expand Up @@ -922,7 +935,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
}
case *rpcCtxUnavailableErr:
metricFeedRPCCtxUnavailable.Inc()
s.scheduleDivideRegionAndRequest(ctx, errInfo.span, errInfo.resolvedTs)
s.scheduleDivideRegionAndRequest(ctx, errInfo.span)
return nil
case *connectToStoreErr:
metricConnectToStoreErr.Inc()
Expand Down Expand Up @@ -1199,7 +1212,7 @@ func (s *eventFeedSession) sendRegionChangeEvents(
}
state.start()
worker.setRegionState(event.RegionId, state)
} else if state.isStopped() {
} else if state.isStale() {
log.Warn("drop event due to region feed stopped",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
Expand All @@ -1209,6 +1222,17 @@ func (s *eventFeedSession) sendRegionChangeEvents(
continue
}

switch x := event.Event.(type) {
case *cdcpb.Event_Error:
log.Info("event feed receives a region error",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Uint64("regionID", event.RegionId),
zap.Any("error", x.Error))
}

slot := worker.inputCalcSlot(event.RegionId)
statefulEvents[slot] = append(statefulEvents[slot], &regionStatefulEvent{
changeEvent: event,
Expand Down Expand Up @@ -1301,6 +1325,51 @@ func (s *eventFeedSession) getStreamCancel(storeAddr string) (cancel context.Can
return
}

func (s *eventFeedSession) logSlowRegions(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
}

currTime := s.client.pdClock.CurrentTime()
attr := s.rangeLock.CollectLockedRangeAttrs(nil)
if attr.SlowestRegion.Initialized {
ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.CheckpointTs)
if currTime.Sub(ckptTime) > 2*resolveLockMinInterval {
log.Info("event feed finds a slow region",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
}
} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
log.Info("event feed initializes a region too slow",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.Any("slowRegion", attr.SlowestRegion))
}
if len(attr.Holes) > 0 {
holes := make([]string, 0, len(attr.Holes))
for _, hole := range attr.Holes {
holes = append(holes, fmt.Sprintf("[%s,%s)", hole.Start, hole.End))
}
log.Info("event feed holes exist",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("holes", strings.Join(holes, ", ")))
}
}
}

func assembleRowEvent(regionID uint64, entry *cdcpb.Event_Row) (model.RegionFeedEvent, error) {
var opType model.OpType
switch entry.GetOpType() {
Expand Down
Loading

0 comments on commit af9cb77

Please sign in to comment.