Skip to content

Commit

Permalink
kv, puller (ticdc): eliminating unnecessary goroutines, streamlining …
Browse files Browse the repository at this point in the history
…the kvClient's operation, and reducing code complexity. (pingcap#10878)

ref pingcap#10879
  • Loading branch information
asddongmen authored Apr 9, 2024
1 parent fe379c2 commit 8a3b776
Show file tree
Hide file tree
Showing 13 changed files with 537 additions and 463 deletions.
81 changes: 54 additions & 27 deletions cdc/kv/region_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,60 @@ const (
stateRemoved uint32 = 2
)

type singleRegionInfo struct {
verID tikv.RegionVerID
type regionInfo struct {
verID tikv.RegionVerID
// The span of the region.
// Note(dongmen): The span doesn't always represent the whole span of a region.
// Instead, it is the portion of the region that belongs the subcribed table.
// Multiple tables can belong to the same region.
// For instance, consider region-1 with a span of [a, d).
// It contains 3 tables: t1[a, b), t2[b,c), and t3[c,d).
// If only table t1 is subscribed to, then the span of interest is [a,b).
span tablepb.Span
rpcCtx *tikv.RPCContext

requestedTable *requestedTable
lockedRange *regionlock.LockedRange
// The table that the region belongs to.
subscribedTable *subscribedTable
lockedRange *regionlock.LockedRange
}

func newSingleRegionInfo(
func (s regionInfo) isStoped() bool {
// lockedRange only nil when the region's subscribedTable is stopped.
return s.lockedRange == nil
}

func newRegionInfo(
verID tikv.RegionVerID,
span tablepb.Span,
rpcCtx *tikv.RPCContext,
) singleRegionInfo {
return singleRegionInfo{
verID: verID,
span: span,
rpcCtx: rpcCtx,
subscribedTable *subscribedTable,
) regionInfo {
return regionInfo{
verID: verID,
span: span,
rpcCtx: rpcCtx,
subscribedTable: subscribedTable,
}
}

func (s singleRegionInfo) resolvedTs() uint64 {
func (s regionInfo) resolvedTs() uint64 {
return s.lockedRange.ResolvedTs.Load()
}

type regionErrorInfo struct {
regionInfo
err error
}

func newRegionErrorInfo(info regionInfo, err error) regionErrorInfo {
return regionErrorInfo{
regionInfo: info,
err: err,
}
}

type regionFeedState struct {
sri singleRegionInfo
region regionInfo
requestID uint64
matcher *matcher

Expand All @@ -75,9 +102,9 @@ type regionFeedState struct {
}
}

func newRegionFeedState(sri singleRegionInfo, requestID uint64) *regionFeedState {
func newRegionFeedState(region regionInfo, requestID uint64) *regionFeedState {
return &regionFeedState{
sri: sri,
region: region,
requestID: requestID,
}
}
Expand Down Expand Up @@ -122,24 +149,24 @@ func (s *regionFeedState) takeError() (err error) {
}

func (s *regionFeedState) isInitialized() bool {
return s.sri.lockedRange.Initialzied.Load()
return s.region.lockedRange.Initialzied.Load()
}

func (s *regionFeedState) setInitialized() {
s.sri.lockedRange.Initialzied.Store(true)
s.region.lockedRange.Initialzied.Store(true)
}

func (s *regionFeedState) getRegionID() uint64 {
return s.sri.verID.GetID()
return s.region.verID.GetID()
}

func (s *regionFeedState) getLastResolvedTs() uint64 {
return s.sri.lockedRange.ResolvedTs.Load()
return s.region.lockedRange.ResolvedTs.Load()
}

// updateResolvedTs update the resolved ts of the current region feed
func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) {
state := s.sri.lockedRange
state := s.region.lockedRange
for {
last := state.ResolvedTs.Load()
if last > resolvedTs {
Expand All @@ -149,22 +176,22 @@ func (s *regionFeedState) updateResolvedTs(resolvedTs uint64) {
break
}
}
if s.sri.requestedTable != nil {
s.sri.requestedTable.postUpdateRegionResolvedTs(
s.sri.verID.GetID(),
s.sri.verID.GetVer(),
if s.region.subscribedTable != nil {
s.region.subscribedTable.postUpdateRegionResolvedTs(
s.region.verID.GetID(),
s.region.verID.GetVer(),
state,
s.sri.span,
s.region.span,
)
}
}

func (s *regionFeedState) getRegionInfo() singleRegionInfo {
return s.sri
func (s *regionFeedState) getRegionInfo() regionInfo {
return s.region
}

func (s *regionFeedState) getRegionMeta() (uint64, tablepb.Span, string) {
return s.sri.verID.GetID(), s.sri.span, s.sri.rpcCtx.Addr
return s.region.verID.GetID(), s.region.span, s.region.rpcCtx.Addr
}

type syncRegionFeedStateMap struct {
Expand Down
14 changes: 8 additions & 6 deletions cdc/kv/region_state_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func TestSyncRegionFeedStateMapConcurrentAccess(t *testing.T) {
return
default:
}
m.setByRequestID(1, &regionFeedState{sri: singleRegionInfo{lockedRange: &regionlock.LockedRange{}}})
m.setByRequestID(2, &regionFeedState{sri: singleRegionInfo{lockedRange: &regionlock.LockedRange{}}})
m.setByRequestID(3, &regionFeedState{sri: singleRegionInfo{lockedRange: &regionlock.LockedRange{}}})
m.setByRequestID(1, &regionFeedState{region: regionInfo{lockedRange: &regionlock.LockedRange{}}})
m.setByRequestID(2, &regionFeedState{region: regionInfo{lockedRange: &regionlock.LockedRange{}}})
m.setByRequestID(3, &regionFeedState{region: regionInfo{lockedRange: &regionlock.LockedRange{}}})
}
}()
wg.Add(1)
Expand Down Expand Up @@ -116,11 +116,13 @@ func (rsm *regionStateManagerWithSyncMap) delState(regionID uint64) {
}

func benchmarkGetRegionState(b *testing.B, bench func(b *testing.B, sm regionStateManagerInterface, count int)) {
state := newRegionFeedState(newSingleRegionInfo(
state := newRegionFeedState(newRegionInfo(
tikv.RegionVerID{},
spanz.ToSpan([]byte{}, spanz.UpperBoundKey),
&tikv.RPCContext{}), 0)
state.sri.lockedRange = &regionlock.LockedRange{}
&tikv.RPCContext{},
nil,
), 0)
state.region.lockedRange = &regionlock.LockedRange{}

regionCount := []int{100, 1000, 10000, 20000, 40000, 80000, 160000, 320000}
for _, count := range regionCount {
Expand Down
21 changes: 13 additions & 8 deletions cdc/kv/regionlock/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,31 @@ func CheckRegionsLeftCover(regions []*metapb.Region, span tablepb.Span) bool {
return len(regions) > 0 && len(subRegions) == len(regions)
}

// CutRegionsLeftCoverSpan cuts regions at the position which doesn't cover span
// or is incontinuous with the previous one.
func CutRegionsLeftCoverSpan(regions []*metapb.Region, span tablepb.Span) []*metapb.Region {
// CutRegionsLeftCoverSpan processes a list of regions to remove those that
// do not cover the specified span or are discontinuous with the previous region.
// It returns a new slice containing only the continuous regions that cover the span.
func CutRegionsLeftCoverSpan(regions []*metapb.Region, spanToCover tablepb.Span) []*metapb.Region {
if len(regions) == 0 {
return nil
}

sort.Slice(regions, func(i, j int) bool {
return spanz.StartCompare(regions[i].StartKey, regions[j].StartKey) == -1
})
if spanz.StartCompare(regions[0].StartKey, span.StartKey) == 1 {

// If the start key of the first region is after the span's start key,
// no regions cover the span, return nil.
if spanz.StartCompare(regions[0].StartKey, spanToCover.StartKey) == 1 {
return nil
}

nextStart := regions[0].StartKey
nextStartKey := regions[0].StartKey
for i, region := range regions {
// incontinuous regions
if spanz.StartCompare(nextStart, region.StartKey) != 0 {
// If find discontinuous, return the regions up to the current index.
if spanz.StartCompare(nextStartKey, region.StartKey) != 0 {
return regions[:i]
}
nextStart = region.EndKey
nextStartKey = region.EndKey
}
return regions
}
Loading

0 comments on commit 8a3b776

Please sign in to comment.