Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importinto: integrate global sort(without merge-sort) part 1 #46998

Merged
merged 22 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
save sort data meta
  • Loading branch information
D3Hunter committed Sep 14, 2023
commit c4f5647e4fc1a6a17399860b5e75d680501d66fc
76 changes: 76 additions & 0 deletions br/pkg/lightning/backend/external/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,79 @@ func GetMaxOverlapping(points []Endpoint) int {
}
return int(maxWeight)
}

// SortedDataMeta is the meta of sorted data: a key range, the accumulated KV
// size, and the names of the data/stat files gathered from one or more
// WriterSummary values (see NewSortedDataMeta, Merge and MergeSummary).
type SortedDataMeta struct {
// MinKey is taken from WriterSummary.Min; on merge it is replaced by the
// smaller non-empty key (NotNilMin).
MinKey []byte `json:"min_key"`
// MaxKey is taken from WriterSummary.Max; on merge it is replaced by the
// larger non-empty key (NotNilMax).
MaxKey []byte `json:"max_key"`
// TotalKVSize is the sum of WriterSummary.TotalSize over everything merged in.
TotalKVSize uint64 `json:"total_kv_size"`
// DataFiles holds the first element of every Filenames entry.
DataFiles []string `json:"data_files"`
// StatFiles holds the second element of every Filenames entry; entries are
// appended together with DataFiles, so index i of both slices comes from
// the same Filenames entry.
StatFiles []string `json:"stat_files"`
}

// NewSortedDataMeta builds a fresh SortedDataMeta out of a single WriterSummary.
//
// The min and max keys are cloned from the summary, the total size is copied,
// and every Filenames entry found in summary.MultipleFilesStats contributes
// its first element to DataFiles and its second element to StatFiles.
func NewSortedDataMeta(summary *WriterSummary) *SortedDataMeta {
	result := new(SortedDataMeta)
	result.MinKey = summary.Min.Clone()
	result.MaxKey = summary.Max.Clone()
	result.TotalKVSize = summary.TotalSize

	for _, stat := range summary.MultipleFilesStats {
		for _, pair := range stat.Filenames {
			// pair[0] is recorded as the data file, pair[1] as the stat file.
			result.DataFiles = append(result.DataFiles, pair[0])
			result.StatFiles = append(result.StatFiles, pair[1])
		}
	}
	return result
}

// Merge folds other into m: the key range is widened using NotNilMin and
// NotNilMax, the KV sizes are summed, and other's data/stat file names are
// appended after the ones m already holds.
func (m *SortedDataMeta) Merge(other *SortedDataMeta) {
	lo := NotNilMin(m.MinKey, other.MinKey)
	hi := NotNilMax(m.MaxKey, other.MaxKey)
	m.MinKey, m.MaxKey = lo, hi

	m.TotalKVSize += other.TotalKVSize

	m.DataFiles = append(m.DataFiles, other.DataFiles...)
	m.StatFiles = append(m.StatFiles, other.StatFiles...)
}

// MergeSummary folds a WriterSummary into m.
//
// The key range is widened with NotNilMin/NotNilMax, the summary's TotalSize
// is added to TotalKVSize, and every Filenames entry contributes its first
// element to DataFiles and its second element to StatFiles. The summary's
// keys are passed to NotNilMin/NotNilMax as-is (no Clone), so the resulting
// MinKey/MaxKey may share memory with summary.Min/summary.Max.
func (m *SortedDataMeta) MergeSummary(summary *WriterSummary) {
	lo := NotNilMin(m.MinKey, summary.Min)
	hi := NotNilMax(m.MaxKey, summary.Max)
	m.MinKey, m.MaxKey = lo, hi

	m.TotalKVSize += summary.TotalSize

	for _, stat := range summary.MultipleFilesStats {
		for _, pair := range stat.Filenames {
			m.DataFiles = append(m.DataFiles, pair[0])
			m.StatFiles = append(m.StatFiles, pair[1])
		}
	}
}

// NotNilMin returns the smaller of a and b, ignoring nil or empty values. If both are empty, b is returned.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// NotNilMin returns the smallest of a and b, ignoring nil values.
// NotNilMin returns the smallest of a and b.

