Commit 0533201

Merge branch 'master' into dev_index_usage

rebelice authored Sep 30, 2020
2 parents 840b448 + 03c6b5a commit 0533201
Showing 199 changed files with 107,126 additions and 2,908 deletions.
5 changes: 2 additions & 3 deletions CONTRIBUTING.md
@@ -1,4 +1,3 @@
-# Contribution Guide
+# Contributing Guide
 
-See the [Contribution Guide](https://github.com/pingcap/community/blob/master/CONTRIBUTING.md) in the
-[community](https://github.com/pingcap/community) repo.
+See the [Contributing Guide](https://github.com/pingcap/community/blob/master/contributors/README.md) in the [community](https://github.com/pingcap/community) repository.
408 changes: 249 additions & 159 deletions bindinfo/bind_test.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions cmd/explaintest/r/explain_easy.result
@@ -53,7 +53,7 @@ HashJoin_23 4166.67 root left outer join, equal:[eq(test.t1.c2, test.t2.c1)]
└─TableFullScan_34 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id estRows task access object operator info
-Update_2 N/A root N/A
+Update_3 N/A root N/A
└─Point_Get_1 1.00 root table:t1 handle:1
explain delete from t1 where t1.c2 = 1;
id estRows task access object operator info
@@ -687,7 +687,7 @@ begin;
insert into t values (1, 1);
explain update t set j = -j where i = 1 and j = 1;
id estRows task access object operator info
-Update_2 N/A root N/A
+Update_3 N/A root N/A
└─Point_Get_1 1.00 root table:t, index:i(i, j)
rollback;
drop table if exists t;
2 changes: 1 addition & 1 deletion cmd/explaintest/r/explain_easy_stats.result
@@ -54,7 +54,7 @@ HashJoin_22 2481.25 root left outer join, equal:[eq(test.t1.c2, test.t2.c1)]
└─TableRangeScan_32 1998.00 cop[tikv] table:t1 range:(1,+inf], keep order:false
explain update t1 set t1.c2 = 2 where t1.c1 = 1;
id estRows task access object operator info
-Update_2 N/A root N/A
+Update_3 N/A root N/A
└─Point_Get_1 1.00 root table:t1 handle:1
explain delete from t1 where t1.c2 = 1;
id estRows task access object operator info
17 changes: 12 additions & 5 deletions cmd/explaintest/r/explain_indexmerge.result
@@ -97,19 +97,26 @@ label = "cop"
}

set session tidb_enable_index_merge = off;
+explain select /*+ use_index_merge(t, primary, tb, tc) */ * from t where a <= 500000 or b <= 1000000 or c <= 3000000;
+id estRows task access object operator info
+IndexMerge_9 3560000.00 root
+├─TableRangeScan_5(Build) 500000.00 cop[tikv] table:t range:[-inf,500000], keep order:false
+├─IndexRangeScan_6(Build) 1000000.00 cop[tikv] table:t, index:tb(b) range:[-inf,1000000], keep order:false
+├─IndexRangeScan_7(Build) 3000000.00 cop[tikv] table:t, index:tc(c) range:[-inf,3000000], keep order:false
+└─TableRowIDScan_8(Probe) 3560000.00 cop[tikv] table:t keep order:false
explain select /*+ use_index_merge(t, tb, tc) */ * from t where b < 50 or c < 5000000;
id estRows task access object operator info
-IndexMerge_8 5000000.00 root
+IndexMerge_8 4999999.00 root
├─IndexRangeScan_5(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false
├─IndexRangeScan_6(Build) 4999999.00 cop[tikv] table:t, index:tc(c) range:[-inf,5000000), keep order:false
-└─TableRowIDScan_7(Probe) 5000000.00 cop[tikv] table:t keep order:false
+└─TableRowIDScan_7(Probe) 4999999.00 cop[tikv] table:t keep order:false
explain select /*+ use_index_merge(t, tb, tc) */ * from t where (b < 10000 or c < 10000) and (a < 10 or d < 10) and f < 10;
id estRows task access object operator info
IndexMerge_9 0.00 root
├─IndexRangeScan_5(Build) 9999.00 cop[tikv] table:t, index:tb(b) range:[-inf,10000), keep order:false
├─IndexRangeScan_6(Build) 9999.00 cop[tikv] table:t, index:tc(c) range:[-inf,10000), keep order:false
└─Selection_8(Probe) 0.00 cop[tikv] lt(test.t.f, 10), or(lt(test.t.a, 10), lt(test.t.d, 10))
-  └─TableRowIDScan_7 19998.00 cop[tikv] table:t keep order:false
+  └─TableRowIDScan_7 19978.00 cop[tikv] table:t keep order:false
explain select /*+ use_index_merge(t, tb) */ * from t where b < 50 or c < 5000000;
id estRows task access object operator info
TableReader_7 4999999.00 root data:Selection_6
@@ -122,7 +129,7 @@ TableReader_7 4999999.00 root data:Selection_6
└─TableFullScan_5 5000000.00 cop[tikv] table:t keep order:false
explain select /*+ use_index_merge(t, primary, tb) */ * from t where a < 50 or b < 5000000;
id estRows task access object operator info
-IndexMerge_8 5000000.00 root
+IndexMerge_8 4999999.00 root
├─TableRangeScan_5(Build) 49.00 cop[tikv] table:t range:[-inf,50), keep order:false
├─IndexRangeScan_6(Build) 4999999.00 cop[tikv] table:t, index:tb(b) range:[-inf,5000000), keep order:false
-└─TableRowIDScan_7(Probe) 5000000.00 cop[tikv] table:t keep order:false
+└─TableRowIDScan_7(Probe) 4999999.00 cop[tikv] table:t keep order:false
2 changes: 1 addition & 1 deletion cmd/explaintest/r/partition_pruning.result
@@ -1950,7 +1950,7 @@ partition p2 values less than (2000)
insert into t8 values ('1985-05-05'),('1995-05-05');
explain select * from t8 where a < '1980-02-02';
id estRows task access object operator info
-TableReader_7 3323.33 root partition:all data:Selection_6
+TableReader_7 3323.33 root partition:p0,p1 data:Selection_6
└─Selection_6 3323.33 cop[tikv] lt(test.t8.a, 1980-02-02 00:00:00.000000)
└─TableFullScan_5 10000.00 cop[tikv] table:t8 keep order:false, stats:pseudo
create table t9 (a date not null) partition by RANGE(TO_DAYS(a)) (
1 change: 1 addition & 0 deletions cmd/explaintest/t/explain_indexmerge.test
@@ -20,6 +20,7 @@ explain select * from t where (b < 10000 or c < 10000) and (a < 10 or d < 10) an
explain format="dot" select * from t where (a < 50 or b < 50) and f > 100;
set session tidb_enable_index_merge = off;
# be forced to use IndexMerge
+explain select /*+ use_index_merge(t, primary, tb, tc) */ * from t where a <= 500000 or b <= 1000000 or c <= 3000000;
explain select /*+ use_index_merge(t, tb, tc) */ * from t where b < 50 or c < 5000000;
explain select /*+ use_index_merge(t, tb, tc) */ * from t where (b < 10000 or c < 10000) and (a < 10 or d < 10) and f < 10;
explain select /*+ use_index_merge(t, tb) */ * from t where b < 50 or c < 5000000;
25 changes: 18 additions & 7 deletions config/config.go
@@ -494,11 +494,8 @@ type TiKVClient struct {
// and if no activity is seen even after that the connection is closed.
GrpcKeepAliveTimeout uint `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"`
// CommitTimeout is the max time which command 'commit' will wait.
-	CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
-	// EnableAsyncCommit enables async commit for all transactions.
-	EnableAsyncCommit bool `toml:"enable-async-commit" json:"enable-async-commit"`
-	AsyncCommitKeysLimit uint `toml:"async-commit-keys-limit" json:"async-commit-keys-limit"`
-
+	CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
+	AsyncCommit AsyncCommit `toml:"async-commit" json:"async-commit"`
// MaxBatchSize is the max batch size when calling batch commands API.
MaxBatchSize uint `toml:"max-batch-size" json:"max-batch-size"`
// If TiKV load is greater than this, TiDB will wait for a while to avoid little batch.
@@ -522,6 +519,16 @@ type TiKVClient struct {
TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"`
}

+// AsyncCommit is the config for the async commit feature.
+type AsyncCommit struct {
+	// Whether to enable the async commit feature.
+	Enable bool `toml:"enable" json:"enable"`
+	// Use async commit only if the number of keys does not exceed KeysLimit.
+	KeysLimit uint `toml:"keys-limit" json:"keys-limit"`
+	// Use async commit only if the total size of keys does not exceed TotalKeySizeLimit.
+	TotalKeySizeLimit uint64 `toml:"total-key-size-limit" json:"total-key-size-limit"`
+}

// CoprocessorCache is the config for coprocessor cache.
type CoprocessorCache struct {
// Whether to enable the copr cache. The copr cache saves the result from TiKV Coprocessor in the memory and
@@ -688,8 +695,12 @@ var defaultConf = Config{
GrpcKeepAliveTime: 10,
GrpcKeepAliveTimeout: 3,
CommitTimeout: "41s",
-		EnableAsyncCommit: false,
-		AsyncCommitKeysLimit: 256,
+		AsyncCommit: AsyncCommit{
+			Enable: false,
+			// FIXME: Find an appropriate default limit.
+			KeysLimit: 256,
+			TotalKeySizeLimit: 4 * 1024, // 4 KiB
+		},

MaxBatchSize: 128,
OverloadThreshold: 200,
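The new `AsyncCommit` block replaces the two flat fields with a nested config that budgets the feature on both key count and total key size. As a rough illustration of how a commit path might consult these limits (a hypothetical helper sketched for this note, not TiDB's actual two-phase-commit logic):

```go
// asyncCommitEligible sketches the gating described by the AsyncCommit field
// comments above: the feature must be enabled, and the transaction must stay
// within both the key-count and the total-key-size budgets; otherwise the
// caller falls back to the normal two-phase commit.
func asyncCommitEligible(cfg AsyncCommit, keyCount uint, totalKeySize uint64) bool {
	if !cfg.Enable {
		return false
	}
	// Over either budget: use normal 2PC instead.
	return keyCount <= cfg.KeysLimit && totalKeySize <= cfg.TotalKeySizeLimit
}
```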
16 changes: 10 additions & 6 deletions config/config.toml.example
@@ -361,12 +361,6 @@ grpc-keepalive-timeout = 3
# Max time for commit command, must be twice bigger than raft election timeout.
commit-timeout = "41s"

-# Enable async commit for all transactions.
-enable-async-commit = false
-# The maximum allowed keys in a async commit transaction. Transactions with more keys than the limit
-# will be committed with normal 2PC way.
-async-commit-keys-limit = 256

# Max batch size in gRPC.
max-batch-size = 128
# Overload threshold of TiKV.
@@ -395,6 +389,16 @@ store-liveness-timeout = "5s"
# If the size(in byte) of a transaction is large than `ttl-refreshed-txn-size`, it update the lock TTL during the 2PC.
ttl-refreshed-txn-size = 33554432

+[tikv-client.async-commit]
+# Whether to enable the async commit feature. This feature reduces the latency of the two-phase commit.
+enable = false
+# The maximum allowed keys in an async commit transaction. Transactions with more keys than the limit
+# will be committed with normal 2PC way.
+keys-limit = 256
+# The maximum length total of keys in bytes. Transactions will be committed with the normal 2PC way
+# if the limit is exceeded.
+total-key-size-limit = 4096

[tikv-client.copr-cache]
# Whether to enable the copr cache. The copr cache saves the result from TiKV Coprocessor in the memory and
# reuses the result when corresponding data in TiKV is unchanged, on a region basis.
11 changes: 7 additions & 4 deletions config/config_test.go
@@ -196,12 +196,14 @@ deprecate-integer-display-length = true
txn-total-size-limit=2000
[tikv-client]
commit-timeout="41s"
-enable-async-commit=true
-async-commit-keys-limit=123
max-batch-size=128
region-cache-ttl=6000
store-limit=0
ttl-refreshed-txn-size=8192
+[tikv-client.async-commit]
+enable=true
+keys-limit=123
+total-key-size-limit=1024
[stmt-summary]
enable=false
enable-internal-query=true
@@ -235,8 +237,9 @@ spilled-file-encryption-method = "plaintext"
c.Assert(conf.AlterPrimaryKey, Equals, true)

c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
-	c.Assert(conf.TiKVClient.EnableAsyncCommit, Equals, true)
-	c.Assert(conf.TiKVClient.AsyncCommitKeysLimit, Equals, uint(123))
+	c.Assert(conf.TiKVClient.AsyncCommit.Enable, Equals, true)
+	c.Assert(conf.TiKVClient.AsyncCommit.KeysLimit, Equals, uint(123))
+	c.Assert(conf.TiKVClient.AsyncCommit.TotalKeySizeLimit, Equals, uint64(1024))
c.Assert(conf.TiKVClient.MaxBatchSize, Equals, uint(128))
c.Assert(conf.TiKVClient.RegionCacheTTL, Equals, uint(6000))
c.Assert(conf.TiKVClient.StoreLimit, Equals, int64(0))
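The test above exercises the new nested `[tikv-client.async-commit]` section. For readers unfamiliar with how such a TOML table maps onto the Go structs, here is a minimal standalone sketch using the BurntSushi/toml decoder; the mirror types below are illustrative copies, not the real tidb/config package:

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Illustrative copies of the fields introduced in this commit.
type asyncCommit struct {
	Enable            bool   `toml:"enable"`
	KeysLimit         uint   `toml:"keys-limit"`
	TotalKeySizeLimit uint64 `toml:"total-key-size-limit"`
}

type tikvClient struct {
	CommitTimeout string      `toml:"commit-timeout"`
	AsyncCommit   asyncCommit `toml:"async-commit"`
}

func main() {
	data := `
commit-timeout = "41s"
[async-commit]
enable = true
keys-limit = 123
total-key-size-limit = 1024
`
	var c tikvClient
	if _, err := toml.Decode(data, &c); err != nil {
		panic(err)
	}
	// Prints: {CommitTimeout:41s AsyncCommit:{Enable:true KeysLimit:123 TotalKeySizeLimit:1024}}
	fmt.Printf("%+v\n", c)
}
```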
56 changes: 44 additions & 12 deletions ddl/backfilling.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
@@ -71,10 +72,12 @@ type backfillResult struct {
// backfillTaskContext is the context of the batch adding indices or updating column values.
// After finishing the batch adding indices or updating column values, result in backfillTaskContext will be merged into backfillResult.
type backfillTaskContext struct {
-	nextHandle kv.Handle
-	done bool
-	addedCount int
-	scanCount int
+	nextHandle kv.Handle
+	done bool
+	addedCount int
+	scanCount int
+	warnings map[errors.ErrorID]*terror.Error
+	warningsCount map[errors.ErrorID]int64
}

type backfillWorker struct {
@@ -130,7 +133,7 @@ func (r *reorgBackfillTask) String() string {
if r.endIncluded {
rightParenthesis = "]"
}
return "physicalTableID" + strconv.FormatInt(r.physicalTableID, 10) + "_" + "[" + r.startHandle.String() + "," + r.endHandle.String() + rightParenthesis
return "physicalTableID_" + strconv.FormatInt(r.physicalTableID, 10) + "_" + "[" + r.startHandle.String() + "," + r.endHandle.String() + rightParenthesis
}

func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
@@ -150,10 +153,26 @@ func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResu
result.scanCount += taskCtx.scanCount
}

+func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
+	for _, warn := range partWarnings {
+		if _, ok := totalWarningsCount[warn.ID()]; ok {
+			totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
+		} else {
+			totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
+			totalWarnings[warn.ID()] = warn
+		}
+	}
+	return totalWarnings, totalWarningsCount
+}

// handleBackfillTask backfills range [task.startHandle, task.endHandle) handle's index to table.
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult {
handleRange := *task
-	result := &backfillResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil}
+	result := &backfillResult{
+		err:        nil,
+		addedCount: 0,
+		nextHandle: handleRange.startHandle,
+	}
lastLogCount := 0
lastLogTime := time.Now()
startTime := lastLogTime
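The `mergeWarningsAndWarningsCount` helper added above deduplicates warnings by error ID while summing their occurrence counts; it keeps two maps because the `*terror.Error` value is stored once per ID alongside a running counter. A self-contained sketch of the same aggregation pattern, simplified to string keys instead of the real `errors.ErrorID`/`*terror.Error` types:

```go
package main

import "fmt"

// mergeCounts mirrors the counting half of mergeWarningsAndWarningsCount:
// counts for IDs seen before are summed, new IDs are inserted as-is.
func mergeCounts(part, total map[string]int64) map[string]int64 {
	for id, n := range part {
		total[id] += n // covers both the "seen before" and "first time" cases
	}
	return total
}

func main() {
	total := map[string]int64{"types:1292": 2}
	part := map[string]int64{"types:1292": 3, "types:1690": 1}
	fmt.Println(mergeCounts(part, total)) // map[types:1292:5 types:1690:1]
}
```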
@@ -177,7 +196,17 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,

bf.AddMetricInfo(float64(taskCtx.addedCount))
mergeBackfillCtxToResult(&taskCtx, result)

+	// Although `handleRange` is for data in one region, but back fill worker still split it into many
+	// small reorg batch size slices and reorg them in many different kv txn.
+	// If a task failed, it may contained some committed small kv txn which has already finished the
+	// small range reorganization.
+	// In the next round of reorganization, the target handle range may overlap with last committed
+	// small ranges. This will cause the `redo` action in reorganization.
+	// So for added count and warnings collection, it is recommended to collect the statistics in every
+	// successfully committed small ranges rather than fetching it in the total result.
w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))
+	w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)

if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
@@ -303,11 +332,10 @@

if err != nil {
// Update the reorg handle that has been processed.
-		err1 := kv.RunInNewTxn(reorgInfo.d.store, true, func(txn kv.Transaction) error {
-			return errors.Trace(reorgInfo.UpdateReorgMeta(txn, nextHandle, reorgInfo.EndHandle, reorgInfo.PhysicalTableID))
-		})
+		err1 := reorgInfo.UpdateReorgMeta(nextHandle)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount), zap.String("startHandle", toString(startHandle)),
zap.String("nextHandle", toString(nextHandle)), zap.Int64("batchAddedCount", taskAddedCount),
zap.String("taskFailedError", err.Error()), zap.String("takeTime", elapsedTime.String()),
@@ -318,7 +346,9 @@
// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
w.reorgCtx.setNextHandle(nextHandle)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill worker handle batch tasks successful", zap.Int64("totalAddedCount", *totalAddedCount), zap.String("startHandle", toString(startHandle)),
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID),
zap.Int64("totalAddedCount", *totalAddedCount), zap.String("startHandle", toString(startHandle)),
zap.String("nextHandle", toString(nextHandle)), zap.Int64("batchAddedCount", taskAddedCount), zap.String("takeTime", elapsedTime.String()))
return nil
}
@@ -385,6 +415,8 @@ var (
TestCheckWorkerNumCh = make(chan struct{})
// TestCheckWorkerNumber use for test adjust backfill worker.
TestCheckWorkerNumber = int32(16)
+	// TestCheckReorgTimeout is used to mock timeout when reorg data.
+	TestCheckReorgTimeout = int32(0)
)

func loadDDLReorgVars(w *worker) error {
@@ -481,12 +513,12 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

if bfWorkerType == typeAddIndexWorker {
-			idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
+			idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker)
} else {
-			updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap)
+			updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker)
