Skip to content

Commit

Permalink
Merge branch 'master' into pingcap#18056
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Oct 10, 2020
2 parents c1f9e6c + cacc3db commit 0aad9ac
Show file tree
Hide file tree
Showing 120 changed files with 3,575 additions and 1,796 deletions.
5 changes: 2 additions & 3 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# Contribution Guide
# Contributing Guide

See the [Contribution Guide](https://github.com/pingcap/community/blob/master/CONTRIBUTING.md) in the
[community](https://github.com/pingcap/community) repo.
See the [Contributing Guide](https://github.com/pingcap/community/blob/master/contributors/README.md) in the [community](https://github.com/pingcap/community) repository.
5 changes: 5 additions & 0 deletions cmd/explaintest/r/explain.result
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ StreamAgg_8 8000.00 root group by:Column#7, funcs:group_concat(Column#5, Column
└─TableReader_15 10000.00 root data:TableFullScan_14
└─TableFullScan_14 10000.00 cop[tikv] table:t keep order:true, stats:pseudo
drop table t;
drop view if exists v;
create view v as select cast(replace(substring_index(substring_index("",',',1),':',-1),'"','') as CHAR(32)) as event_id;
desc v;
Field Type Null Key Default Extra
event_id varchar(32) YES NULL
21 changes: 11 additions & 10 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -556,28 +556,29 @@ Projection_4 10.00 root plus(1, test.t.nb)->Column#5
explain select * from t ta left outer join t tb on ta.nb = tb.nb and ta.a > 1 where ifnull(ta.nb, 1) or ta.nb is null;
id estRows task access object operator info
HashJoin_7 8320.83 root left outer join, equal:[eq(test.t.nb, test.t.nb)], left cond:[gt(test.t.a, 1)]
├─TableReader_13(Build) 10000.00 root data:TableFullScan_12
│ └─TableFullScan_12 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo
├─TableReader_14(Build) 6656.67 root data:Selection_13
│ └─Selection_13 6656.67 cop[tikv] or(test.t.nb, 0)
│ └─TableFullScan_12 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo
└─TableReader_11(Probe) 6656.67 root data:Selection_10
└─Selection_10 6656.67 cop[tikv] or(test.t.nb, isnull(test.t.nb))
└─Selection_10 6656.67 cop[tikv] or(test.t.nb, 0)
└─TableFullScan_9 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo
explain select * from t ta right outer join t tb on ta.nb = tb.nb and ta.a > 1 where ifnull(tb.nb, 1) or tb.nb is null;
id estRows task access object operator info
HashJoin_7 6656.67 root right outer join, equal:[eq(test.t.nb, test.t.nb)]
├─TableReader_11(Build) 3333.33 root data:Selection_10
│ └─Selection_10 3333.33 cop[tikv] gt(test.t.a, 1)
├─TableReader_11(Build) 2218.89 root data:Selection_10
│ └─Selection_10 2218.89 cop[tikv] gt(test.t.a, 1), or(test.t.nb, 0)
│ └─TableFullScan_9 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo
└─TableReader_14(Probe) 6656.67 root data:Selection_13
└─Selection_13 6656.67 cop[tikv] or(test.t.nb, isnull(test.t.nb))
└─Selection_13 6656.67 cop[tikv] or(test.t.nb, 0)
└─TableFullScan_12 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo
explain select * from t ta inner join t tb on ta.nb = tb.nb and ta.a > 1 where ifnull(tb.nb, 1) or tb.nb is null;
id estRows task access object operator info
HashJoin_9 4166.67 root inner join, equal:[eq(test.t.nb, test.t.nb)]
├─TableReader_12(Build) 3333.33 root data:Selection_11
│ └─Selection_11 3333.33 cop[tikv] gt(test.t.a, 1)
HashJoin_9 2773.61 root inner join, equal:[eq(test.t.nb, test.t.nb)]
├─TableReader_12(Build) 2218.89 root data:Selection_11
│ └─Selection_11 2218.89 cop[tikv] gt(test.t.a, 1), or(test.t.nb, 0)
│ └─TableFullScan_10 10000.00 cop[tikv] table:ta keep order:false, stats:pseudo
└─TableReader_15(Probe) 6656.67 root data:Selection_14
└─Selection_14 6656.67 cop[tikv] or(test.t.nb, isnull(test.t.nb))
└─Selection_14 6656.67 cop[tikv] or(test.t.nb, 0)
└─TableFullScan_13 10000.00 cop[tikv] table:tb keep order:false, stats:pseudo
explain select ifnull(t.nc, 1) in (select count(*) from t s , t t1 where s.a = t.a and s.a = t1.a) from t;
id estRows task access object operator info
Expand Down
4 changes: 2 additions & 2 deletions cmd/explaintest/r/partition_pruning.result
Original file line number Diff line number Diff line change
Expand Up @@ -2333,8 +2333,8 @@ TableReader_7 250.00 root partition:all data:Selection_6
└─TableFullScan_5 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
explain select * from t1 where a>=0 and a <= 0xFFFFFFFFFFFFFFFF;
id estRows task access object operator info
TableReader_7 250.00 root partition:all data:Selection_6
└─Selection_6 250.00 cop[tikv] ge(test.t1.a, 0), le(test.t1.a, 18446744073709551615)
TableReader_7 3323.33 root partition:all data:Selection_6
└─Selection_6 3323.33 cop[tikv] le(test.t1.a, 18446744073709551615)
└─TableFullScan_5 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo
drop table t1;
create table t1 (a bigint) partition by range(a+0) (
Expand Down
4 changes: 4 additions & 0 deletions cmd/explaintest/t/explain.test
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ set session tidb_hashagg_final_concurrency = 1;
explain select group_concat(a) from t group by id;
explain select group_concat(a, b) from t group by id;
drop table t;

drop view if exists v;
create view v as select cast(replace(substring_index(substring_index("",',',1),':',-1),'"','') as CHAR(32)) as event_id;
desc v;
27 changes: 19 additions & 8 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,8 @@ type TiKVClient struct {
// and if no activity is seen even after that the connection is closed.
GrpcKeepAliveTimeout uint `toml:"grpc-keepalive-timeout" json:"grpc-keepalive-timeout"`
// CommitTimeout is the max time which command 'commit' will wait.
CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
// EnableAsyncCommit enables async commit for all transactions.
EnableAsyncCommit bool `toml:"enable-async-commit" json:"enable-async-commit"`
AsyncCommitKeysLimit uint `toml:"async-commit-keys-limit" json:"async-commit-keys-limit"`

CommitTimeout string `toml:"commit-timeout" json:"commit-timeout"`
AsyncCommit AsyncCommit `toml:"async-commit" json:"async-commit"`
// MaxBatchSize is the max batch size when calling batch commands API.
MaxBatchSize uint `toml:"max-batch-size" json:"max-batch-size"`
// If TiKV load is greater than this, TiDB will wait for a while to avoid little batch.
Expand All @@ -522,6 +519,16 @@ type TiKVClient struct {
TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"`
}

// AsyncCommit is the config for the async commit feature.
// It is the type of the TiKVClient.AsyncCommit field and is read from the
// `async-commit` key of the TiKV client configuration.
type AsyncCommit struct {
// Enable controls whether the async commit feature is turned on.
Enable bool `toml:"enable" json:"enable"`
// KeysLimit is the maximum number of keys a transaction may contain for
// async commit to be used; async commit is used only if the number of
// keys does not exceed KeysLimit.
KeysLimit uint `toml:"keys-limit" json:"keys-limit"`
// TotalKeySizeLimit is the maximum total size of a transaction's keys for
// async commit to be used; async commit is used only if the total size of
// keys does not exceed TotalKeySizeLimit.
// NOTE(review): the example config describes this limit as being in bytes.
TotalKeySizeLimit uint64 `toml:"total-key-size-limit" json:"total-key-size-limit"`
}

// CoprocessorCache is the config for coprocessor cache.
type CoprocessorCache struct {
// Whether to enable the copr cache. The copr cache saves the result from TiKV Coprocessor in the memory and
Expand Down Expand Up @@ -687,8 +694,12 @@ var defaultConf = Config{
GrpcKeepAliveTime: 10,
GrpcKeepAliveTimeout: 3,
CommitTimeout: "41s",
EnableAsyncCommit: false,
AsyncCommitKeysLimit: 256,
AsyncCommit: AsyncCommit{
Enable: false,
// FIXME: Find an appropriate default limit.
KeysLimit: 256,
TotalKeySizeLimit: 4 * 1024, // 4 KiB
},

MaxBatchSize: 128,
OverloadThreshold: 200,
Expand Down Expand Up @@ -871,7 +882,7 @@ func (c *Config) Valid() error {
if c.Security.SkipGrantTable && !hasRootPrivilege() {
return fmt.Errorf("TiDB run with skip-grant-table need root privilege")
}
if _, ok := ValidStorage[c.Store]; !ok {
if !ValidStorage[c.Store] {
nameList := make([]string, 0, len(ValidStorage))
for k, v := range ValidStorage {
if v {
Expand Down
16 changes: 10 additions & 6 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,6 @@ grpc-keepalive-timeout = 3
# Max time for commit command, must be twice bigger than raft election timeout.
commit-timeout = "41s"

# Enable async commit for all transactions.
enable-async-commit = false
# The maximum allowed keys in a async commit transaction. Transactions with more keys than the limit
# will be committed with normal 2PC way.
async-commit-keys-limit = 256

# Max batch size in gRPC.
max-batch-size = 128
# Overload threshold of TiKV.
Expand Down Expand Up @@ -390,6 +384,16 @@ store-liveness-timeout = "5s"
# If the size (in bytes) of a transaction is larger than `ttl-refreshed-txn-size`, the lock TTL is updated during the 2PC.
ttl-refreshed-txn-size = 33554432

[tikv-client.async-commit]
# Whether to enable the async commit feature. This feature reduces the latency of the two-phase commit.
enable = false
# The maximum number of keys allowed in an async commit transaction. Transactions with more keys than the limit
# will be committed using the normal 2PC process.
keys-limit = 256
# The maximum total length of keys in bytes. Transactions will be committed using the normal 2PC process
# if the limit is exceeded.
total-key-size-limit = 4096

[tikv-client.copr-cache]
# Whether to enable the copr cache. The copr cache saves the result from TiKV Coprocessor in the memory and
# reuses the result when corresponding data in TiKV is unchanged, on a region basis.
Expand Down
11 changes: 7 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,14 @@ deprecate-integer-display-length = true
txn-total-size-limit=2000
[tikv-client]
commit-timeout="41s"
enable-async-commit=true
async-commit-keys-limit=123
max-batch-size=128
region-cache-ttl=6000
store-limit=0
ttl-refreshed-txn-size=8192
[tikv-client.async-commit]
enable=true
keys-limit=123
total-key-size-limit=1024
[stmt-summary]
enable=false
enable-internal-query=true
Expand Down Expand Up @@ -235,8 +237,9 @@ spilled-file-encryption-method = "plaintext"
c.Assert(conf.AlterPrimaryKey, Equals, true)

c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
c.Assert(conf.TiKVClient.EnableAsyncCommit, Equals, true)
c.Assert(conf.TiKVClient.AsyncCommitKeysLimit, Equals, uint(123))
c.Assert(conf.TiKVClient.AsyncCommit.Enable, Equals, true)
c.Assert(conf.TiKVClient.AsyncCommit.KeysLimit, Equals, uint(123))
c.Assert(conf.TiKVClient.AsyncCommit.TotalKeySizeLimit, Equals, uint64(1024))
c.Assert(conf.TiKVClient.MaxBatchSize, Equals, uint(128))
c.Assert(conf.TiKVClient.RegionCacheTTL, Equals, uint(6000))
c.Assert(conf.TiKVClient.StoreLimit, Equals, int64(0))
Expand Down
45 changes: 38 additions & 7 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -71,10 +72,12 @@ type backfillResult struct {
// backfillTaskContext is the context of the batch adding indices or updating column values.
// After finishing the batch adding indices or updating column values, result in backfillTaskContext will be merged into backfillResult.
type backfillTaskContext struct {
nextHandle kv.Handle
done bool
addedCount int
scanCount int
nextHandle kv.Handle
done bool
addedCount int
scanCount int
warnings map[errors.ErrorID]*terror.Error
warningsCount map[errors.ErrorID]int64
}

type backfillWorker struct {
Expand Down Expand Up @@ -150,10 +153,26 @@ func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResu
result.scanCount += taskCtx.scanCount
}

// mergeWarningsAndWarningsCount folds the warnings of one part (partWarnings and
// partWarningsCount) into the running totals (totalWarnings and totalWarningsCount)
// and returns the updated totals.
//
// For every warning in partWarnings, the count recorded for its ID in
// partWarningsCount is added to totalWarningsCount. The warning itself is stored
// in totalWarnings only the first time its ID is counted, so an already recorded
// warning is never overwritten.
//
// The destination maps are modified in place when they are non-nil. A nil
// destination map is allocated on first write instead of panicking, so callers
// must always use the returned maps (as with append).
func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
	for _, warn := range partWarnings {
		id := warn.ID()
		// Reading from a nil map is safe and simply reports "not present".
		if _, seen := totalWarningsCount[id]; !seen {
			// First time this ID is counted: remember the warning itself.
			if totalWarnings == nil {
				totalWarnings = make(map[errors.ErrorID]*terror.Error, len(partWarnings))
			}
			totalWarnings[id] = warn
		}
		if totalWarningsCount == nil {
			totalWarningsCount = make(map[errors.ErrorID]int64, len(partWarnings))
		}
		// A missing entry reads as zero, so += covers both the new and the
		// already-present case.
		totalWarningsCount[id] += partWarningsCount[id]
	}
	return totalWarnings, totalWarningsCount
}

// handleBackfillTask backfills range [task.startHandle, task.endHandle) handle's index to table.
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult {
handleRange := *task
result := &backfillResult{addedCount: 0, nextHandle: handleRange.startHandle, err: nil}
result := &backfillResult{
err: nil,
addedCount: 0,
nextHandle: handleRange.startHandle,
}
lastLogCount := 0
lastLogTime := time.Now()
startTime := lastLogTime
Expand All @@ -177,7 +196,17 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,

bf.AddMetricInfo(float64(taskCtx.addedCount))
mergeBackfillCtxToResult(&taskCtx, result)

// Although `handleRange` covers data in a single region, the backfill worker still splits it into many
// small slices of reorg batch size and reorganizes them in separate kv transactions.
// If a task fails, some of those small kv transactions may already have been committed, so their
// small ranges have already been reorganized.
// In the next round of reorganization, the target handle range may overlap with those committed
// small ranges, which causes the `redo` action in reorganization.
// So for the added count and the warnings, it is recommended to collect the statistics after every
// successfully committed small range rather than fetching them from the total result.
w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))
w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)

if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
Expand Down Expand Up @@ -386,6 +415,8 @@ var (
TestCheckWorkerNumCh = make(chan struct{})
// TestCheckWorkerNumber use for test adjust backfill worker.
TestCheckWorkerNumber = int32(16)
// TestCheckReorgTimeout is used to mock timeout when reorg data.
TestCheckReorgTimeout = int32(0)
)

func loadDDLReorgVars(w *worker) error {
Expand Down Expand Up @@ -482,12 +513,12 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
sessCtx.GetSessionVars().StmtCtx.IsDDLJobInQueue = true

if bfWorkerType == typeAddIndexWorker {
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker)
} else {
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap)
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker)
Expand Down
Loading

0 comments on commit 0aad9ac

Please sign in to comment.