Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Mar 14, 2024
1 parent 01be552 commit 3be8a9d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
39 changes: 36 additions & 3 deletions pkg/ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type mergeIndexWorker struct {
originIdxKeys []kv.Key
tmpIdxKeys []kv.Key

needValidateKey bool
currentTempIndexPrefix []byte
currentIndexID int64
currentIndexColLen int
Expand All @@ -148,8 +149,37 @@ func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements
}
}

func (w *mergeIndexWorker) validateTaskRange(taskRange *reorgBackfillTask) (skip bool, err error) {
tmpID, err := tablecodec.DecodeIndexID(taskRange.startKey)
if err != nil {
return false, err
}
startIndexID := tmpID & tablecodec.IndexIDMask
tmpID, err = tablecodec.DecodeIndexID(taskRange.endKey)
if err != nil {
return false, err
}
endIndexID := tmpID & tablecodec.IndexIDMask

w.needValidateKey = startIndexID != endIndexID
containsTargetID := false
for _, idx := range w.indexes {
idxInfo := idx.Meta()
if idxInfo.ID == startIndexID || idxInfo.ID == endIndexID {
containsTargetID = true
break
}
}
return !containsTargetID, nil
}

// BackfillData merge temp index data in txn.
func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
skip, err := w.validateTaskRange(&taskRange)
if skip || err != nil {
return taskCtx, err
}

oprStartTime := time.Now()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)

Expand Down Expand Up @@ -230,13 +260,16 @@ func (w *mergeIndexWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}

func (w *mergeIndexWorker) prefixIsChanged(newKey kv.Key) bool {
return len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(newKey, w.currentTempIndexPrefix)
}

func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool, err error) {
tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
if err != nil {
return false, err
}
idxID := tablecodec.IndexIDMask & tempIdxID
pfx := tablecodec.CutIndexKeyPrefix(newIndexKey)
var curIdx *model.IndexInfo
for _, idx := range w.indexes {
if idx.Meta().ID == idxID {
Expand All @@ -252,6 +285,7 @@ func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool
// We should skip these dirty records.
return true, nil
}
pfx := tablecodec.CutIndexPrefix(newIndexKey)

w.currentTempIndexPrefix = kv.Key(pfx).Clone()
w.currentIndexID = idxID
Expand Down Expand Up @@ -286,8 +320,7 @@ func (w *mergeIndexWorker) fetchTempIndexVals(
return false, nil
}

if len(w.currentTempIndexPrefix) == 0 ||
!bytes.HasPrefix(indexKey, w.currentTempIndexPrefix) {
if w.needValidateKey && w.prefixIsChanged(indexKey) {
skip, err := w.updateCurrentIndexInfo(indexKey)
if err != nil || skip {
return skip, err
Expand Down
6 changes: 1 addition & 5 deletions pkg/tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
prefixLen = 1 + idLen /*tableID*/ + 2
// RecordRowKeyLen is public for calculating average row size.
RecordRowKeyLen = prefixLen + idLen /*handle*/
IndexKeyLen = prefixLen + idLen
tablePrefixLength = 1
recordPrefixSepLength = 2
metaPrefixLength = 1
Expand Down Expand Up @@ -106,11 +107,6 @@ func CutRowKeyPrefix(key kv.Key) []byte {
return key[prefixLen:]
}

// CutIndexKeyPrefix cuts the index key prefix.
func CutIndexKeyPrefix(key kv.Key) []byte {
return key[len(tablePrefix)+8+len(indexPrefixSep):]
}

// EncodeRecordKey encodes the recordPrefix, row handle into a kv.Key.
func EncodeRecordKey(recordPrefix kv.Key, h kv.Handle) kv.Key {
buf := make([]byte, 0, len(recordPrefix)+h.Len())
Expand Down

0 comments on commit 3be8a9d

Please sign in to comment.