func NotNilMin(a, b []byte) []byte {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved from ddl/backfilling_dispatcher.go

if len(a) == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if both a and b are nil? Will it return b (nil)?

Copy link
Contributor Author

@D3Hunter D3Hunter Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was moved from add-index; I didn't change it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Benjamin2037 Yes, returning a nil value is expected.

return b
}
if len(b) == 0 {
return a
}
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

// NotNilMax returns the larger of a and b under bytes.Compare ordering,
// skipping any argument that is nil or empty.
//
// If a is empty, b is returned (even when b is empty too); if only b is
// empty, a is returned. When both compare equal, b is returned. The result
// is one of the input slices, not a copy.
func NotNilMax(a, b []byte) []byte {
	switch {
	case len(a) == 0:
		return b
	case len(b) == 0:
		return a
	case bytes.Compare(a, b) > 0:
		return a
	default:
		return b
	}
}
44 changes: 9 additions & 35 deletions ddl/backfilling_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,14 @@ func generateMergeSortPlan(
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
}
m := &BackfillSubTaskMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
SortedDataMeta: external.SortedDataMeta{
MinKey: startKey,
MaxKey: endKey,
DataFiles: dataFiles,
StatFiles: statFiles,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
},
RangeSplitKeys: rangeSplitKeys,
TotalKVSize: totalSize / uint64(len(instanceIDs)),
}
metaBytes, err := json.Marshal(m)
if err != nil {
Expand Down Expand Up @@ -386,8 +388,8 @@ func getSummaryFromLastStep(
}
// Skip empty subtask.MinKey/MaxKey because it means
// no records need to be written in this subtask.
minKey = notNilMin(minKey, subtask.MinKey)
maxKey = notNilMax(maxKey, subtask.MaxKey)
minKey = external.NotNilMin(minKey, subtask.MinKey)
maxKey = external.NotNilMax(maxKey, subtask.MaxKey)
totalKVSize += subtask.TotalKVSize

allDataFiles = append(allDataFiles, subtask.DataFiles...)
Expand All @@ -409,31 +411,3 @@ func redactCloudStorageURI(
}
gTask.Meta = metaBytes
}

// notNilMin returns the smaller of a and b under bytes.Compare ordering,
// skipping any argument that is nil or empty.
//
// b is returned when a is empty, or when both are non-empty and a is not
// strictly smaller than b; otherwise a is returned. The result is one of the
// input slices, not a copy.
func notNilMin(a, b []byte) []byte {
	if len(a) == 0 || (len(b) != 0 && bytes.Compare(a, b) >= 0) {
		return b
	}
	return a
}

// notNilMax returns the larger of a and b under bytes.Compare ordering,
// skipping any argument that is nil or empty.
//
// b is returned when a is empty, or when both are non-empty and a is not
// strictly larger than b; otherwise a is returned. The result is one of the
// input slices, not a copy.
func notNilMax(a, b []byte) []byte {
	if len(a) == 0 || (len(b) != 0 && bytes.Compare(a, b) <= 0) {
		return b
	}
	return a
}
9 changes: 3 additions & 6 deletions ddl/stage_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/ddl/ingest"
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/scheduler"
Expand All @@ -44,12 +45,8 @@ type BackfillSubTaskMeta struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`

DataFiles []string `json:"data_files"`
StatFiles []string `json:"stat_files"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
MinKey []byte `json:"min_key"`
MaxKey []byte `json:"max_key"`
TotalKVSize uint64 `json:"total_kv_size"`
RangeSplitKeys [][]byte `json:"range_split_keys"`
external.SortedDataMeta `json:",inline"`
}

