diff --git a/bindinfo/bind_record.go b/bindinfo/bind_record.go index 6395bbaa278ba..50e8e0ba20784 100644 --- a/bindinfo/bind_record.go +++ b/bindinfo/bind_record.go @@ -115,6 +115,17 @@ type BindRecord struct { Bindings []Binding } +// Copy returns a copy of the BindRecord. +func (br *BindRecord) Copy() *BindRecord { + nbr := &BindRecord{ + OriginalSQL: br.OriginalSQL, + Db: br.Db, + } + nbr.Bindings = make([]Binding, len(br.Bindings)) + copy(nbr.Bindings, br.Bindings) + return nbr +} + // HasEnabledBinding checks if there are any enabled bindings in bind record. func (br *BindRecord) HasEnabledBinding() bool { for _, binding := range br.Bindings { diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index b5236127f5ee1..d14f12066c0f4 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -343,32 +343,64 @@ type MaxError struct { // In TiDB backend, this also includes all possible SQL errors raised from INSERT, // such as unique key conflict when `on-duplicate` is set to `error`. // When tolerated, the row causing the error will be skipped, and adds 1 to the counter. + // The default value is zero, which means that such errors are not tolerated. Type atomic.Int64 `toml:"type" json:"type"` // Conflict is the maximum number of unique key conflicts in local backend accepted. // When tolerated, every pair of conflict adds 1 to the counter. // Those pairs will NOT be deleted from the target. Conflict resolution is performed separately. - // TODO Currently this is hard-coded to infinity. - Conflict atomic.Int64 `toml:"conflict" json:"-"` + // The default value is max int64, which means as many conflict errors as possible will be recorded. + // Sometimes the actual number of conflict records logged will be greater than the value configured here, + // because conflict error data are recorded batch by batch. + // If the limit is reached in a single batch, the entire batch of records will be persisted before an error is reported. + Conflict atomic.Int64 `toml:"conflict" json:"conflict"` } func (cfg *MaxError) UnmarshalTOML(v interface{}) error { + defaultValMap := map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": math.MaxInt64, + } + // set default values first + cfg.Syntax.Store(defaultValMap["syntax"]) + cfg.Charset.Store(defaultValMap["charset"]) + cfg.Type.Store(defaultValMap["type"]) + cfg.Conflict.Store(defaultValMap["conflict"]) switch val := v.(type) { case int64: // ignore val that is smaller than 0 - if val < 0 { - val = 0 + if val >= 0 { + // only set the type error limit + cfg.Type.Store(val) } - cfg.Syntax.Store(0) - cfg.Charset.Store(math.MaxInt64) - cfg.Type.Store(val) - cfg.Conflict.Store(math.MaxInt64) return nil case map[string]interface{}: - // TODO support stuff like `max-error = { charset = 1000, type = 1000 }` if proved useful. + // support stuff like `max-error = { charset = 1000, type = 1000 }`. 
+ getVal := func(k string, v interface{}) int64 { + defaultVal, ok := defaultValMap[k] + if !ok { + return 0 + } + iVal, ok := v.(int64) + if !ok || iVal < 0 { + return defaultVal + } + return iVal + } + for k, v := range val { + switch k { + case "type": + cfg.Type.Store(getVal(k, v)) + case "conflict": + cfg.Conflict.Store(getVal(k, v)) + } + } + return nil default: + return errors.Errorf("invalid max-error '%v', should be an integer or a map of string:int64", v) } - return errors.Errorf("invalid max-error '%v', should be an integer", v) } // DuplicateResolutionAlgorithm is the config type of how to resolve duplicates. @@ -805,8 +837,16 @@ func (cfg *Config) LoadFromTOML(data []byte) error { unusedGlobalKeyStrs[key.String()] = struct{}{} } +iterateUnusedKeys: for _, key := range unusedConfigKeys { keyStr := key.String() + switch keyStr { + // these keys are not counted as decoded by toml decoder, but actually they are decoded, + // because the corresponding unmarshal logic handles these key's decoding in a custom way + case "lightning.max-error.type", + "lightning.max-error.conflict": + continue iterateUnusedKeys + } if _, found := unusedGlobalKeyStrs[keyStr]; found { bothUnused = append(bothUnused, keyStr) } else { diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index 16db98845e80c..f590391740ec4 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -19,6 +19,7 @@ import ( "context" "flag" "fmt" + "math" "net" "net/http" "net/http/httptest" @@ -561,6 +562,126 @@ func TestDurationUnmarshal(t *testing.T) { require.Regexp(t, "time: unknown unit .?x.? in duration .?13x20s.?", err.Error()) } +func TestMaxErrorUnmarshal(t *testing.T) { + type testCase struct { + TOMLStr string + ExpectedValues map[string]int64 + ExpectErrStr string + CaseName string + } + for _, tc := range []*testCase{ + { + TOMLStr: `max-error = 123`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 123, + "conflict": math.MaxInt64, + }, + CaseName: "Normal_Int", + }, + { + TOMLStr: `max-error = -123`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": math.MaxInt64, + }, + CaseName: "Abnormal_Negative_Int", + }, + { + TOMLStr: `max-error = "abcde"`, + ExpectErrStr: "invalid max-error 'abcde', should be an integer or a map of string:int64", + CaseName: "Abnormal_String", + }, + { + TOMLStr: `[max-error] +syntax = 1 +charset = 2 +type = 3 +conflict = 4 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 3, + "conflict": 4, + }, + CaseName: "Normal_Map_All_Set", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, + }, + CaseName: "Normal_Map_Partial_Set", + }, + { + TOMLStr: `max-error = { conflict = 1000, type = 123 }`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 123, + "conflict": 1000, + }, + CaseName: "Normal_OneLineMap_Partial_Set", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +not_exist = 123 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, + }, + CaseName: "Normal_Map_Partial_Set_Invalid_Key", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +type = -123 +`, + ExpectedValues: map[string]int64{ + "syntax": 0, + "charset": math.MaxInt64, + "type": 0, + "conflict": 1000, 
+ }, + CaseName: "Normal_Map_Partial_Set_Invalid_Value", + }, + { + TOMLStr: `[max-error] +conflict = 1000 +type = abc +`, + ExpectErrStr: `toml: line 3 (last key "max-error.type"): expected value but found "abc" instead`, + CaseName: "Normal_Map_Partial_Set_Invalid_ValueType", + }, + } { + targetLightningCfg := new(config.Lightning) + err := toml.Unmarshal([]byte(tc.TOMLStr), targetLightningCfg) + if len(tc.ExpectErrStr) > 0 { + require.Errorf(t, err, "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectErrStr, err.Error(), "test case: %s", tc.CaseName) + } else { + require.NoErrorf(t, err, "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["syntax"], targetLightningCfg.MaxError.Syntax.Load(), "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["charset"], targetLightningCfg.MaxError.Charset.Load(), "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["type"], targetLightningCfg.MaxError.Type.Load(), "test case: %s", tc.CaseName) + require.Equalf(t, tc.ExpectedValues["conflict"], targetLightningCfg.MaxError.Conflict.Load(), "test case: %s", tc.CaseName) + } + } +} + func TestDurationMarshalJSON(t *testing.T) { duration := config.Duration{} err := duration.UnmarshalText([]byte("13m20s")) diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go index 373ba572779d4..4085226063d38 100644 --- a/br/pkg/lightning/errormanager/errormanager.go +++ b/br/pkg/lightning/errormanager/errormanager.go @@ -194,7 +194,8 @@ func (em *ErrorManager) RecordTypeError( if em.remainingError.Type.Dec() < 0 { threshold := em.configError.Type.Load() if threshold > 0 { - encodeErr = errors.Annotatef(encodeErr, "meet errors exceed the max-error.type threshold '%d'", + encodeErr = errors.Annotatef(encodeErr, + "The number of type errors exceeds the threshold configured by `max-error.type`: '%d'", em.configError.Type.Load()) } return encodeErr @@ -241,17 +242,20 @@ func (em *ErrorManager) RecordDataConflictError( tableName string, conflictInfos []DataConflictInfo, ) error { + var gerr error if len(conflictInfos) == 0 { return nil } if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 { threshold := em.configError.Conflict.Load() - return errors.Errorf(" meet errors exceed the max-error.conflict threshold '%d'", threshold) + // Still need to record this batch of conflict records, and then return this error at last. + // Otherwise, if the max-error.conflict is set a very small value, non of the conflict errors will be recorded + gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold) } if em.db == nil { - return nil + return gerr } exec := common.SQLWithRetry{ @@ -259,7 +263,7 @@ func (em *ErrorManager) RecordDataConflictError( Logger: logger, HideQueryLog: redact.NeedRedact(), } - return exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error { + if err := exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error { sb := &strings.Builder{} fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped) var sqlArgs []interface{} @@ -279,7 +283,10 @@ func (em *ErrorManager) RecordDataConflictError( } _, err := txn.ExecContext(c, sb.String(), sqlArgs...) 
return err - }) + }); err != nil { + gerr = err + } + return gerr } func (em *ErrorManager) RecordIndexConflictError( @@ -290,17 +297,20 @@ conflictInfos []DataConflictInfo, rawHandles, rawRows [][]byte, ) error { + var gerr error if len(conflictInfos) == 0 { return nil } if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 { threshold := em.configError.Conflict.Load() - return errors.Errorf(" meet errors exceed the max-error.conflict threshold %d", threshold) + // We still need to record this batch of conflict records, and then return this error at the end. + // Otherwise, if max-error.conflict is set to a very small value, none of the conflict errors will be recorded + gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold) } if em.db == nil { - return nil + return gerr } exec := common.SQLWithRetry{ @@ -308,7 +318,7 @@ Logger: logger, HideQueryLog: redact.NeedRedact(), } - return exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error { + if err := exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error { sb := &strings.Builder{} fmt.Fprintf(sb, insertIntoConflictErrorIndex, em.schemaEscaped) var sqlArgs []interface{} @@ -331,7 +341,10 @@ } _, err := txn.ExecContext(c, sb.String(), sqlArgs...) return err - }) + }); err != nil { + gerr = err + } + return gerr } // ResolveAllConflictKeys query all conflicting rows (handle and their diff --git a/br/pkg/utils/json.go b/br/pkg/utils/json.go index 736032b4beedc..a29725db6b1b3 100644 --- a/br/pkg/utils/json.go +++ b/br/pkg/utils/json.go @@ -114,6 +114,10 @@ func makeJSONSchema(schema *backuppb.Schema) (*jsonSchema, error) { func fromJSONSchema(jSchema *jsonSchema) (*backuppb.Schema, error) { schema := jSchema.Schema + if schema == nil { + schema = &backuppb.Schema{} + } + var err error schema.Db, err = json.Marshal(jSchema.DB) if err != nil { diff --git a/br/pkg/utils/json_test.go b/br/pkg/utils/json_test.go index 8d8eeb6457332..3f03f287d92f1 100644 --- a/br/pkg/utils/json_test.go +++ b/br/pkg/utils/json_test.go @@ -204,6 +204,44 @@ var testMetaJSONs = [][]byte{ "is_raw_kv": true, "br_version": "BR\nRelease Version: v5.0.0-master\nGit Commit Hash: c0d60dae4998cf9ac40f02e5444731c15f0b2522\nGit Branch: HEAD\nGo Version: go1.13.4\nUTC Build Time: 2021-03-25 08:10:08\nRace Enabled: false" }`), + []byte(`{ + "files": [ + { + "sha256": "3ae857ef9b379d498ae913434f1d47c3e90a55f3a4cd9074950bfbd163d5e5fc", + "start_key": "7480000000000000115f720000000000000000", + "end_key": "7480000000000000115f72ffffffffffffffff00", + "name": "1_20_9_36adb8cedcd7af34708edff520499e712e2cfdcb202f5707dc9305a031d55a98_1675066275424_write.sst", + "end_version": 439108573623222300, + "crc64xor": 16261462091570213000, + "total_kvs": 15, + "total_bytes": 1679, + "cf": "write", + "size": 2514, + "cipher_iv": "56MTbxA4CaNILpirKnBxUw==" + } + ], + "schemas": [ + { + "db": { + "charset": "utf8mb4", + "collate": "utf8mb4_bin", + "db_name": { + "L": "test", + "O": "test" + }, + "id": 1, + "policy_ref_info": null, + "state": 5 + } + } + ], + "ddls": [], + "cluster_id": 7194351714070942000, + "cluster_version": "\"6.1.0\"\n", + "br_version": "BR\nRelease Version: v6.1.0\nGit Commit Hash: 1a89decdb192cbdce6a7b0020d71128bc964d30f\nGit Branch: heads/refs/tags/v6.1.0\nGo 
Version: go1.18.2\nUTC Build Time: 2022-06-05 05:09:12\nRace Enabled: false", + "end_version": 439108573623222300, + "new_collations_enabled": "True" + }`), } func TestEncodeAndDecode(t *testing.T) { diff --git a/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql b/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql new file mode 100644 index 0000000000000..93582d5178139 --- /dev/null +++ b/br/tests/lightning_config_max_error/data/mytest.testtbl-schema.sql @@ -0,0 +1,5 @@ +CREATE TABLE testtbl ( + id INTEGER PRIMARY KEY, + val1 VARCHAR(40) NOT NULL, + INDEX `idx_val1` (`val1`) +); diff --git a/br/tests/lightning_config_max_error/data/mytest.testtbl.csv b/br/tests/lightning_config_max_error/data/mytest.testtbl.csv new file mode 100644 index 0000000000000..021f6bbf7be1c --- /dev/null +++ b/br/tests/lightning_config_max_error/data/mytest.testtbl.csv @@ -0,0 +1,16 @@ +id,val1 +1,"aaa01" +2,"aaa01" +3,"aaa02" +4,"aaa02" +5,"aaa05" +6,"aaa06" +7,"aaa07" +8,"aaa08" +9,"aaa09" +10,"aaa10" +1,"bbb01" +2,"bbb02" +3,"bbb03" +4,"bbb04" +5,"bbb05" diff --git a/br/tests/lightning_config_max_error/err_config.toml b/br/tests/lightning_config_max_error/err_config.toml new file mode 100644 index 0000000000000..79447e685a8f5 --- /dev/null +++ b/br/tests/lightning_config_max_error/err_config.toml @@ -0,0 +1,8 @@ +[lightning.max-error] +conflict = 4 + +[mydumper.csv] +header = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_max_error/normal_config.toml b/br/tests/lightning_config_max_error/normal_config.toml new file mode 100644 index 0000000000000..92e08739fe04a --- /dev/null +++ b/br/tests/lightning_config_max_error/normal_config.toml @@ -0,0 +1,8 @@ +[lightning.max-error] +conflict = 20 + +[mydumper.csv] +header = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_max_error/normal_config_old_style.toml b/br/tests/lightning_config_max_error/normal_config_old_style.toml new file mode 100644 index 0000000000000..fe402d071f5e0 --- /dev/null +++ b/br/tests/lightning_config_max_error/normal_config_old_style.toml @@ -0,0 +1,8 @@ +[lightning] +max-error = 0 # this actually sets the type error + +[mydumper.csv] +header = true + +[tikv-importer] +duplicate-resolution = 'remove' diff --git a/br/tests/lightning_config_max_error/run.sh b/br/tests/lightning_config_max_error/run.sh new file mode 100755 index 0000000000000..1d850ae55f0d8 --- /dev/null +++ b/br/tests/lightning_config_max_error/run.sh @@ -0,0 +1,81 @@ +#!/bin/sh +# +# Copyright 2023 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +set -eux + +check_cluster_version 4 0 0 'local backend' || exit 0 + +mydir=$(dirname "${BASH_SOURCE[0]}") + +data_file="${mydir}/data/mytest.testtbl.csv" + +total_row_count=$( sed '1d' "${data_file}" | wc -l | xargs echo ) +uniq_row_count=$( sed '1d' "${data_file}" | awk -F, '{print $1}' | sort | uniq -c | awk '{print $1}' | grep -c '1' | xargs echo ) +duplicated_row_count=$(( ${total_row_count} - ${uniq_row_count} )) + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1' + +stderr_file="/tmp/${TEST_NAME}.stderr" + +set +e +if run_lightning --backend local --config "${mydir}/err_config.toml" 2> "${stderr_file}"; then + echo "The lightning import doesn't fail as expected" >&2 + exit 1 +fi +set -e + +err_msg=$( cat << EOF +tidb lightning encountered error: collect local duplicate rows failed: The number of conflict errors exceeds the threshold configured by \`max-error.conflict\`: '4' +EOF +) +cat "${stderr_file}" +grep -q "${err_msg}" "${stderr_file}" + +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1' +# Although the number of conflict errors exceeds the max-error limit, +# all the conflict errors are recorded, +# because recording of conflict errors is executed batch by batch (batch size 1024), +# so this batch of conflict errors is recorded in full +check_contains "COUNT(*): ${duplicated_row_count}" + +# import a second time + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1' + +run_lightning --backend local --config "${mydir}/normal_config.toml" + +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1' +check_contains "COUNT(*): ${duplicated_row_count}" + +# Check remaining records in the target table +run_sql 'SELECT COUNT(*) FROM mytest.testtbl' +check_contains "COUNT(*): ${uniq_row_count}" + +# import a third time + +run_sql 'DROP TABLE IF EXISTS mytest.testtbl' +run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v1' + +run_lightning --backend local --config "${mydir}/normal_config_old_style.toml" + +run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v1' +check_contains "COUNT(*): ${duplicated_row_count}" + +# Check remaining records in the target table +run_sql 'SELECT COUNT(*) FROM mytest.testtbl' +check_contains "COUNT(*): ${uniq_row_count}" diff --git a/ddl/cluster.go b/ddl/cluster.go index 74ac8c05ca098..32695ff01b819 100644 --- a/ddl/cluster.go +++ b/ddl/cluster.go @@ -165,7 +165,8 @@ func isFlashbackSupportedDDLAction(action model.ActionType) bool { switch action { case model.ActionSetTiFlashReplica, model.ActionUpdateTiFlashReplicaStatus, model.ActionAlterPlacementPolicy, model.ActionAlterTablePlacement, model.ActionAlterTablePartitionPlacement, model.ActionCreatePlacementPolicy, - model.ActionDropPlacementPolicy, model.ActionModifySchemaDefaultPlacement: + model.ActionDropPlacementPolicy, model.ActionModifySchemaDefaultPlacement, + model.ActionAlterTableAttributes, model.ActionAlterTablePartitionAttributes: return false default: return true diff --git a/ddl/ddl_tiflash_api.go b/ddl/ddl_tiflash_api.go index 4b8fca2a91c0f..1ade909b93ee9 100644 --- a/ddl/ddl_tiflash_api.go +++ b/ddl/ddl_tiflash_api.go @@ -424,6 +424,14 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T return err } } + + failpoint.Inject("OneTiFlashStoreDown", func() { + for storeID, store := range pollTiFlashContext.TiFlashStores { + store.Store.StateName = "Down" + 
pollTiFlashContext.TiFlashStores[storeID] = store + break + } + }) pollTiFlashContext.PollCounter++ // Start to process every table. diff --git a/ddl/tiflashtest/ddl_tiflash_test.go b/ddl/tiflashtest/ddl_tiflash_test.go index d1d0368138b18..c3ec3a1d2b0fb 100644 --- a/ddl/tiflashtest/ddl_tiflash_test.go +++ b/ddl/tiflashtest/ddl_tiflash_test.go @@ -1334,3 +1334,23 @@ func TestTiFlashAvailableAfterAddPartition(t *testing.T) { require.NotNil(t, pi) require.Equal(t, len(pi.Definitions), 2) } + +func TestTiFlashAvailableAfterDownOneStore(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown", `return`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown", `return`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown")) + }() + + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) +} diff --git a/domain/historical_stats.go b/domain/historical_stats.go index 07e82bafeb58c..6d4125b75f5d7 100644 --- a/domain/historical_stats.go +++ b/domain/historical_stats.go @@ -16,10 +16,13 @@ package domain import ( "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/statistics/handle" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) var ( @@ -35,7 +38,21 @@ type HistoricalStatsWorker struct { // SendTblToDumpHistoricalStats send tableID to worker to dump historical stats func (w *HistoricalStatsWorker) SendTblToDumpHistoricalStats(tableID int64) { - w.tblCH <- tableID + send := enableDumpHistoricalStats.Load() + failpoint.Inject("sendHistoricalStats", func(val failpoint.Value) { + if val.(bool) { + send = true + } + }) + if !send { + return + } + select { + case w.tblCH <- tableID: + return + default: + logutil.BgLogger().Warn("discard dump historical stats task", zap.Int64("table-id", tableID)) + } } // DumpHistoricalStats dump stats by given tableID diff --git a/domain/infosync/tiflash_manager.go b/domain/infosync/tiflash_manager.go index 4d01c64de002d..d5cc46f95db95 100644 --- a/domain/infosync/tiflash_manager.go +++ b/domain/infosync/tiflash_manager.go @@ -31,6 +31,7 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/tablecodec" @@ -89,10 +90,19 @@ func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, tab for _, store := range tiFlashStores { regionReplica := make(map[int64]int) err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica) + failpoint.Inject("OneTiFlashStoreDown", func() { + if store.Store.StateName == "Down" { + err = errors.New("mock TiFlash down") + } + }) if err != nil { logutil.BgLogger().Error("Fail to get peer status from TiFlash.", zap.Int64("tableID", tableID)) - return 0, err + // Just 
skip stores that are down, offline, or tombstoned, because PD will migrate regions from these stores. + if store.Store.StateName == "Up" || store.Store.StateName == "Disconnected" { + return 0, err + } + continue } flashPeerCount += len(regionReplica) } diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 8b0e824288c99..c8881bec70917 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -192,7 +192,6 @@ go_library( "//util/servermemorylimit", "//util/set", "//util/size", - "//util/slice", "//util/sqlexec", "//util/stmtsummary", "//util/stringutil", diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 0b00d3182f019..becb1e82212f8 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" @@ -30,6 +31,8 @@ import ( ) func TestRecordHistoryStatsAfterAnalyze(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats") store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -150,6 +153,8 @@ func TestRecordHistoryStatsMetaAfterAnalyze(t *testing.T) { } func TestGCHistoryStatsAfterDropTable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats") store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("set global tidb_enable_historical_stats = 1") @@ -174,6 +179,7 @@ func TestGCHistoryStatsAfterDropTable(t *testing.T) { tableInfo.Meta().ID)).Check(testkit.Rows("1")) // drop the table and gc stats tk.MustExec("drop table t") + is = dom.InfoSchema() h.GCStats(is, 0) // assert stats_history tables delete the record of dropped table @@ -183,7 +189,56 @@ func TestGCHistoryStatsAfterDropTable(t *testing.T) { tableInfo.Meta().ID)).Check(testkit.Rows("0")) } +func TestAssertHistoricalStatsAfterAlterTable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats") + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10),c int, KEY `idx` (`c`))") + tk.MustExec("analyze table test.t") + is := dom.InfoSchema() + tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + tblID := hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.Nil(t, err) + + time.Sleep(1 * time.Second) + snapshot := oracle.GoTimeToTS(time.Now()) + jsTable, err := h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + require.NotEqual(t, jsTable.Version, uint64(0)) + originVersion := jsTable.Version + + // assert historical stats non-change after drop column + tk.MustExec("alter table t drop column b") + h.GCStats(is, 0) + snapshot = oracle.GoTimeToTS(time.Now()) 
+ jsTable, err = h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + require.Equal(t, jsTable.Version, originVersion) + + // assert historical stats non-change after drop index + tk.MustExec("alter table t drop index idx") + h.GCStats(is, 0) + snapshot = oracle.GoTimeToTS(time.Now()) + jsTable, err = h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + require.Equal(t, jsTable.Version, originVersion) +} + func TestGCOutdatedHistoryStats(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats") store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("set global tidb_enable_historical_stats = 1") @@ -219,6 +274,8 @@ func TestGCOutdatedHistoryStats(t *testing.T) { } func TestPartitionTableHistoricalStats(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats") store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("set global tidb_enable_historical_stats = 1") @@ -246,6 +303,8 @@ PARTITION p0 VALUES LESS THAN (6) } func TestDumpHistoricalStatsByTable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/domain/sendHistoricalStats") store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("set global tidb_enable_historical_stats = 1") diff --git a/executor/set_test.go b/executor/set_test.go index 1b2b4186bb4a3..01a2fc7979efc 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -645,7 +645,7 @@ func TestSetVar(t *testing.T) { tk.MustQuery("select @@tidb_enable_tso_follower_proxy").Check(testkit.Rows("0")) require.Error(t, tk.ExecToErr("set tidb_enable_tso_follower_proxy = 1")) - tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("0")) + tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("1")) tk.MustExec("set global tidb_enable_historical_stats = 1") tk.MustQuery("select @@tidb_enable_historical_stats").Check(testkit.Rows("1")) tk.MustExec("set global tidb_enable_historical_stats = 0") diff --git a/executor/show.go b/executor/show.go index 36a9b8485822b..130f743d914bb 100644 --- a/executor/show.go +++ b/executor/show.go @@ -69,7 +69,6 @@ import ( "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/set" - "github.com/pingcap/tidb/util/slice" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stringutil" "golang.org/x/exp/slices" @@ -317,7 +316,10 @@ func (e *ShowExec) fetchShowBind() error { } else { tmp = domain.GetDomain(e.ctx).BindHandle().GetAllBindRecord() } - bindRecords := slice.Copy(tmp) + bindRecords := make([]*bindinfo.BindRecord, 0) + for _, bindRecord := range tmp { + bindRecords = append(bindRecords, bindRecord.Copy()) + } // Remove the invalid bindRecord. 
ind := 0 for _, bindData := range bindRecords { diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go index e8cd94d889188..fec246a5c5057 100644 --- a/executor/tiflashtest/tiflash_test.go +++ b/executor/tiflashtest/tiflash_test.go @@ -1270,7 +1270,7 @@ func TestDisaggregatedTiFlash(t *testing.T) { tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") err = tk.ExecToErr("select * from t;") - require.Contains(t, err.Error(), "Please check tiflash_compute node is available") + require.Contains(t, err.Error(), "tiflash_compute node is unavailable") config.UpdateGlobal(func(conf *config.Config) { conf.DisaggregatedTiFlash = false @@ -1304,9 +1304,6 @@ func TestDisaggregatedTiFlashQuery(t *testing.T) { require.NoError(t, err) tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"") - needCheckTiFlashComputeNode := "false" - failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode)) - defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery") tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;") tk.MustExec("set @@tidb_partition_prune_mode = 'static';") diff --git a/metrics/grafana/tidb.json b/metrics/grafana/tidb.json index 25042b63a8bab..f8b125746a804 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -18049,6 +18049,108 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "The TTL task statuses in each worker", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 100 + }, + "hiddenSeries": false, + "id": 294, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.10", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "running", + "color": "#5794F2" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(tidb_server_ttl_task_status{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (type, instance)", + "interval": "", + "legendFormat": "{{ instance }} {{ type }}", + "queryType": "randomWalk", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "TTL Task Count By Status", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "title": "TTL", diff --git a/metrics/metrics.go b/metrics/metrics.go index 633aa551564bc..68a2729f3483c 
100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -214,6 +214,7 @@ func RegisterMetrics() { prometheus.MustRegister(TTLQueryDuration) prometheus.MustRegister(TTLProcessedExpiredRowsCounter) prometheus.MustRegister(TTLJobStatus) + prometheus.MustRegister(TTLTaskStatus) prometheus.MustRegister(TTLPhaseTime) prometheus.MustRegister(EMACPUUsageGauge) diff --git a/metrics/ttl.go b/metrics/ttl.go index ab7e47e615e28..754744e93d1d8 100644 --- a/metrics/ttl.go +++ b/metrics/ttl.go @@ -43,6 +43,14 @@ var ( Help: "The jobs count in the specified status", }, []string{LblType}) + TTLTaskStatus = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "ttl_task_status", + Help: "The tasks count in the specified status", + }, []string{LblType}) + TTLPhaseTime = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: "tidb", diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 3afbdf3b8a0bc..12c2730c59364 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -107,7 +107,6 @@ go_library( "//sessiontxn/staleread", "//statistics", "//statistics/handle", - "//store/driver/backoff", "//table", "//table/tables", "//table/temptable", diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 6c5d2c49fddf0..1ecc9f995243c 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -2015,15 +2015,16 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid } // In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated. // So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask. - isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash + isDisaggregatedTiFlash := config.GetGlobalConfig().DisaggregatedTiFlash + isDisaggregatedTiFlashPath := isDisaggregatedTiFlash && ts.StoreType == kv.TiFlash canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed() if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash { if ts.KeepOrder { return invalidTask, nil } - if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !canMppConvertToRootForDisaggregatedTiFlash) { + if prop.MPPPartitionTp != property.AnyType || (ts.isPartition && !isDisaggregatedTiFlash) { // If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution. - // But in disaggregated tiflash mode, we can only use mpp, so we add ExchangeSender and ExchangeReceiver above TableScan for static pruning partition table. + // But in disaggregated tiflash mode, we enable using mpp for static pruning partition tables, because cop and batchCop are deprecated. ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.") return invalidTask, nil } @@ -2052,7 +2053,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid // So have to return a rootTask, but prop requires mppTask, cannot meet this requirement. task = invalidTask } else if prop.TaskTp == property.RootTaskType { - // when got here, canMppConvertToRootForDisaggregatedTiFlash is true. + // When we get here, canMppConvertToRootForDisaggregatedTiFlash is true. 
+ // This is for situations where an mppTask cannot be generated for some operators. + // For example, when the build side of a HashJoin is a Projection + // that cannot be pushed down to tiflash (because TiFlash doesn't support some expressions in Projection), + // the HashJoin cannot be pushed down to tiflash either. But we still want the TableScan to run on tiflash. task = mppTask task = task.convertToRootTask(ds.ctx) if !task.invalid() { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 2d73534fc2e1e..f6e566bac43a3 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" - "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -50,7 +49,6 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle" - "github.com/pingcap/tidb/store/driver/backoff" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" @@ -67,7 +65,6 @@ import ( "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/size" - "github.com/tikv/client-go/v2/tikv" ) const ( @@ -692,13 +689,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { ds.preferStoreType = 0 return } - if config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ds.ctx) { - // TiFlash is in disaggregated mode, need to make sure tiflash_compute node is available. - errMsg := "No available tiflash_compute node" - warning := ErrInternal.GenWithStack(errMsg) - ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning) - return - } for _, path := range ds.possibleAccessPaths { if path.StoreType == kv.TiFlash { ds.preferStoreType |= preferTiFlash @@ -716,15 +706,6 @@ func (ds *DataSource) setPreferredStoreType(hintInfo *tableHintInfo) { } } -func isTiFlashComputeNodeAvailable(ctx sessionctx.Context) bool { - bo := backoff.NewBackofferWithVars(context.Background(), 5000, nil) - stores, err := ctx.GetStore().(tikv.Storage).GetRegionCache().GetTiFlashComputeStores(bo.TiKVBackoffer()) - if err != nil || len(stores) == 0 { - return false - } - return true -} - func resetNotNullFlag(schema *expression.Schema, start, end int) { for i := start; i < end; i++ { col := *schema.Columns[i] diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 640c0f04630c1..a8a7a1918cfc0 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/infoschema" @@ -2626,3 +2627,75 @@ func TestCountStarForTiFlash(t *testing.T) { tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) } } + +func TestHashAggPushdownToTiFlashCompute(t *testing.T) { + var ( + input []string + output []struct { + SQL string + Plan []string + Warning []string + } + ) + planSuiteData := core.GetPlanSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists tbl_15;") + tk.MustExec(`create table tbl_15 (col_89 text (473) collate 
utf8mb4_bin , + col_90 timestamp default '1976-04-03' , + col_91 tinyint unsigned not null , + col_92 tinyint , + col_93 double not null , + col_94 datetime not null default '1970-06-08' , + col_95 datetime default '2028-02-13' , + col_96 int unsigned not null default 2532480521 , + col_97 char (168) default '') partition by hash (col_91) partitions 4;`) + + tk.MustExec("drop table if exists tbl_16;") + tk.MustExec(`create table tbl_16 (col_98 text (246) not null , + col_99 decimal (30 ,19) , + col_100 mediumint unsigned , + col_101 text (410) collate utf8mb4_bin , + col_102 date not null , + col_103 timestamp not null default '2003-08-27' , + col_104 text (391) not null , + col_105 date default '2010-10-24' , + col_106 text (9) not null,primary key (col_100, col_98(5), col_103), + unique key idx_23 (col_100, col_106 (3), col_101 (3))) partition by hash (col_100) partitions 2;`) + + config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = true + }) + defer config.UpdateGlobal(func(conf *config.Config) { + conf.DisaggregatedTiFlash = false + }) + + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + tableName := tblInfo.Name.L + if tableName == "tbl_15" || tableName == "tbl_16" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1;") + tk.MustExec("set @@tidb_partition_prune_mode = 'static';") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash';") + + for i, ts := range input { + testdata.OnRecord(func() { + output[i].SQL = ts + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows()) + }) + tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...)) + } +} diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index d14a6bf51ea49..5535faa97ab92 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -26,7 +26,6 @@ import ( "unsafe" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" @@ -1453,8 +1452,6 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines() availableEngine := map[kv.StoreType]struct{}{} var availableEngineStr string - var outputComputeNodeErrMsg bool - noTiFlashComputeNode := config.GetGlobalConfig().DisaggregatedTiFlash && !isTiFlashComputeNodeAvailable(ctx) for i := len(paths) - 1; i >= 0; i-- { // availableEngineStr is for warning message. if _, ok := availableEngine[paths[i].StoreType]; !ok { @@ -1464,20 +1461,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath, } availableEngineStr += paths[i].StoreType.Name() } - _, exists := isolationReadEngines[paths[i].StoreType] - // Prune this path if: - // 1. path.StoreType doesn't exists in isolationReadEngines or - // 2. TiFlash is disaggregated and the number of tiflash_compute node is zero. - shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash - failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) { - // Ignore check if tiflash_compute node number. - // After we support disaggregated tiflash in test framework, can delete this failpoint. 
- shouldPruneTiFlashCompute = val.(bool) - }) - if shouldPruneTiFlashCompute { - outputComputeNodeErrMsg = true - } - if (!exists && paths[i].StoreType != kv.TiDB) || shouldPruneTiFlashCompute { + if _, ok := isolationReadEngines[paths[i].StoreType]; !ok && paths[i].StoreType != kv.TiDB { paths = append(paths[:i], paths[i+1:]...) } } @@ -1486,11 +1470,7 @@ if len(paths) == 0 { helpMsg := "" if engineVals == "tiflash" { - if outputComputeNodeErrMsg { - helpMsg = ". Please check tiflash_compute node is available" - } else { - helpMsg = ". Please check tiflash replica or ensure the query is readonly" - } + helpMsg = ". Please check tiflash replica or ensure the query is readonly" } err = ErrInternal.GenWithStackByArgs(fmt.Sprintf("No access path for table '%s' is found with '%v' = '%v', valid values can be '%s'%s.", tblName.String(), variable.TiDBIsolationReadEngines, engineVals, availableEngineStr, helpMsg)) diff --git a/planner/core/rule_aggregation_push_down.go b/planner/core/rule_aggregation_push_down.go index c9326929b550f..24aef4161a8ec 100644 --- a/planner/core/rule_aggregation_push_down.go +++ b/planner/core/rule_aggregation_push_down.go @@ -405,6 +405,12 @@ func (a *aggregationPushDownSolver) pushAggCrossUnion(agg *LogicalAggregation, u if err != nil { return nil, err } + // Update the mode of the newly generated firstRow to match the other agg funcs. + if len(agg.AggFuncs) != 0 { + firstRow.Mode = agg.AggFuncs[0].Mode + } else { + firstRow.Mode = aggregation.Partial1Mode + } newAgg.AggFuncs = append(newAgg.AggFuncs, firstRow) } tmpSchema := expression.NewSchema(newAgg.GetGroupByCols()...) diff --git a/planner/core/task.go b/planner/core/task.go index 5d7ca6e5fd424..ff4e22756f15a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1163,6 +1163,12 @@ func (p *PhysicalUnionAll) attach2MppTasks(tasks ...task) task { func (p *PhysicalUnionAll) attach2Task(tasks ...task) task { for _, t := range tasks { if _, ok := t.(*mppTask); ok { + if p.TP() == plancodec.TypePartitionUnion { + // In attach2MppTasks(), the PhysicalUnion would be attached to the mppTask directly. + // But PartitionUnion cannot be pushed down to tiflash, so we explicitly disable pushing PartitionUnion down to tiflash here. + // For now, return invalidTask immediately; we can refine this by letting the childTask of PartitionUnion convert to a rootTask. + return invalidTask + } return p.attach2MppTasks(tasks...) 
} } diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json index d433f5dd88dbe..6f6e74fac3cfa 100644 --- a/planner/core/testdata/plan_suite_in.json +++ b/planner/core/testdata/plan_suite_in.json @@ -1186,5 +1186,13 @@ "select a, count(*) from t group by a -- shouldn't be rewritten", "select sum(a) from t -- sum shouldn't be rewritten" ] + }, + { + "name": "TestHashAggPushdownToTiFlashCompute", + "cases": [ + "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", + "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;" + ] } ] diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index 31964823e95f2..14213e2223dab 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -7501,5 +7501,104 @@ "Warning": null } ] + }, + { + "Name": "TestHashAggPushdownToTiFlashCompute", + "Cases": [ + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ avg( distinct tbl_15.col_96 ) as r0 , min( tbl_15.col_92 ) as r1 , sum( distinct tbl_15.col_91 ) as r2 , max( tbl_15.col_92 ) as r3 from tbl_15 where tbl_15.col_94 != '2033-01-09' and tbl_15.col_93 > 7623.679908049186 order by r0,r1,r2,r3 limit 79 ;", + "Plan": [ + "Limit 1.00 root offset:0, count:79", + "└─Sort 1.00 root Column#11, Column#12, Column#13, Column#14", + " └─HashAgg 1.00 root funcs:avg(distinct Column#89)->Column#11, funcs:min(Column#90)->Column#12, funcs:sum(distinct Column#91)->Column#13, funcs:max(Column#92)->Column#14", + " └─Projection 7100.44 root cast(test.tbl_15.col_96, decimal(10,0) UNSIGNED BINARY)->Column#89, Column#15, cast(test.tbl_15.col_91, decimal(3,0) UNSIGNED BINARY)->Column#91, Column#16", + " └─PartitionUnion 7100.44 root ", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#18)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#20)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#18, funcs:max(test.tbl_15.col_92)->Column#20", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, 
test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#30)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#32)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#30, funcs:max(test.tbl_15.col_92)->Column#32", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─TableReader 1775.11 root data:ExchangeSender", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#42)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#44)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " │ └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " │ └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " │ └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#42, funcs:max(test.tbl_15.col_92)->Column#44", + " │ └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─TableReader 1775.11 root data:ExchangeSender", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:min(Column#54)->Column#15, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91, funcs:max(Column#56)->Column#16, funcs:firstrow(test.tbl_15.col_96)->test.tbl_15.col_96, funcs:firstrow(test.tbl_15.col_91)->test.tbl_15.col_91", + " └─ExchangeReceiver 1775.11 mpp[tiflash] ", + " └─ExchangeSender 1775.11 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tbl_15.col_96, collate: binary], [name: test.tbl_15.col_91, collate: binary]", + " └─HashAgg 1775.11 mpp[tiflash] group by:test.tbl_15.col_91, test.tbl_15.col_96, funcs:min(test.tbl_15.col_92)->Column#54, funcs:max(test.tbl_15.col_92)->Column#56", + " └─Selection 2218.89 mpp[tiflash] gt(test.tbl_15.col_93, 7623.679908049186), ne(test.tbl_15.col_94, 2033-01-09 00:00:00.000000)", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", + "Plan": [ + "HashAgg 1.00 root funcs:count(Column#12)->Column#11", + "└─PartitionUnion 4.00 root ", + " ├─HashAgg 1.00 root funcs:count(Column#13)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 
mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#13", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p0 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#14)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#14", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p1 keep order:false, stats:pseudo", + " ├─HashAgg 1.00 root funcs:count(Column#15)->Column#12", + " │ └─TableReader 1.00 root data:ExchangeSender", + " │ └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " │ └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#15", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p2 keep order:false, stats:pseudo", + " └─HashAgg 1.00 root funcs:count(Column#16)->Column#12", + " └─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 mpp[tiflash] funcs:count(test.tbl_15.col_91)->Column#16", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_15, partition:p3 keep order:false, stats:pseudo" + ], + "Warning": null + }, + { + "SQL": "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;", + "Plan": [ + "TopN 20.00 root Column#10, offset:0, count:20", + "└─HashAgg 63.95 root group by:test.tbl_16.col_100, funcs:avg(Column#11, Column#12)->Column#10", + " └─PartitionUnion 63.95 root ", + " ├─StreamAgg 31.98 root group by:Column#22, funcs:count(Column#19)->Column#11, funcs:sum(Column#20)->Column#12, funcs:firstrow(Column#21)->test.tbl_16.col_100", + " │ └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#20, test.tbl_16.col_100, test.tbl_16.col_100", + " │ └─Sort 39.97 root test.tbl_16.col_100", + " │ └─TableReader 39.97 root data:ExchangeSender", + " │ └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " │ └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p0 keep order:false, stats:pseudo", + " └─StreamAgg 31.98 root group by:Column#26, funcs:count(Column#23)->Column#11, funcs:sum(Column#24)->Column#12, funcs:firstrow(Column#25)->test.tbl_16.col_100", + " └─Projection 39.97 root test.tbl_16.col_100, cast(test.tbl_16.col_100, decimal(8,0) UNSIGNED BINARY)->Column#24, test.tbl_16.col_100, test.tbl_16.col_100", + " └─Sort 39.97 root test.tbl_16.col_100", + " └─TableReader 39.97 root data:ExchangeSender", + " └─ExchangeSender 39.97 mpp[tiflash] ExchangeType: PassThrough", + " └─Selection 39.97 mpp[tiflash] or(eq(test.tbl_16.col_100, 10672141), in(test.tbl_16.col_104, \"yfEG1t!*b\", \"C1*bqx_qyO\", \"vQ^yUpKHr&j#~\"))", + " └─TableFullScan 10000.00 mpp[tiflash] table:tbl_16, partition:p1 keep order:false, stats:pseudo" + ], + "Warning": null + } + ] } ] diff --git a/server/conn.go b/server/conn.go index 4d4300c099ecb..4cd2aedcb42c1 100644 --- a/server/conn.go +++ b/server/conn.go @@ -668,12 +668,12 @@ func (cc *clientConn) readOptionalSSLRequestAndHandshakeResponse(ctx context.Con switch 
resp.AuthPlugin { case mysql.AuthCachingSha2Password: - resp.Auth, err = cc.authSha(ctx) + resp.Auth, err = cc.authSha(ctx, resp) if err != nil { return err } case mysql.AuthTiDBSM3Password: - resp.Auth, err = cc.authSM3(ctx) + resp.Auth, err = cc.authSM3(ctx, resp) if err != nil { return err } @@ -727,7 +727,7 @@ func (cc *clientConn) handleAuthPlugin(ctx context.Context, resp *handshakeRespo } // authSha implements the caching_sha2_password specific part of the protocol. -func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) { +func (cc *clientConn) authSha(ctx context.Context, resp handshakeResponse41) ([]byte, error) { const ( shaCommand = 1 requestRsaPubKey = 2 // Not supported yet, only TLS is supported as secure channel. @@ -735,6 +735,13 @@ func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) { fastAuthFail = 4 ) + // If no password is specified, we don't send the FastAuthFail to do the full authentication + // as that doesn't make sense without a password and confuses the client. + // https://github.com/pingcap/tidb/issues/40831 + if len(resp.Auth) == 0 { + return []byte{}, nil + } + // Currently we always send a "FastAuthFail" as the cached part of the protocol isn't implemented yet. // This triggers the client to send the full response. err := cc.writePacket([]byte{0, 0, 0, 0, shaCommand, fastAuthFail}) @@ -757,8 +764,16 @@ func (cc *clientConn) authSha(ctx context.Context) ([]byte, error) { } // authSM3 implements the tidb_sm3_password specific part of the protocol. -func (cc *clientConn) authSM3(ctx context.Context) ([]byte, error) { - err := cc.writePacket([]byte{0, 0, 0, 0, 1, 4}) +// tidb_sm3_password is very similar to caching_sha2_password. +func (cc *clientConn) authSM3(ctx context.Context, resp handshakeResponse41) ([]byte, error) { + // If no password is specified, we don't send the FastAuthFail to do the full authentication + // as that doesn't make sense without a password and confuses the client. 
+ // https://github.com/pingcap/tidb/issues/40831 + if len(resp.Auth) == 0 { + return []byte{}, nil + } + + err := cc.writePacket([]byte{0, 0, 0, 0, 1, 4}) // fastAuthFail if err != nil { logutil.Logger(ctx).Error("authSM3 packet write failed", zap.Error(err)) return nil, err diff --git a/server/conn_test.go b/server/conn_test.go index 9f8033cd1f98d..fb1cd15102129 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -1806,3 +1806,48 @@ func TestExtensionChangeUser(t *testing.T) { require.Equal(t, expectedConnInfo.Error, logInfo.Error) require.Equal(t, *(expectedConnInfo.ConnectionInfo), *(logInfo.ConnectionInfo)) } + +func TestAuthSha(t *testing.T) { + store := testkit.CreateMockStore(t) + + var outBuffer bytes.Buffer + tidbdrv := NewTiDBDriver(store) + cfg := newTestConfig() + cfg.Port, cfg.Status.StatusPort = 0, 0 + cfg.Status.ReportStatus = false + server, err := NewServer(cfg, tidbdrv) + require.NoError(t, err) + defer server.Close() + + cc := &clientConn{ + connectionID: 1, + salt: []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10, 0x11, 0x12, 0x13, 0x14}, + server: server, + pkt: &packetIO{ + bufWriter: bufio.NewWriter(&outBuffer), + }, + collation: mysql.DefaultCollationID, + peerHost: "localhost", + alloc: arena.NewAllocator(512), + chunkAlloc: chunk.NewAllocator(), + capability: mysql.ClientProtocol41, + } + + tk := testkit.NewTestKit(t, store) + ctx := &TiDBContext{Session: tk.Session()} + cc.setCtx(ctx) + + resp := handshakeResponse41{ + Capability: mysql.ClientProtocol41 | mysql.ClientPluginAuth, + AuthPlugin: mysql.AuthCachingSha2Password, + Auth: []byte{}, // No password + } + + authData, err := cc.authSha(context.Background(), resp) + require.NoError(t, err) + + // If no password is specified authSha() should return an empty byte slice + // which differs from when a password is specified as that should trigger + // fastAuthFail and the rest of the auth process. 
+ require.Equal(t, authData, []byte{}) +} diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index b339ac39140cc..de0e5ffc90bd3 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -731,7 +731,7 @@ var defaultSysVars = []*SysVar{ return nil }}, {Scope: ScopeGlobal, Name: TiDBEnableTelemetry, Value: BoolToOnOff(DefTiDBEnableTelemetry), Type: TypeBool}, - {Scope: ScopeGlobal, Name: TiDBEnableHistoricalStats, Value: Off, Type: TypeBool}, + {Scope: ScopeGlobal, Name: TiDBEnableHistoricalStats, Value: On, Type: TypeBool}, /* tikv gc metrics */ {Scope: ScopeGlobal, Name: TiDBGCEnable, Value: On, Type: TypeBool, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) { return getTiDBTableValue(s, "tikv_gc_enable", On) diff --git a/statistics/handle/gc.go b/statistics/handle/gc.go index f16e2c9719088..af20eeb35a363 100644 --- a/statistics/handle/gc.go +++ b/statistics/handle/gc.go @@ -53,8 +53,11 @@ func (h *Handle) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) error if err := h.gcTableStats(is, row.GetInt64(0)); err != nil { return errors.Trace(err) } - if err := h.gcHistoryStatsFromKV(row.GetInt64(0)); err != nil { - return errors.Trace(err) + _, existed := is.TableByID(row.GetInt64(0)) + if !existed { + if err := h.gcHistoryStatsFromKV(row.GetInt64(0)); err != nil { + return errors.Trace(err) + } } } if err := h.ClearOutdatedHistoryStats(); err != nil { diff --git a/ttl/metrics/metrics.go b/ttl/metrics/metrics.go index 3c8ceee213a14..8bc01551bc2a0 100644 --- a/ttl/metrics/metrics.go +++ b/ttl/metrics/metrics.go @@ -48,6 +48,8 @@ var ( RunningJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "running"}) CancellingJobsCnt = metrics.TTLJobStatus.With(prometheus.Labels{metrics.LblType: "cancelling"}) + + RunningTaskCnt = metrics.TTLTaskStatus.With(prometheus.Labels{metrics.LblType: "running"}) ) func initWorkerPhases(workerType string) map[string]prometheus.Counter { diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index f314eca5c01ed..55fe5dffc2b8a 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -55,6 +55,7 @@ go_test( ], embed = [":ttlworker"], flaky = True, + race = "on", deps = [ "//domain", "//infoschema", @@ -69,6 +70,7 @@ go_test( "//testkit", "//ttl/cache", "//ttl/client", + "//ttl/metrics", "//ttl/session", "//types", "//util/chunk", @@ -76,6 +78,7 @@ go_test( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_prometheus_client_model//go", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@org_golang_x_time//rate", diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 0b427e64318ac..5f8b7bd038fc4 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -45,7 +45,7 @@ const setTableStatusOwnerTemplate = `UPDATE mysql.tidb_ttl_table_status SET current_job_id = %?, current_job_owner_id = %?, current_job_start_time = %?, - current_job_status = 'waiting', + current_job_status = 'running', current_job_status_update_time = %?, current_job_ttl_expire = %?, current_job_owner_hb_time = %? 
@@ -161,6 +161,7 @@ func (m *JobManager) jobLoop() error { m.taskManager.resizeWorkersWithSysVar() for { m.reportMetrics() + m.taskManager.reportMetrics() now := se.Now() select { @@ -651,7 +652,7 @@ func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *ca // information from schema cache directly tbl: table, - status: cache.JobStatusWaiting, + status: cache.JobStatusRunning, } } diff --git a/ttl/ttlworker/job_manager_integration_test.go b/ttl/ttlworker/job_manager_integration_test.go index e2e864344fde3..45fd9e69e2bf8 100644 --- a/ttl/ttlworker/job_manager_integration_test.go +++ b/ttl/ttlworker/job_manager_integration_test.go @@ -34,9 +34,11 @@ import ( "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/client" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/util/logutil" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" @@ -574,3 +576,41 @@ func TestGCTTLHistory(t *testing.T) { ttlworker.DoGC(context.TODO(), se) tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5")) } + +func TestJobMetrics(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + waitAndStopTTLManager(t, dom) + + now := time.Now() + tk.MustExec("create table test.t (id int, created_at datetime) ttl = `created_at` + interval 1 minute ttl_job_interval = '1m'") + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnTTL) + + se := sessionFactory() + m := ttlworker.NewJobManager("manager-1", nil, store, nil) + m.TaskManager().ResizeWorkersWithSysVar() + require.NoError(t, m.InfoSchemaCache().Update(se)) + // schedule jobs + m.RescheduleJobs(se, now) + // set the worker to be empty, so none of the tasks will be scheduled + m.TaskManager().SetScanWorkers4Test([]ttlworker.Worker{}) + + sql, args := cache.SelectFromTTLTableStatusWithID(table.Meta().ID) + rows, err := se.ExecuteSQL(ctx, sql, args...) 
+ require.NoError(t, err) + tableStatus, err := cache.RowToTableStatus(se, rows[0]) + require.NoError(t, err) + + require.NotEmpty(t, tableStatus.CurrentJobID) + require.Equal(t, "manager-1", tableStatus.CurrentJobOwnerID) + require.Equal(t, cache.JobStatusRunning, tableStatus.CurrentJobStatus) + + m.ReportMetrics() + out := &dto.Metric{} + require.NoError(t, metrics.RunningJobsCnt.Write(out)) + require.Equal(t, float64(1), out.GetGauge().GetValue()) +} diff --git a/ttl/ttlworker/job_manager_test.go b/ttl/ttlworker/job_manager_test.go index bf7837fc2ee64..311a42072ea91 100644 --- a/ttl/ttlworker/job_manager_test.go +++ b/ttl/ttlworker/job_manager_test.go @@ -171,6 +171,11 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no return m.updateHeartBeat(ctx, se, now) } +// ReportMetrics is an exported version of reportMetrics +func (m *JobManager) ReportMetrics() { + m.reportMetrics() +} + func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) { j.finish(se, now, summary) } diff --git a/ttl/ttlworker/task_manager.go b/ttl/ttlworker/task_manager.go index f20e5cee4f3f6..0db5405034c25 100644 --- a/ttl/ttlworker/task_manager.go +++ b/ttl/ttlworker/task_manager.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/session" "github.com/pingcap/tidb/util/logutil" "go.uber.org/multierr" @@ -327,15 +328,9 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now } err := se.RunInTxn(ctx, func() error { - sql, args := cache.SelectFromTTLTaskWithID(task.JobID, task.ScanID) - rows, err := se.ExecuteSQL(ctx, sql+" FOR UPDATE NOWAIT", args...) - if err != nil { - return errors.Wrapf(err, "execute sql: %s", sql) - } - if len(rows) == 0 { - return errors.Errorf("didn't find task with jobID: %s, scanID: %d", task.JobID, task.ScanID) - } - task, err = cache.RowToTTLTask(se, rows[0]) + var err error + + task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, true) if err != nil { return err } @@ -343,7 +338,7 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now return errors.New("task is already scheduled") } - sql, args = setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now) + sql, args := setTTLTaskOwnerSQL(task.JobID, task.ScanID, m.id, now) _, err = se.ExecuteSQL(ctx, sql, args...) if err != nil { return errors.Wrapf(err, "execute sql: %s", sql) @@ -355,6 +350,12 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now return nil, err } + // update the task after setting status and owner + task, err = m.syncTaskFromTable(se, task.JobID, task.ScanID, false) + if err != nil { + return nil, err + } + ctx, cancel := context.WithCancel(m.ctx) scanTask := &ttlScanTask{ ctx: ctx, @@ -371,6 +372,28 @@ func (m *taskManager) lockScanTask(se session.Session, task *cache.TTLTask, now }, nil } +func (m *taskManager) syncTaskFromTable(se session.Session, jobID string, scanID int64, detectLock bool) (*cache.TTLTask, error) { + ctx := m.ctx + + sql, args := cache.SelectFromTTLTaskWithID(jobID, scanID) + if detectLock { + sql += " FOR UPDATE NOWAIT" + } + rows, err := se.ExecuteSQL(ctx, sql, args...) 
+ if err != nil { + return nil, errors.Wrapf(err, "execute sql: %s", sql) + } + if len(rows) == 0 { + return nil, errors.Errorf("didn't find task with jobID: %s, scanID: %d", jobID, scanID) + } + task, err := cache.RowToTTLTask(se, rows[0]) + if err != nil { + return nil, err + } + + return task, nil +} + // updateHeartBeat updates the heartbeat for all tasks with current instance as owner func (m *taskManager) updateHeartBeat(ctx context.Context, se session.Session, now time.Time) error { for _, task := range m.runningTasks { @@ -427,6 +450,7 @@ func (m *taskManager) reportTaskFinished(se session.Session, now time.Time, task if err != nil { return err } + task.Status = cache.TaskStatusFinished timeoutCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout) _, err = se.ExecuteSQL(timeoutCtx, sql, args...) @@ -474,6 +498,10 @@ func (m *taskManager) checkInvalidTask(se session.Session) { m.runningTasks = ownRunningTask } +func (m *taskManager) reportMetrics() { + metrics.RunningTaskCnt.Set(float64(len(m.runningTasks))) +} + type runningScanTask struct { *ttlScanTask cancel func() diff --git a/ttl/ttlworker/task_manager_integration_test.go b/ttl/ttlworker/task_manager_integration_test.go index 8b7d0df5257b0..9e3bad19b2acd 100644 --- a/ttl/ttlworker/task_manager_integration_test.go +++ b/ttl/ttlworker/task_manager_integration_test.go @@ -26,8 +26,10 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/ttl/cache" + "github.com/pingcap/tidb/ttl/metrics" "github.com/pingcap/tidb/ttl/ttlworker" "github.com/pingcap/tidb/util/logutil" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "go.uber.org/atomic" "go.uber.org/zap" @@ -185,3 +187,35 @@ func TestTaskScheduleExpireHeartBeat(t *testing.T) { m2.RescheduleTasks(sessionFactory(), now.Add(time.Hour)) tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-2")) } + +func TestTaskMetrics(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + waitAndStopTTLManager(t, dom) + tk := testkit.NewTestKit(t, store) + sessionFactory := sessionFactory(t, store) + + // create table and scan task + tk.MustExec("create table test.t(id int, created_at datetime) ttl=created_at + interval 1 day") + table, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + sql := fmt.Sprintf("insert into mysql.tidb_ttl_task(job_id,table_id,scan_id,expire_time,created_time) values ('test-job', %d, %d, NOW(), NOW())", table.Meta().ID, 1) + tk.MustExec(sql) + + // update the infoschema cache + isc := cache.NewInfoSchemaCache(time.Second) + require.NoError(t, isc.Update(sessionFactory())) + now := time.Now() + + // schedule in a task manager + scanWorker := ttlworker.NewMockScanWorker(t) + scanWorker.Start() + m := ttlworker.NewTaskManager(context.Background(), nil, isc, "task-manager-1") + m.SetScanWorkers4Test([]ttlworker.Worker{scanWorker}) + m.RescheduleTasks(sessionFactory(), now) + tk.MustQuery("select status,owner_id from mysql.tidb_ttl_task").Check(testkit.Rows("running task-manager-1")) + + m.ReportMetrics() + out := &dto.Metric{} + require.NoError(t, metrics.RunningTaskCnt.Write(out)) + require.Equal(t, float64(1), out.GetGauge().GetValue()) +} diff --git a/ttl/ttlworker/task_manager_test.go b/ttl/ttlworker/task_manager_test.go index 9241146b719b3..37365cb9757f6 100644 --- a/ttl/ttlworker/task_manager_test.go +++ b/ttl/ttlworker/task_manager_test.go @@ -49,6 
+49,11 @@ func (m *taskManager) RescheduleTasks(se session.Session, now time.Time) { m.rescheduleTasks(se, now) } +// ReportMetrics is an exported version of reportMetrics +func (m *taskManager) ReportMetrics() { + m.reportMetrics() +} + func TestResizeWorkers(t *testing.T) { tbl := newMockTTLTbl(t, "t1") diff --git a/util/gctuner/tuner.go b/util/gctuner/tuner.go index ec74f48ec4be0..3332d77b93875 100644 --- a/util/gctuner/tuner.go +++ b/util/gctuner/tuner.go @@ -144,15 +144,18 @@ func (t *tuner) getGCPercent() uint32 { // tuning check the memory inuse and tune GC percent dynamically. // Go runtime ensure that it will be called serially. func (t *tuner) tuning() { + if !EnableGOGCTuner.Load() { + return + } + inuse := readMemoryInuse() threshold := t.getThreshold() // stop gc tuning if threshold <= 0 { return } - if EnableGOGCTuner.Load() { - t.setGCPercent(calcGCPercent(inuse, threshold)) - } + + t.setGCPercent(calcGCPercent(inuse, threshold)) } // threshold = inuse + inuse * (gcPercent / 100)
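The tuner.go hunk ends on the invariant `threshold = inuse + inuse * (gcPercent / 100)`; solving for gcPercent gives `(threshold - inuse) / inuse * 100`. Below is a minimal standalone sketch of that arithmetic only. The helper name `sketchCalcGCPercent` and the clamp bounds are illustrative assumptions, not the constants or behavior of `util/gctuner`.

// Sketch: derive a GOGC percentage from the relation
//   threshold = inuse + inuse*(gcPercent/100)
// i.e. gcPercent = (threshold - inuse) / inuse * 100.
package main

import "fmt"

const (
	minGCPercent uint64 = 25  // assumed lower bound, for illustration only
	maxGCPercent uint64 = 500 // assumed upper bound, for illustration only
)

// sketchCalcGCPercent returns the GC percent implied by the headroom between
// the memory currently in use and the configured threshold, clamped to
// [minGCPercent, maxGCPercent].
func sketchCalcGCPercent(inuse, threshold uint64) uint32 {
	// Degenerate inputs: no headroom left, fall back to the most aggressive setting.
	if inuse == 0 || threshold <= inuse {
		return uint32(minGCPercent)
	}
	gcPercent := (threshold - inuse) * 100 / inuse
	if gcPercent < minGCPercent {
		gcPercent = minGCPercent
	}
	if gcPercent > maxGCPercent {
		gcPercent = maxGCPercent
	}
	return uint32(gcPercent)
}

func main() {
	// Example: 1 GiB in use with a 4 GiB threshold leaves 3 GiB of headroom,
	// so GC can be deferred until the live heap grows by 300%.
	fmt.Println(sketchCalcGCPercent(1<<30, 4<<30)) // prints 300
}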