Skip to content

Commit

Permalink
lightning: generate and send region job more smoothly (#42780)
Browse files Browse the repository at this point in the history
close #42456
  • Loading branch information
lance6716 authored Apr 13, 2023
1 parent 0e59c9f commit 44aa4cf
Show file tree
Hide file tree
Showing 7 changed files with 923 additions and 419 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ go_test(
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
"//br/pkg/lightning/mydump",
"//br/pkg/membuf",
Expand Down
81 changes: 7 additions & 74 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,13 @@ type Engine struct {
// flush and ingest sst hold the rlock, other operation hold the wlock.
mutex sync.RWMutex

ctx context.Context
cancel context.CancelFunc
sstDir string
sstMetasChan chan metaOrFlush
ingestErr common.OnceError
wg sync.WaitGroup
sstIngester sstIngester
finishedRanges syncedRanges
ctx context.Context
cancel context.CancelFunc
sstDir string
sstMetasChan chan metaOrFlush
ingestErr common.OnceError
wg sync.WaitGroup
sstIngester sstIngester

// sst seq lock
seqLock sync.Mutex
Expand Down Expand Up @@ -922,72 +921,6 @@ func (e *Engine) loadEngineMeta() error {
return nil
}

// sortAndMergeRanges sort the ranges and merge range that overlaps with each other into a single range.
func sortAndMergeRanges(ranges []Range) []Range {
if len(ranges) == 0 {
return ranges
}

slices.SortFunc(ranges, func(i, j Range) bool {
return bytes.Compare(i.start, j.start) < 0
})

curEnd := ranges[0].end
i := 0
for j := 1; j < len(ranges); j++ {
if bytes.Compare(curEnd, ranges[j].start) >= 0 {
if bytes.Compare(curEnd, ranges[j].end) < 0 {
curEnd = ranges[j].end
}
} else {
ranges[i].end = curEnd
i++
ranges[i].start = ranges[j].start
curEnd = ranges[j].end
}
}
ranges[i].end = curEnd
return ranges[:i+1]
}

func filterOverlapRange(ranges []Range, finishedRanges []Range) []Range {
if len(ranges) == 0 || len(finishedRanges) == 0 {
return ranges
}

result := make([]Range, 0)
for _, r := range ranges {
start := r.start
end := r.end
for len(finishedRanges) > 0 && bytes.Compare(finishedRanges[0].start, end) < 0 {
fr := finishedRanges[0]
if bytes.Compare(fr.start, start) > 0 {
result = append(result, Range{start: start, end: fr.start})
}
if bytes.Compare(fr.end, start) > 0 {
start = fr.end
}
if bytes.Compare(fr.end, end) > 0 {
break
}
finishedRanges = finishedRanges[1:]
}
if bytes.Compare(start, end) < 0 {
result = append(result, Range{start: start, end: end})
}
}
return result
}

func (e *Engine) unfinishedRanges(ranges []Range) []Range {
e.finishedRanges.Lock()
defer e.finishedRanges.Unlock()

e.finishedRanges.ranges = sortAndMergeRanges(e.finishedRanges.ranges)

return filterOverlapRange(ranges, e.finishedRanges.ranges)
}

func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
if bytes.Compare(opts.LowerBound, normalIterStartKey) < 0 {
newOpts := *opts
Expand Down
Loading

0 comments on commit 44aa4cf

Please sign in to comment.