// NewBackfillSubtaskExecutor creates a new backfill subtask executor.
Expand Down
4 changes: 2 additions & 2 deletions disttask/importinto/encode_and_sort_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, workerID int
indexWriterFn := func(indexID int64) *external.Writer {
builder := external.NewWriterBuilder().
SetOnCloseFunc(func(summary *external.WriterSummary) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I plan to have the "WriterSummary" result returned by Writer.Close instead of passing it around via OnClose. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

op.sharedVars.addIndexSummary(indexID, summary)
op.sharedVars.mergeIndexSummary(indexID, summary)
})
prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID)))
writerID := path.Join("index", strconv.Itoa(int(indexID)), strconv.Itoa(int(workerID)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that two nodes may be running the same subtask (maybe the old one suffers a network partition and the framework dispatches a new node), we should use a random writerID to avoid files being overwritten. Or maybe WriterBuilder should assign a UUID internally 🤔

The result file path will be returned by OnClose, so the caller doesn't seem to need to care about the writerID.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering two nodes are running same subtask

makes sense, will change it to a uuid here

so caller seems don't need to care about the writerID

that would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expand All @@ -151,7 +151,7 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, workerID int

// sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID}
builder := external.NewWriterBuilder().
SetOnCloseFunc(op.sharedVars.setDataSummary)
SetOnCloseFunc(op.sharedVars.mergeDataSummary)
prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID)))
writerID := path.Join("data", strconv.Itoa(int(workerID)))
writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
Expand Down
28 changes: 21 additions & 7 deletions disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ type ImportStepMeta struct {
// the max id is same among all allocator types for now, since we're using same base, see
// NewPanickingAllocators for more info.
MaxIDs map[autoid.AllocatorType]int64

SortedDataMeta *external.SortedDataMeta
// SortedIndexMetas is a map from index id to its sorted kv meta.
SortedIndexMetas map[int64]*external.SortedDataMeta
}

// PostProcessStepMeta is the meta of post process step.
Expand All @@ -92,21 +96,31 @@ type SharedVars struct {
mu sync.Mutex
Checksum *verification.KVChecksum

SortedDataSummary *external.WriterSummary
// SortedIndexSummaries is a map from index id to its sorted kv summary.
SortedIndexSummaries map[int64]*external.WriterSummary
SortedDataMeta *external.SortedDataMeta
// SortedIndexMetas is a map from index id to its sorted kv meta.
SortedIndexMetas map[int64]*external.SortedDataMeta
}

func (sv *SharedVars) setDataSummary(summary *external.WriterSummary) {
func (sv *SharedVars) mergeDataSummary(summary *external.WriterSummary) {
sv.mu.Lock()
defer sv.mu.Unlock()
sv.SortedDataSummary = summary
if sv.SortedDataMeta == nil {
sv.SortedDataMeta = external.NewSortedDataMeta(summary)
return
}
sv.SortedDataMeta.MergeSummary(summary)
}

func (sv *SharedVars) addIndexSummary(indexID int64, summary *external.WriterSummary) {
func (sv *SharedVars) mergeIndexSummary(indexID int64, summary *external.WriterSummary) {
sv.mu.Lock()
defer sv.mu.Unlock()
sv.SortedIndexSummaries[indexID] = summary
meta, ok := sv.SortedIndexMetas[indexID]
if !ok {
meta = external.NewSortedDataMeta(summary)
sv.SortedIndexMetas[indexID] = meta
return
}
meta.MergeSummary(summary)
}

// importStepMinimalTask is the minimal task of IMPORT INTO.
Expand Down
14 changes: 8 additions & 6 deletions disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
}
}
sharedVars := &SharedVars{
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: asyncloaddata.NewProgress(false),
Checksum: &verification.KVChecksum{},
SortedIndexSummaries: make(map[int64]*external.WriterSummary),
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: asyncloaddata.NewProgress(false),
Checksum: &verification.KVChecksum{},
SortedIndexMetas: make(map[int64]*external.SortedDataMeta),
}
s.sharedVars.Store(subtaskMeta.ID, sharedVars)

Expand Down Expand Up @@ -216,6 +216,8 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt
autoid.AutoIncrementType: allocators.Get(autoid.AutoIncrementType).Base(),
autoid.AutoRandomType: allocators.Get(autoid.AutoRandomType).Base(),
}
subtaskMeta.SortedDataMeta = sharedVars.SortedDataMeta
subtaskMeta.SortedIndexMetas = sharedVars.SortedIndexMetas
s.sharedVars.Delete(subtaskMeta.ID)
newMeta, err := json.Marshal(subtaskMeta)
if err != nil {
Expand Down