diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 3b8a2775752f9..20352573ea6c5 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -30,7 +30,6 @@ go_library( "//br/pkg/storage", "//pkg/kv", "//pkg/metrics", - "//pkg/sessionctx/variable", "//pkg/util/hack", "//pkg/util/logutil", "//pkg/util/mathutil", @@ -39,7 +38,6 @@ go_library( "@com_github_cockroachdb_pebble//:pebble", "@com_github_docker_go_units//:go-units", "@com_github_google_uuid//:uuid", - "@com_github_jfcg_sorty_v2//:sorty", "@com_github_pingcap_errors//:errors", "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", diff --git a/br/pkg/lightning/backend/external/engine.go b/br/pkg/lightning/backend/external/engine.go index 9c11a89c658a7..ac9f72485a18c 100644 --- a/br/pkg/lightning/backend/external/engine.go +++ b/br/pkg/lightning/backend/external/engine.go @@ -145,7 +145,7 @@ func (e *Engine) getAdjustedConcurrency() int { if e.checkHotspot { // estimate we will open at most 1000 files, so if e.dataFiles is small we can // try to concurrently process ranges. - adjusted := int(MergeSortOverlapThreshold) / len(e.dataFiles) + adjusted := maxCloudStorageConnections / len(e.dataFiles) return min(adjusted, 8) } adjusted := min(e.workerConcurrency, maxCloudStorageConnections/len(e.dataFiles)) diff --git a/br/pkg/lightning/backend/external/engine_test.go b/br/pkg/lightning/backend/external/engine_test.go index 7d2a6eca318ca..8592de4c5e21a 100644 --- a/br/pkg/lightning/backend/external/engine_test.go +++ b/br/pkg/lightning/backend/external/engine_test.go @@ -337,7 +337,7 @@ func TestGetAdjustedConcurrency(t *testing.T) { dataFiles: genFiles(100), } require.Equal(t, 8, e.getAdjustedConcurrency()) - e.dataFiles = genFiles(1000) + e.dataFiles = genFiles(8000) require.Equal(t, 1, e.getAdjustedConcurrency()) e.checkHotspot = false diff --git a/br/pkg/lightning/backend/external/iter.go b/br/pkg/lightning/backend/external/iter.go index 0cbda86c6a02a..768f7bd979a92 100644 --- a/br/pkg/lightning/backend/external/iter.go +++ b/br/pkg/lightning/backend/external/iter.go @@ -86,8 +86,8 @@ type mergeIter[T heapElem, R sortedReader[T]] struct { curr T lastReaderIdx int err error - - // determines whether to check reader hotspot, if hotspot is detected, we will + + // determines whether to check reader hotspot, if hotspot is detected, we will // try read this file concurrently. 
checkHotspot bool hotspotMap map[int]int diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index f60814338e57c..cb3ad887114e1 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -22,21 +22,17 @@ import ( "path/filepath" "slices" "strconv" - "sync" "time" "github.com/docker/go-units" - "github.com/jfcg/sorty/v2" "github.com/pingcap/errors" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" - "github.com/pingcap/tidb/br/pkg/membuf" "github.com/pingcap/tidb/br/pkg/storage" tidbkv "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/size" "go.uber.org/zap" @@ -106,10 +102,6 @@ type WriterBuilder struct { propKeysDist uint64 onClose OnCloseFunc keyDupeEncoding bool - // This mutex is used to make sure the writer is flushed mutually exclusively in a TiDB server. - mu *sync.Mutex - - bufferPool *membuf.Pool } // NewWriterBuilder creates a WriterBuilder. @@ -159,24 +151,12 @@ func (b *WriterBuilder) SetOnCloseFunc(onClose OnCloseFunc) *WriterBuilder { return b } -// SetBufferPool sets the buffer pool of the writer. -func (b *WriterBuilder) SetBufferPool(bufferPool *membuf.Pool) *WriterBuilder { - b.bufferPool = bufferPool - return b -} - // SetKeyDuplicationEncoding sets if the writer can distinguish duplicate key. func (b *WriterBuilder) SetKeyDuplicationEncoding(val bool) *WriterBuilder { b.keyDupeEncoding = val return b } -// SetMutex sets the mutex of the writer. -func (b *WriterBuilder) SetMutex(mu *sync.Mutex) *WriterBuilder { - b.mu = mu - return b -} - // SetBlockSize sets the block size of pre-allocated buf in the writer. func (b *WriterBuilder) SetBlockSize(blockSize int) *WriterBuilder { b.blockSize = blockSize @@ -190,10 +170,6 @@ func (b *WriterBuilder) Build( prefix string, writerID string, ) *Writer { - bp := b.bufferPool - if bp == nil { - bp = membuf.NewPool() - } filenamePrefix := filepath.Join(prefix, writerID) keyAdapter := common.KeyAdapter(common.NoopKeyAdapter{}) if b.keyDupeEncoding { @@ -219,7 +195,6 @@ func (b *WriterBuilder) Build( multiFileStats: make([]MultipleFilesStat, 1), fileMinKeys: make([]tidbkv.Key, 0, multiFileStatNum), fileMaxKeys: make([]tidbkv.Key, 0, multiFileStatNum), - shareMu: b.mu, } ret.multiFileStats[0].Filenames = make([][2]string, 0, multiFileStatNum) return ret @@ -312,8 +287,6 @@ type Writer struct { minKey tidbkv.Key maxKey tidbkv.Key totalSize uint64 - // This mutex is used to make sure the writer is flushed mutually exclusively in a TiDB server. - shareMu *sync.Mutex } // WriteRow implements ingest.Writer. 
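For reference, the getAdjustedConcurrency change in engine.go above now divides the cloud-storage connection budget by the number of data files instead of deriving it from MergeSortOverlapThreshold. A minimal standalone sketch of the hotspot branch follows; the value of maxCloudStorageConnections is an assumption (8000), chosen only so the example lines up with the updated test expectations, since the real constant is not shown in this hunk.

```go
package main

import "fmt"

// Assumed value; the real constant lives in the external package and is not
// part of this hunk.
const maxCloudStorageConnections = 8000

// concurrencyForHotspotCheck mirrors the updated checkHotspot branch of
// Engine.getAdjustedConcurrency: spread the connection budget over the data
// files and cap the per-range concurrency at 8.
func concurrencyForHotspotCheck(numDataFiles int) int {
	return min(maxCloudStorageConnections/numDataFiles, 8)
}

func main() {
	fmt.Println(concurrencyForHotspotCheck(100))  // 8, matches the 100-file test case
	fmt.Println(concurrencyForHotspotCheck(8000)) // 1, matches the 8000-file test case
}
```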
@@ -406,10 +379,6 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { if len(w.kvLocations) == 0 { return nil } - if w.shareMu != nil { - w.shareMu.Lock() - defer w.shareMu.Unlock() - } logger := logutil.Logger(ctx) dataFile, statFile, dataWriter, statWriter, err := w.createStorageWriter(ctx) @@ -470,23 +439,9 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) { }() sortStart := time.Now() - if w.shareMu != nil { - sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter())) - sorty.Sort(len(w.kvLocations), func(i, j, r, s int) bool { - posi, posj := w.kvLocations[i], w.kvLocations[j] - if bytes.Compare(w.getKeyByLoc(posi), w.getKeyByLoc(posj)) < 0 { - if r != s { - w.kvLocations[r], w.kvLocations[s] = w.kvLocations[s], w.kvLocations[r] - } - return true - } - return false - }) - } else { - slices.SortFunc(w.kvLocations, func(i, j kvLocation) int { - return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j)) - }) - } + slices.SortFunc(w.kvLocations, func(i, j kvLocation) int { + return bytes.Compare(w.getKeyByLoc(i), w.getKeyByLoc(j)) + }) sortDuration = time.Since(sortStart) writeStartTime = time.Now() diff --git a/br/pkg/lightning/backend/kv/kv2sql.go b/br/pkg/lightning/backend/kv/kv2sql.go index 66173f9f1e9e4..bb0de7783330c 100644 --- a/br/pkg/lightning/backend/kv/kv2sql.go +++ b/br/pkg/lightning/backend/kv/kv2sql.go @@ -104,7 +104,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([] } iter := index.GenIndexKVIter(t.se.Vars.StmtCtx, indexValues, h, nil) for iter.Valid() { - indexKey, _, _, err := iter.Next(indexBuffer) + indexKey, _, _, err := iter.Next(indexBuffer, nil) if err != nil { return err } diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 8950f8a6a3a7e..53d7da49d71cf 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1579,11 +1579,6 @@ func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int return GetRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) } -// GetMutex returns the mutex of the backend. -func (local *Backend) GetMutex() *sync.Mutex { - return &local.mu -} - // expose these variables to unit test. 
var ( testJobToWorkerCh = make(chan *regionJob) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 1e20056214603..a45cf55377f00 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -181,6 +181,7 @@ go_test( "attributes_sql_test.go", "backfilling_dispatcher_test.go", "backfilling_test.go", + "bench_test.go", "cancel_test.go", "cluster_test.go", "column_change_test.go", diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 6d6190c2d1ff1..75ced7ca4831e 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -20,7 +20,6 @@ import ( "fmt" "path" "strconv" - "sync" "sync/atomic" "time" @@ -42,7 +41,6 @@ import ( "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/pkg/resourcemanager/util" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -172,7 +170,6 @@ func NewWriteIndexToExternalStoragePipeline( totalRowCount *atomic.Int64, metricCounter prometheus.Counter, onClose external.OnCloseFunc, - bcctx ingest.BackendCtx, ) (*operator.AsyncPipeline, error) { indexes := make([]table.Index, 0, len(idxInfos)) for _, idxInfo := range idxInfos { @@ -189,8 +186,7 @@ func NewWriteIndexToExternalStoragePipeline( for i := 0; i < poolSize; i++ { srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, copReadBatchSize()) } - readerCnt := int(variable.GetDDLReorgWorkerCounter()) - writerCnt := 1 + readerCnt, writerCnt := expectedIngestWorkerCnt() backend, err := storage.ParseBackend(extStoreURI, nil) if err != nil { @@ -201,21 +197,16 @@ func NewWriteIndexToExternalStoragePipeline( return nil, err } - var shareMu *sync.Mutex - if bcctx != nil { - shareMu = bcctx.GetLocalBackend().GetMutex() - } - memTotal, err := memory.MemTotal() if err != nil { return nil, err } - memSize := (memTotal / 2) / uint64(len(indexes)) + memSize := (memTotal / 2) / uint64(writerCnt) / uint64(len(indexes)) srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey) scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt) writeOp := NewWriteExternalStoreOperator( - ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, shareMu, memSize) + ctx, copCtx, sessPool, jobID, subtaskID, tbl, indexes, extStore, srcChkPool, writerCnt, onClose, memSize) sinkOp := newIndexWriteResultSink(ctx, nil, tbl, indexes, totalRowCount, metricCounter) operator.Compose[TableScanTask](srcOp, scanOp) @@ -499,7 +490,6 @@ func NewWriteExternalStoreOperator( srcChunkPool chan *chunk.Chunk, concurrency int, onClose external.OnCloseFunc, - shareMu *sync.Mutex, memoryQuota uint64, ) *WriteExternalStoreOperator { pool := workerpool.NewWorkerPool( @@ -512,7 +502,6 @@ func NewWriteExternalStoreOperator( builder := external.NewWriterBuilder(). SetOnCloseFunc(onClose). SetKeyDuplicationEncoding(index.Meta().Unique). - SetMutex(shareMu). 
SetMemorySizeLimit(memoryQuota) writerID := uuid.New().String() prefix := path.Join(strconv.Itoa(int(jobID)), strconv.Itoa(int(subtaskID))) diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index 1f6427ee2f9c2..1e54c7738533e 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -274,5 +274,5 @@ func (r *readIndexExecutor) buildExternalStorePipeline( metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O)) return NewWriteIndexToExternalStoragePipeline( opCtx, d.store, r.cloudStorageURI, r.d.sessPool, sessCtx, r.job.ID, subtaskID, - tbl, r.indexes, start, end, totalRowCount, counter, onClose, r.bc) + tbl, r.indexes, start, end, totalRowCount, counter, onClose) } diff --git a/pkg/ddl/bench_test.go b/pkg/ddl/bench_test.go new file mode 100644 index 0000000000000..d976cbdf1932f --- /dev/null +++ b/pkg/ddl/bench_test.go @@ -0,0 +1,100 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/ddl" + "github.com/pingcap/tidb/pkg/ddl/copr" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/table" + "github.com/pingcap/tidb/pkg/table/tables" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/stretchr/testify/require" +) + +func BenchmarkExtractDatumByOffsets(b *testing.B) { + store, dom := testkit.CreateMockStoreAndDomain(b) + tk := testkit.NewTestKit(b, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a bigint, b int, index idx (b));") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(b, err) + tblInfo := tbl.Meta() + idxInfo := tblInfo.FindIndexByName("idx") + copCtx, err := copr.NewCopContextSingleIndex(tblInfo, idxInfo, tk.Session(), "") + require.NoError(b, err) + startKey := tbl.RecordPrefix() + endKey := startKey.PrefixNext() + txn, err := store.Begin() + require.NoError(b, err) + copChunk := ddl.FetchChunk4Test(copCtx, tbl.(table.PhysicalTable), startKey, endKey, store, 10) + require.NoError(b, err) + require.NoError(b, txn.Rollback()) + + handleDataBuf := make([]types.Datum, len(copCtx.GetBase().HandleOutputOffsets)) + + iter := chunk.NewIterator4Chunk(copChunk) + row := iter.Begin() + c := copCtx.GetBase() + offsets := copCtx.IndexColumnOutputOffsets(idxInfo.ID) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + ddl.ExtractDatumByOffsetsForTest(row, offsets, c.ExprColumnInfos, handleDataBuf) + } +} + +func BenchmarkGenerateIndexKV(b *testing.B) { + store, dom := testkit.CreateMockStoreAndDomain(b) + tk := testkit.NewTestKit(b, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (a bigint, b int, index 
idx (b));") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(b, err) + tblInfo := tbl.Meta() + idxInfo := tblInfo.FindIndexByName("idx") + + index := tables.NewIndex(tblInfo.ID, tblInfo, idxInfo) + sctx := tk.Session().GetSessionVars().StmtCtx + idxDt := []types.Datum{types.NewIntDatum(10)} + buf := make([]byte, 0, 64) + handle := kv.IntHandle(1) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + buf = buf[:0] + iter := index.GenIndexKVIter(sctx, idxDt, handle, nil) + _, _, _, err = iter.Next(buf, nil) + if err != nil { + break + } + } + require.NoError(b, err) +} diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 3b022345c8edb..12bfeef693ed2 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -117,6 +117,7 @@ type JobContext struct { tp string resourceGroupName string + cloudStorageURI string } // NewJobContext returns a new ddl job context. diff --git a/pkg/ddl/export_test.go b/pkg/ddl/export_test.go index 1435c9876f9af..0dec47ea651fc 100644 --- a/pkg/ddl/export_test.go +++ b/pkg/ddl/export_test.go @@ -59,10 +59,14 @@ func FetchChunk4Test(copCtx copr.CopContext, tbl table.PhysicalTable, startKey, } func ConvertRowToHandleAndIndexDatum( + handleDataBuf, idxDataBuf []types.Datum, row chunk.Row, copCtx copr.CopContext, idxID int64) (kv.Handle, []types.Datum, error) { c := copCtx.GetBase() - idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, nil) - handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, nil) + idxData := extractDatumByOffsets(row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf) + handleData := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf) handle, err := buildHandle(handleData, c.TableInfo, c.PrimaryKeyInfo, stmtctx.NewStmtCtxWithTimeZone(time.Local)) return handle, idxData, err } + +// ExtractDatumByOffsetsForTest is used for test. +var ExtractDatumByOffsetsForTest = extractDatumByOffsets diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index aff05bb2ca49d..f26032c9234d9 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -644,6 +644,7 @@ SwitchIndexState: } return ver, err } + loadCloudStorageURI(w, job) if reorgTp.NeedMergeProcess() { // Increase telemetryAddIndexIngestUsage telemetryAddIndexIngestUsage.Inc() @@ -791,6 +792,12 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCt return model.ReorgTypeTxnMerge, nil } +func loadCloudStorageURI(w *worker, job *model.Job) { + jc := w.jobContext(job.ID, job.ReorgMeta) + jc.cloudStorageURI = variable.CloudStorageURI.Load() + job.ReorgMeta.UseCloudStorage = len(jc.cloudStorageURI) > 0 +} + // cleanupSortPath is used to clean up the temp data of the previous jobs. // Because we don't remove all the files after the support of checkpoint, // there maybe some stale files in the sort path if TiDB is killed during the backfill process. 
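The new benchmarks above pre-size their datum buffers (for example, handleDataBuf is allocated once with len(HandleOutputOffsets)) because extractDatumByOffsets, reworked later in this patch, fills a caller-provided buffer in place instead of appending to it. A minimal sketch of that fill-in-place, copy-before-retain pattern, using stand-in types rather than the real chunk.Row and types.Datum:

```go
package main

import "fmt"

// datum stands in for types.Datum; extractInto mimics the reworked
// extractDatumByOffsets: it overwrites a caller-provided, pre-sized buffer,
// so no per-row allocation happens.
type datum struct{ v int64 }

func extractInto(row []int64, offsets []int, buf []datum) []datum {
	for i, offset := range offsets {
		buf[i].v = row[offset]
	}
	return buf
}

func main() {
	offsets := []int{2, 0}
	buf := make([]datum, len(offsets)) // sized once, reused for every row

	retained := make([][]datum, 0, 2)
	for _, row := range [][]int64{{1, 2, 3}, {4, 5, 6}} {
		got := extractInto(row, offsets, buf)
		// The buffer is overwritten on the next row, so copy before
		// retaining, as the updated index_cop_test.go does with
		// copiedIdxDatum.
		kept := make([]datum, len(got))
		copy(kept, got)
		retained = append(retained, kept)
	}
	fmt.Println(retained) // [[{3} {1}] [{6} {4}]]
}
```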
@@ -1636,7 +1643,7 @@ func (w *addIndexTxnWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords [ if cnt < len(w.idxKeyBufs) { buf = w.idxKeyBufs[cnt] } - key, val, distinct, err := iter.Next(buf) + key, val, distinct, err := iter.Next(buf, nil) if err != nil { return errors.Trace(err) } @@ -1797,25 +1804,24 @@ func writeChunkToLocal( } }() for row := iter.Begin(); row != iter.End(); row = iter.Next() { - handleDataBuf = handleDataBuf[:0] handleDataBuf := extractDatumByOffsets(row, c.HandleOutputOffsets, c.ExprColumnInfos, handleDataBuf) - handle, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx) + h, err := buildHandle(handleDataBuf, c.TableInfo, c.PrimaryKeyInfo, sCtx) if err != nil { return 0, nil, errors.Trace(err) } for i, index := range indexes { idxID := index.Meta().ID - idxDataBuf = idxDataBuf[:0] idxDataBuf = extractDatumByOffsets( row, copCtx.IndexColumnOutputOffsets(idxID), c.ExprColumnInfos, idxDataBuf) + idxData := idxDataBuf[:len(index.Meta().Columns)] rsData := getRestoreData(c.TableInfo, copCtx.IndexInfo(idxID), c.PrimaryKeyInfo, handleDataBuf) - err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxDataBuf, rsData, handle) + err = writeOneKVToLocal(ctx, writers[i], index, sCtx, writeBufs, idxData, rsData, h) if err != nil { return 0, nil, errors.Trace(err) } } count++ - lastHandle = handle + lastHandle = h } return count, lastHandle, nil } @@ -1842,7 +1848,7 @@ func writeOneKVToLocal( ) error { iter := index.GenIndexKVIter(sCtx, idxDt, handle, rsData) for iter.Valid() { - key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf) + key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf, writeBufs.RowValBuf) if err != nil { return errors.Trace(err) } @@ -1860,6 +1866,7 @@ func writeOneKVToLocal( failpoint.Return(errors.New("mock engine error")) }) writeBufs.IndexKeyBuf = key + writeBufs.RowValBuf = idxVal } return nil } @@ -2109,11 +2116,12 @@ func (w *worker) executeDistGlobalTask(reorgInfo *reorgInfo) error { elemIDs = append(elemIDs, elem.ID) } + job := reorgInfo.Job taskMeta := &BackfillGlobalMeta{ Job: *reorgInfo.Job.Clone(), EleIDs: elemIDs, EleTypeKey: reorgInfo.currElement.TypeKey, - CloudStorageURI: variable.CloudStorageURI.Load(), + CloudStorageURI: w.jobContext(job.ID, job.ReorgMeta).cloudStorageURI, } metaData, err := json.Marshal(taskMeta) diff --git a/pkg/ddl/index_cop.go b/pkg/ddl/index_cop.go index 2e797591e806a..0266143c94eef 100644 --- a/pkg/ddl/index_cop.go +++ b/pkg/ddl/index_cop.go @@ -365,10 +365,9 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, col } func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.Column, buf []types.Datum) []types.Datum { - for _, offset := range offsets { + for i, offset := range offsets { c := expCols[offset] - rowDt := row.GetDatum(offset, c.GetType()) - buf = append(buf, rowDt) + row.DatumWithBuffer(offset, c.GetType(), &buf[i]) } return buf } diff --git a/pkg/ddl/index_cop_test.go b/pkg/ddl/index_cop_test.go index e81f2703798ef..6b33e3bb5037e 100644 --- a/pkg/ddl/index_cop_test.go +++ b/pkg/ddl/index_cop_test.go @@ -53,11 +53,16 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { iter := chunk.NewIterator4Chunk(copChunk) handles := make([]kv.Handle, 0, copChunk.NumRows()) values := make([][]types.Datum, 0, copChunk.NumRows()) + handleDataBuf := make([]types.Datum, len(copCtx.GetBase().HandleOutputOffsets)) + idxDataBuf := make([]types.Datum, len(idxInfo.Columns)) + for row := iter.Begin(); row != iter.End(); row = 
iter.Next() { - handle, idxDatum, err := ddl.ConvertRowToHandleAndIndexDatum(row, copCtx, idxInfo.ID) + handle, idxDatum, err := ddl.ConvertRowToHandleAndIndexDatum(handleDataBuf, idxDataBuf, row, copCtx, idxInfo.ID) require.NoError(t, err) handles = append(handles, handle) - values = append(values, idxDatum) + copiedIdxDatum := make([]types.Datum, len(idxDatum)) + copy(copiedIdxDatum, idxDatum) + values = append(values, copiedIdxDatum) } return handles, values } diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index 1380aeb25241f..9956446910e7a 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -198,6 +198,7 @@ func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { Revertible: true, CtxVars: job.CtxVars, ReorgTp: reorgTp, + UseCloud: false, }) return nil } diff --git a/pkg/executor/admin.go b/pkg/executor/admin.go index 7741bbdd7680f..ebe370ecd5da5 100644 --- a/pkg/executor/admin.go +++ b/pkg/executor/admin.go @@ -447,7 +447,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows) if cnt < len(e.idxKeyBufs) { buf = e.idxKeyBufs[cnt] } - key, _, distinct, err := iter.Next(buf) + key, _, distinct, err := iter.Next(buf, nil) if err != nil { return err } diff --git a/pkg/executor/batch_checker.go b/pkg/executor/batch_checker.go index 5463ea8916586..0970145c7dae6 100644 --- a/pkg/executor/batch_checker.go +++ b/pkg/executor/batch_checker.go @@ -192,7 +192,7 @@ func getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Table, row []types.D // due to we only care about distinct key. iter := v.GenIndexKVIter(ctx.GetSessionVars().StmtCtx, colVals, kv.IntHandle(0), nil) for iter.Valid() { - key, _, distinct, err1 := iter.Next(nil) + key, _, distinct, err1 := iter.Next(nil, nil) if err1 != nil { return nil, err1 } diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index d94a7b924fed4..0f742760ac641 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -520,10 +520,17 @@ func (e *DDLJobRetriever) appendJobToChunk(req *chunk.Chunk, job *model.Job, che func showAddIdxReorgTp(job *model.Job) string { if job.Type == model.ActionAddIndex || job.Type == model.ActionAddPrimaryKey { if job.ReorgMeta != nil { + sb := strings.Builder{} tp := job.ReorgMeta.ReorgTp.String() if len(tp) > 0 { - return " /* " + tp + " */" + sb.WriteString(" /* ") + sb.WriteString(tp) + if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge && job.ReorgMeta.UseCloudStorage { + sb.WriteString(" cloud") + } + sb.WriteString(" */") } + return sb.String() } } return "" @@ -531,10 +538,17 @@ func showAddIdxReorgTp(job *model.Job) string { func showAddIdxReorgTpInSubJob(subJob *model.SubJob) string { if subJob.Type == model.ActionAddIndex || subJob.Type == model.ActionAddPrimaryKey { + sb := strings.Builder{} tp := subJob.ReorgTp.String() if len(tp) > 0 { - return " /* " + tp + " */" + sb.WriteString(" /* ") + sb.WriteString(tp) + if subJob.ReorgTp == model.ReorgTypeLitMerge && subJob.UseCloud { + sb.WriteString(" cloud") + } + sb.WriteString(" */") } + return sb.String() } return "" } diff --git a/pkg/parser/model/ddl.go b/pkg/parser/model/ddl.go index ec12b06bde7ae..f0aa1af73e1c8 100644 --- a/pkg/parser/model/ddl.go +++ b/pkg/parser/model/ddl.go @@ -301,6 +301,7 @@ type SubJob struct { CtxVars []interface{} `json:"-"` SchemaVer int64 `json:"schema_version"` ReorgTp ReorgType `json:"reorg_tp"` + UseCloud bool `json:"use_cloud"` } // IsNormal returns true if the sub-job is normally running. 
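The showAddIdxReorgTp and showAddIdxReorgTpInSubJob changes above append a "cloud" marker when a lit-merge add-index job spilled to cloud storage. A standalone sketch of the resulting comment strings; "ingest" and "txn-merge" are example type names here, the real code takes them from ReorgTp.String():

```go
package main

import (
	"fmt"
	"strings"
)

// reorgComment mirrors the comment-building logic after this patch: the reorg
// type is wrapped in a SQL comment, and "cloud" is appended when the
// lit-merge path used cloud storage.
func reorgComment(tp string, litMerge, useCloud bool) string {
	if len(tp) == 0 {
		return ""
	}
	var sb strings.Builder
	sb.WriteString(" /* ")
	sb.WriteString(tp)
	if litMerge && useCloud {
		sb.WriteString(" cloud")
	}
	sb.WriteString(" */")
	return sb.String()
}

func main() {
	fmt.Println(reorgComment("ingest", true, true))      //  /* ingest cloud */
	fmt.Println(reorgComment("ingest", true, false))     //  /* ingest */
	fmt.Println(reorgComment("txn-merge", false, false)) //  /* txn-merge */
}
```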
@@ -369,6 +370,7 @@ func (sub *SubJob) FromProxyJob(proxyJob *Job, ver int64) { sub.RowCount = proxyJob.RowCount sub.SchemaVer = ver sub.ReorgTp = proxyJob.ReorgMeta.ReorgTp + sub.UseCloud = proxyJob.ReorgMeta.UseCloudStorage } // JobMeta is meta info of Job. diff --git a/pkg/parser/model/reorg.go b/pkg/parser/model/reorg.go index 25cd9f287ac5a..68a9f27a0d374 100644 --- a/pkg/parser/model/reorg.go +++ b/pkg/parser/model/reorg.go @@ -29,6 +29,7 @@ type DDLReorgMeta struct { Location *TimeZoneLocation `json:"location"` ReorgTp ReorgType `json:"reorg_tp"` IsDistReorg bool `json:"is_dist_reorg"` + UseCloudStorage bool `json:"use_cloud_storage"` ResourceGroupName string `json:"resource_group_name"` Version int64 `json:"version"` } diff --git a/pkg/store/mockstore/unistore/tikv/mvcc.go b/pkg/store/mockstore/unistore/tikv/mvcc.go index 4a1fcf34b784b..7f8f8eb9104e3 100644 --- a/pkg/store/mockstore/unistore/tikv/mvcc.go +++ b/pkg/store/mockstore/unistore/tikv/mvcc.go @@ -1000,6 +1000,7 @@ func encodeFromOldRow(oldRow, buf []byte) ([]byte, error) { datums = append(datums, d) } var encoder rowcodec.Encoder + buf = buf[:0] return encoder.Encode(stmtctx.NewStmtCtx(), colIDs, datums, buf) } diff --git a/pkg/table/index.go b/pkg/table/index.go index a1f239e71ac6e..9849cb2c0e4f3 100644 --- a/pkg/table/index.go +++ b/pkg/table/index.go @@ -69,12 +69,6 @@ func WithCtx(ctx context.Context) CreateIdxOptFunc { } } -// IndexIter is index kvs iter. -type IndexIter interface { - Next(kb []byte) ([]byte, []byte, bool, error) - Valid() bool -} - // Index is the interface for index data on KV store. type Index interface { // Meta returns IndexInfo. @@ -86,15 +80,79 @@ type Index interface { // Delete supports delete from statement. Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error // GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation. - GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexIter + GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexKVGenerator // Exist supports check index exists or not. Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) // GenIndexKey generates an index key. If the index is a multi-valued index, use GenIndexKVIter instead. GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error) // GenIndexValue generates an index value. - GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum) ([]byte, error) + GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum, buf []byte) ([]byte, error) // FetchValues fetched index column values in a row. // Param columns is a reused buffer, if it is not nil, FetchValues will fill the index values in it, // and return the buffer, if it is nil, FetchValues will allocate the buffer instead. FetchValues(row []types.Datum, columns []types.Datum) ([]types.Datum, error) } + +// IndexKVGenerator generates kv for an index. +// It could be also used for generating multi-value indexes. 
+type IndexKVGenerator struct { + index Index + sCtx *stmtctx.StatementContext + handle kv.Handle + handleRestoreData []types.Datum + + // Only used by multi-value index. + allIdxVals [][]types.Datum + i int + // Only used by non multi-value index. + idxVals []types.Datum +} + +// NewIndexKVGenerator creates a new IndexKVGenerator. +func NewIndexKVGenerator( + index Index, + stmtCtx *stmtctx.StatementContext, + handle kv.Handle, + handleRestoredData []types.Datum, + mvIndexData [][]types.Datum, + singleIdxData []types.Datum, +) IndexKVGenerator { + return IndexKVGenerator{ + index: index, + sCtx: stmtCtx, + handle: handle, + handleRestoreData: handleRestoredData, + allIdxVals: mvIndexData, + i: 0, + idxVals: singleIdxData, + } +} + +// Next returns the next index key and value. +// For non multi-value indexes, there is only one index kv. +func (iter *IndexKVGenerator) Next(keyBuf, valBuf []byte) ([]byte, []byte, bool, error) { + var val []types.Datum + if len(iter.allIdxVals) == 0 { + val = iter.idxVals + } else { + val = iter.allIdxVals[iter.i] + } + key, distinct, err := iter.index.GenIndexKey(iter.sCtx, val, iter.handle, keyBuf) + if err != nil { + return nil, nil, false, err + } + idxVal, err := iter.index.GenIndexValue(iter.sCtx, distinct, val, iter.handle, iter.handleRestoreData, valBuf) + if err != nil { + return nil, nil, false, err + } + iter.i++ + return key, idxVal, distinct, err +} + +// Valid returns true if the generator is not exhausted. +func (iter *IndexKVGenerator) Valid() bool { + if len(iter.allIdxVals) == 0 { + return iter.i == 0 + } + return iter.i < len(iter.allIdxVals) +} diff --git a/pkg/table/tables/index.go b/pkg/table/tables/index.go index f15017bc49868..a8ceeec60d571 100644 --- a/pkg/table/tables/index.go +++ b/pkg/table/tables/index.go @@ -97,11 +97,12 @@ func (c *index) GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types. } // GenIndexValue generates the index value. 
-func (c *index) GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum) ([]byte, error) { +func (c *index) GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, + h kv.Handle, restoredData []types.Datum, buf []byte) ([]byte, error) { c.initNeedRestoreData.Do(func() { c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns) }) - return tablecodec.GenIndexValuePortal(sc, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, false, indexedValues, h, c.phyTblID, restoredData) + return tablecodec.GenIndexValuePortal(sc, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, false, indexedValues, h, c.phyTblID, restoredData, buf) } // getIndexedValue will produce the result like: @@ -232,7 +233,8 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue c.initNeedRestoreData.Do(func() { c.needRestoredData = NeedRestoredData(c.idxInfo.Columns, c.tblInfo.Columns) }) - idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo, c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData) + idxVal, err := tablecodec.GenIndexValuePortal(sctx.GetSessionVars().StmtCtx, c.tblInfo, c.idxInfo, + c.needRestoredData, distinct, opt.Untouched, value, h, c.phyTblID, handleRestoreData, nil) if err != nil { return nil, err } @@ -470,44 +472,13 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed return nil } -func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) table.IndexIter { - indexedValues := c.getIndexedValue(indexedValue) - return &indexGenerator{ - c: c, - sctx: sc, - indexedVals: indexedValues, - h: h, - handleRestoreData: handleRestoreData, - i: 0, - } -} - -type indexGenerator struct { - c *index - sctx *stmtctx.StatementContext - indexedVals [][]types.Datum - h kv.Handle - handleRestoreData []types.Datum - - i int -} - -func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) { - val := s.indexedVals[s.i] - key, distinct, err := s.c.GenIndexKey(s.sctx, val, s.h, kb) - if err != nil { - return nil, nil, false, err - } - idxVal, err := s.c.GenIndexValue(s.sctx, distinct, val, s.h, s.handleRestoreData) - if err != nil { - return nil, nil, false, err +func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, + h kv.Handle, handleRestoreData []types.Datum) table.IndexKVGenerator { + var mvIndexValues [][]types.Datum + if c.Meta().MVIndex { + mvIndexValues = c.getIndexedValue(indexedValue) } - s.i++ - return key, idxVal, distinct, err -} - -func (s *indexGenerator) Valid() bool { - return s.i < len(s.indexedVals) + return table.NewIndexKVGenerator(c, sc, h, handleRestoreData, mvIndexValues, indexedValue) } const ( diff --git a/pkg/table/tables/mutation_checker_test.go b/pkg/table/tables/mutation_checker_test.go index bca9cbb53640d..2f73e4cab39c6 100644 --- a/pkg/table/tables/mutation_checker_test.go +++ b/pkg/table/tables/mutation_checker_test.go @@ -335,7 +335,7 @@ func buildIndexKeyValue(index table.Index, rowToInsert []types.Datum, sessVars * rsData := TryGetHandleRestoredDataWrapper(table.meta, rowToInsert, nil, indexInfo) value, err := tablecodec.GenIndexValuePortal( sessVars.StmtCtx, &tableInfo, indexInfo, NeedRestoredData(indexInfo.Columns, tableInfo.Columns), - distinct, false, indexedValues, handle, 0, rsData, + distinct, false, 
indexedValues, handle, 0, rsData, nil, ) if err != nil { return nil, nil, err diff --git a/pkg/tablecodec/tablecodec.go b/pkg/tablecodec/tablecodec.go index 8cfd7cfb112ce..723426d7356bd 100644 --- a/pkg/tablecodec/tablecodec.go +++ b/pkg/tablecodec/tablecodec.go @@ -340,6 +340,7 @@ func EncodeRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int64, return nil, errors.Errorf("EncodeRow error: data and columnID count not match %d vs %d", len(row), len(colIDs)) } if e.Enable { + valBuf = valBuf[:0] return e.Encode(sc, colIDs, row, valBuf, checksums...) } return EncodeOldRow(sc, row, colIDs, valBuf, values) @@ -1137,7 +1138,11 @@ func GenIndexKey(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo } if !distinct && h != nil { if h.IsInt() { - key, err = codec.EncodeKey(sc, key, types.NewDatum(h.IntValue())) + // We choose the efficient path here instead of calling `codec.EncodeKey` + // because the int handle must be an int64, and it must be comparable. + // This remains correct until codec.encodeSignedInt is changed. + key = append(key, codec.IntHandleFlag) + key = codec.EncodeInt(key, h.IntValue()) } else { key = append(key, h.Encoded()...) } @@ -1423,11 +1428,13 @@ func TempIndexValueIsUntouched(b []byte) bool { // | In v5.0, restored data contains only non-binary data(except for char and _bin). In the above example, the restored data contains only the value of b. // | Besides, if the collation of b is _bin, then restored data is an integer indicate the spaces are truncated. Then we use sortKey // | and the restored data together to restore original data. -func GenIndexValuePortal(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, needRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, partitionID int64, restoredData []types.Datum) ([]byte, error) { +func GenIndexValuePortal(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, + needRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, + partitionID int64, restoredData []types.Datum, buf []byte) ([]byte, error) { if tblInfo.IsCommonHandle && tblInfo.CommonHandleVersion == 1 { - return GenIndexValueForClusteredIndexVersion1(sc, tblInfo, idxInfo, needRestoredData, distinct, untouched, indexedValues, h, partitionID, restoredData) + return GenIndexValueForClusteredIndexVersion1(sc, tblInfo, idxInfo, needRestoredData, distinct, untouched, indexedValues, h, partitionID, restoredData, buf) } - return genIndexValueVersion0(sc, tblInfo, idxInfo, needRestoredData, distinct, untouched, indexedValues, h, partitionID) + return genIndexValueVersion0(sc, tblInfo, idxInfo, needRestoredData, distinct, untouched, indexedValues, h, partitionID, buf) } // TryGetCommonPkColumnRestoredIds get the IDs of primary key columns which need restored data if the table has common handle. @@ -1453,8 +1460,15 @@ func TryGetCommonPkColumnRestoredIds(tbl *model.TableInfo) []int64 { } // GenIndexValueForClusteredIndexVersion1 generates the index value for the clustered index with version 1(New in v5.0.0). 
-func GenIndexValueForClusteredIndexVersion1(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxValNeedRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, partitionID int64, handleRestoredData []types.Datum) ([]byte, error) { - idxVal := make([]byte, 0) +func GenIndexValueForClusteredIndexVersion1(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, + idxValNeedRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, + partitionID int64, handleRestoredData []types.Datum, buf []byte) ([]byte, error) { + var idxVal []byte + if buf == nil { + idxVal = make([]byte, 0) + } else { + idxVal = buf[:0] + } idxVal = append(idxVal, 0) tailLen := 0 // Version info. @@ -1494,11 +1508,11 @@ func GenIndexValueForClusteredIndexVersion1(sc *stmtctx.StatementContext, tblInf } rd := rowcodec.Encoder{Enable: true} - rowRestoredValue, err := rd.Encode(sc, colIds, allRestoredData, nil) + var err error + idxVal, err = rd.Encode(sc, colIds, allRestoredData, idxVal) if err != nil { return nil, err } - idxVal = append(idxVal, rowRestoredValue...) } if untouched { @@ -1511,8 +1525,15 @@ func GenIndexValueForClusteredIndexVersion1(sc *stmtctx.StatementContext, tblInf } // genIndexValueVersion0 create index value for both local and global index. -func genIndexValueVersion0(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxValNeedRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, partitionID int64) ([]byte, error) { - idxVal := make([]byte, 0) +func genIndexValueVersion0(sc *stmtctx.StatementContext, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, + idxValNeedRestoredData bool, distinct bool, untouched bool, indexedValues []types.Datum, h kv.Handle, + partitionID int64, buf []byte) ([]byte, error) { + var idxVal []byte + if buf == nil { + idxVal = make([]byte, 0) + } else { + idxVal = buf[:0] + } idxVal = append(idxVal, 0) newEncode := false tailLen := 0 @@ -1530,11 +1551,12 @@ func genIndexValueVersion0(sc *stmtctx.StatementContext, tblInfo *model.TableInf colIds[i] = tblInfo.Columns[col.Offset].ID } rd := rowcodec.Encoder{Enable: true} - rowRestoredValue, err := rd.Encode(sc, colIds, indexedValues, nil) + // Encode row restored value. + var err error + idxVal, err = rd.Encode(sc, colIds, indexedValues, idxVal) if err != nil { return nil, err } - idxVal = append(idxVal, rowRestoredValue...) newEncode = true } @@ -1559,7 +1581,11 @@ func genIndexValueVersion0(sc *stmtctx.StatementContext, tblInfo *model.TableInf idxVal[0] = byte(tailLen) } else { // Old index value encoding. - idxVal = make([]byte, 0) + if buf == nil { + idxVal = make([]byte, 0) + } else { + idxVal = buf[:0] + } if distinct { idxVal = EncodeHandleInUniqueIndexValue(h, untouched) } @@ -1570,7 +1596,7 @@ func genIndexValueVersion0(sc *stmtctx.StatementContext, tblInfo *model.TableInf idxVal = append(idxVal, kv.UnCommitIndexKVFlag) } if len(idxVal) == 0 { - idxVal = []byte{'0'} + idxVal = append(idxVal, byte('0')) } } return idxVal, nil diff --git a/pkg/util/codec/codec.go b/pkg/util/codec/codec.go index 1c9519280d8d4..e6a3093cadcd4 100644 --- a/pkg/util/codec/codec.go +++ b/pkg/util/codec/codec.go @@ -51,6 +51,9 @@ const ( maxFlag byte = 250 ) +// IntHandleFlag is only used to encode int handle key. 
+const IntHandleFlag = intFlag + const ( sizeUint64 = unsafe.Sizeof(uint64(0)) sizeFloat64 = unsafe.Sizeof(float64(0)) diff --git a/pkg/util/rowcodec/bench_test.go b/pkg/util/rowcodec/bench_test.go index a5bc6bc0bc228..ef5698f27f9a7 100644 --- a/pkg/util/rowcodec/bench_test.go +++ b/pkg/util/rowcodec/bench_test.go @@ -57,6 +57,7 @@ func BenchmarkEncode(b *testing.B) { colIDs := []int64{1, 2, 3} var err error for i := 0; i < b.N; i++ { + buf = buf[:0] buf, err = xb.Encode(nil, colIDs, oldRow, buf) if err != nil { b.Fatal(err) diff --git a/pkg/util/rowcodec/encoder.go b/pkg/util/rowcodec/encoder.go index 92fbd566c96cb..26f746e2198a7 100644 --- a/pkg/util/rowcodec/encoder.go +++ b/pkg/util/rowcodec/encoder.go @@ -37,6 +37,7 @@ type Encoder struct { } // Encode encodes a row from a datums slice. +// `buf` is not truncated before encoding. func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, values []types.Datum, buf []byte, checksums ...uint32) ([]byte, error) { encoder.reset() encoder.appendColVals(colIDs, values) @@ -46,7 +47,7 @@ func (encoder *Encoder) Encode(sc *stmtctx.StatementContext, colIDs []int64, val return nil, err } encoder.setChecksums(checksums...) - return encoder.row.toBytes(buf[:0]), nil + return encoder.row.toBytes(buf), nil } func (encoder *Encoder) reset() {
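With the final hunk, rowcodec.Encoder.Encode appends to buf instead of truncating it. That is what lets GenIndexValueForClusteredIndexVersion1 and genIndexValueVersion0 encode the restored row directly into the reused index-value buffer, and why EncodeRow and encodeFromOldRow now reset the buffer themselves with buf = buf[:0]. A generic sketch of that append-only convention (not the TiDB API itself):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRow is a stand-in for an append-style encoder such as
// rowcodec.Encoder.Encode after this patch: it never truncates buf, it only
// appends, so the caller decides whether to reuse or extend the buffer.
func encodeRow(values []uint64, buf []byte) []byte {
	for _, v := range values {
		buf = binary.BigEndian.AppendUint64(buf, v)
	}
	return buf
}

func main() {
	buf := make([]byte, 0, 64)

	// Reuse: truncate before encoding the next row, as EncodeRow and
	// encodeFromOldRow now do explicitly.
	for _, row := range [][]uint64{{1, 2}, {3}} {
		buf = encodeRow(row, buf[:0])
		fmt.Println(len(buf)) // 16, then 8
	}

	// Extend: build a composite value by appending after a prefix byte,
	// which is how the index value can now carry the restored-row encoding
	// without an extra intermediate slice.
	idxVal := append(buf[:0], 0) // placeholder header byte
	idxVal = encodeRow([]uint64{7, 8}, idxVal)
	fmt.Println(len(idxVal)) // 17
}
```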