Skip to content

Commit

Permalink
Merge branch 'master' into fix-39826
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 authored Dec 30, 2022
2 parents 6b4fd3a + ea29db5 commit d427935
Show file tree
Hide file tree
Showing 93 changed files with 2,150 additions and 286 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func createIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string
}

var res strings.Builder
ctx := format.NewRestoreCtx(format.DefaultRestoreFlags|format.RestoreTiDBSpecialComment, &res)
ctx := format.NewRestoreCtx(format.DefaultRestoreFlags|format.RestoreTiDBSpecialComment|format.RestoreWithTTLEnableOff, &res)

retStmts := make([]string, 0, len(stmts))
for _, stmt := range stmts {
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
return errors.Trace(err)
}
}

if ttlInfo := table.Info.TTLInfo; ttlInfo != nil {
ttlInfo.Enable = false
}
}
if err := batchSession.CreateTables(ctx, m, db.tableIDAllocFilter()); err != nil {
return err
Expand Down Expand Up @@ -336,6 +340,10 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
}
}

if ttlInfo := table.Info.TTLInfo; ttlInfo != nil {
ttlInfo.Enable = false
}

err := db.se.CreateTable(ctx, table.DB.Name, table.Info, db.tableIDAllocFilter())
if err != nil {
log.Error("create table failed",
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/stream/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,11 @@ go_test(
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//meta",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//tablecodec",
"//types",
"//util/codec",
"//util/table-filter",
"@com_github_pingcap_kvproto//pkg/brpb",
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/stream/rewrite_meta_rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,11 @@ func (sr *SchemasReplace) rewriteTableInfo(value []byte, dbID int64) ([]byte, bo
}
}

// Force to disable TTL_ENABLE when restore
if newTableInfo.TTLInfo != nil {
newTableInfo.TTLInfo.Enable = false
}

if sr.AfterTableRewritten != nil {
sr.AfterTableRewritten(false, newTableInfo)
}
Expand Down
49 changes: 49 additions & 0 deletions br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"encoding/json"
"testing"

"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -312,6 +315,52 @@ func TestRewriteValueForExchangePartition(t *testing.T) {
require.Equal(t, tableInfo.ID, pt1ID+100)
}

func TestRewriteValueForTTLTable(t *testing.T) {
var (
dbId int64 = 40
tableID int64 = 100
colID int64 = 1000
colName = "t"
tableName = "t1"
tableInfo model.TableInfo
)

tbl := model.TableInfo{
ID: tableID,
Name: model.NewCIStr(tableName),
Columns: []*model.ColumnInfo{
{
ID: colID,
Name: model.NewCIStr(colName),
FieldType: *types.NewFieldType(mysql.TypeTimestamp),
},
},
TTLInfo: &model.TTLInfo{
ColumnName: model.NewCIStr(colName),
IntervalExprStr: "1",
IntervalTimeUnit: int(ast.TimeUnitDay),
Enable: true,
},
}
value, err := json.Marshal(&tbl)
require.Nil(t, err)

sr := MockEmptySchemasReplace(nil)
newValue, needRewrite, err := sr.rewriteTableInfo(value, dbId)
require.Nil(t, err)
require.True(t, needRewrite)

err = json.Unmarshal(newValue, &tableInfo)
require.Nil(t, err)
require.Equal(t, tableInfo.Name.String(), tableName)
require.Equal(t, tableInfo.ID, sr.DbMap[dbId].TableMap[tableID].NewTableID)
require.NotNil(t, tableInfo.TTLInfo)
require.Equal(t, colName, tableInfo.TTLInfo.ColumnName.O)
require.Equal(t, "1", tableInfo.TTLInfo.IntervalExprStr)
require.Equal(t, int(ast.TimeUnitDay), tableInfo.TTLInfo.IntervalTimeUnit)
require.False(t, tableInfo.TTLInfo.Enable)
}

// db:70->80 -
// | - t0:71->81 -
// | | - p0:72->82
Expand Down
54 changes: 54 additions & 0 deletions br/tests/br_ttl/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/bin/sh
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"

PROGRESS_FILE="$TEST_DIR/progress_file"
BACKUPMETAV1_LOG="$TEST_DIR/backup.log"
BACKUPMETAV2_LOG="$TEST_DIR/backupv2.log"
RESTORE_LOG="$TEST_DIR/restore.log"
rm -rf $PROGRESS_FILE

run_sql "create schema $DB;"
run_sql "create table $DB.ttl_test_tbl(id int primary key, t datetime) TTL=\`t\` + interval 1 day TTL_ENABLE='ON'"

# backup db
echo "full backup meta v2 start..."
unset BR_LOG_TO_TERM
rm -f $BACKUPMETAV2_LOG
run_br backup full --log-file $BACKUPMETAV2_LOG -s "local://$TEST_DIR/${DB}v2" --pd $PD_ADDR --use-backupmeta-v2

echo "full backup meta v1 start..."
rm -f $BACKUPMETAV1_LOG
run_br backup full --log-file $BACKUPMETAV1_LOG -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

TTL_MARK='![ttl]'
CREATE_SQL_CONTAINS="/*T${TTL_MARK} TTL=\`t\` + INTERVAL 1 DAY */ /*T${TTL_MARK} TTL_ENABLE='OFF' */"

# restore v2
run_sql "DROP DATABASE $DB;"
echo "restore ttl table start v2..."
run_br restore db --db $DB -s "local://$TEST_DIR/${DB}v2" --pd $PD_ADDR
run_sql "show create table $DB.ttl_test_tbl;"
check_contains "$CREATE_SQL_CONTAINS"

# restore v1
run_sql "DROP DATABASE $DB;"
echo "restore ttl table start v1..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR
run_sql "show create table $DB.ttl_test_tbl;"
check_contains "$CREATE_SQL_CONTAINS"
1 change: 1 addition & 0 deletions br/tests/lightning_foreign_key/data/fk.child-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table child (id int key, pid int, constraint fk_1 foreign key (pid) references parent(id));
1 change: 1 addition & 0 deletions br/tests/lightning_foreign_key/data/fk.child.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
insert into child values (1,1),(2,2),(3,3),(4,4);
1 change: 1 addition & 0 deletions br/tests/lightning_foreign_key/data/fk.parent-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table parent(id int key, a int);
1 change: 1 addition & 0 deletions br/tests/lightning_foreign_key/data/fk.parent.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
insert into parent values (1,1),(2,2),(3,3),(4,4);
14 changes: 12 additions & 2 deletions br/tests/lightning_foreign_key/run.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,23 @@

set -eu

# Create existing tables that import data will reference.
run_sql 'DROP DATABASE IF EXISTS fk;'
run_sql 'CREATE DATABASE IF NOT EXISTS fk;'
# Create existing tables that import data will reference.
run_sql 'CREATE TABLE fk.t2 (a BIGINT PRIMARY KEY);'

for BACKEND in tidb local; do
run_sql 'DROP TABLE IF EXISTS fk.t;'
run_sql 'DROP TABLE IF EXISTS fk.t, fk.parent, fk.child;'

run_lightning --backend $BACKEND
run_sql 'SELECT GROUP_CONCAT(a) FROM fk.t ORDER BY a;'
check_contains '1,2,3,4,5'

run_sql 'SELECT count(1), sum(a) FROM fk.parent;'
check_contains 'count(1): 4'
check_contains 'sum(a): 10'

run_sql 'SELECT count(1), sum(pid) FROM fk.child;'
check_contains 'count(1): 4'
check_contains 'sum(pid): 10'
done
2 changes: 2 additions & 0 deletions br/tests/lightning_ttl/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = 'local'
1 change: 1 addition & 0 deletions br/tests/lightning_ttl/data/ttldb-schema-create.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE DATABASE `ttldb`;
4 changes: 4 additions & 0 deletions br/tests/lightning_ttl/data/ttldb.t1-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE `t1` (
`id` int(11) PRIMARY KEY,
`t` datetime
) TTL = `t` + INTERVAL 1 DAY TTL_ENABLE = 'ON';
26 changes: 26 additions & 0 deletions br/tests/lightning_ttl/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/sh
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

run_sql 'drop database if exists ttldb;'
run_lightning

TTL_MARK='![ttl]'
CREATE_SQL_CONTAINS="/*T${TTL_MARK} TTL=\`t\` + INTERVAL 1 DAY */ /*T${TTL_MARK} TTL_ENABLE='OFF' */"

run_sql 'show create table ttldb.t1'
check_contains "$CREATE_SQL_CONTAINS"
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,9 @@ type Config struct {
// EnableGlobalKill indicates whether to enable global kill.
TrxSummary TrxSummary `toml:"transaction-summary" json:"transaction-summary"`
EnableGlobalKill bool `toml:"enable-global-kill" json:"enable-global-kill"`
// InitializeSQLFile is a file that will be executed after first bootstrap only.
// It can be used to set GLOBAL system variable values
InitializeSQLFile string `toml:"initialize-sql-file" json:"initialize-sql-file"`

// The following items are deprecated. We need to keep them here temporarily
// to support the upgrade process. They can be removed in future.
Expand Down
60 changes: 54 additions & 6 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -147,6 +148,17 @@ func getTiDBSuperReadOnly(sess sessionctx.Context) (string, error) {
return val, nil
}

func isFlashbackSupportedDDLAction(action model.ActionType) bool {
switch action {
case model.ActionSetTiFlashReplica, model.ActionUpdateTiFlashReplicaStatus, model.ActionAlterPlacementPolicy,
model.ActionAlterTablePlacement, model.ActionAlterTablePartitionPlacement, model.ActionCreatePlacementPolicy,
model.ActionDropPlacementPolicy, model.ActionModifySchemaDefaultPlacement:
return false
default:
return true
}
}

func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(d.ctx, sess, flashbackTS); err != nil {
return err
Expand All @@ -170,19 +182,47 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
return errors.Trace(err)
}

flashbackSchemaVersion, err := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS))).GetSchemaVersion()
flashbackSnapshotMeta := meta.NewSnapshotMeta(d.store.GetSnapshot(kv.NewVersion(flashbackTS)))
flashbackSchemaVersion, err := flashbackSnapshotMeta.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
}

// If flashbackSchemaVersion not same as nowSchemaVersion, we've done ddl during [flashbackTs, now).
flashbackTSString := oracle.GetTimeFromTS(flashbackTS).String()

// Check if there is an upgrade during [flashbackTS, now)
sql := fmt.Sprintf("select VARIABLE_VALUE from mysql.tidb as of timestamp '%s' where VARIABLE_NAME='tidb_server_version'", flashbackTSString)
rows, err := newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history `tidb_server_version` failed, can't do flashback")
}
sql = fmt.Sprintf("select 1 from mysql.tidb where VARIABLE_NAME='tidb_server_version' and VARIABLE_VALUE=%s", rows[0].GetString(0))
rows, err = newSession(sess).execute(d.ctx, sql, "check_tidb_server_version")
if err != nil {
return errors.Trace(err)
}
if len(rows) == 0 {
return errors.Errorf("Detected TiDB upgrade during [%s, now), can't do flashback", flashbackTSString)
}

// Check is there a DDL task at flashbackTS.
sql = fmt.Sprintf("select count(*) from mysql.%s as of timestamp '%s'", JobTable, flashbackTSString)
rows, err = newSession(sess).execute(d.ctx, sql, "check_history_job")
if err != nil || len(rows) == 0 {
return errors.Errorf("Get history ddl jobs failed, can't do flashback")
}
if rows[0].GetInt64(0) != 0 {
return errors.Errorf("Detected another DDL job at %s, can't do flashback", flashbackTSString)
}

// If flashbackSchemaVersion not same as nowSchemaVersion, we should check all schema diffs during [flashbackTs, now).
for i := flashbackSchemaVersion + 1; i <= nowSchemaVersion; i++ {
diff, err := t.GetSchemaDiff(i)
if err != nil {
return errors.Trace(err)
}
if diff != nil && diff.Type != model.ActionFlashbackCluster {
return errors.Errorf("Detected schema change due to another DDL job during [%s, now), can't do flashback", oracle.GetTimeFromTS(flashbackTS))
if diff != nil && !isFlashbackSupportedDDLAction(diff.Type) {
return errors.Errorf("Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", diff.Type.String(), flashbackTSString)
}
}

Expand Down Expand Up @@ -211,7 +251,7 @@ type flashbackID struct {

func addToSlice(schema string, tableName string, tableID int64, flashbackIDs []flashbackID) []flashbackID {
var excluded bool
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") {
if filter.IsSystemSchema(schema) && !strings.HasPrefix(tableName, "stats_") && tableName != "gc_delete_range" {
excluded = true
}
flashbackIDs = append(flashbackIDs, flashbackID{
Expand Down Expand Up @@ -270,6 +310,14 @@ func GetFlashbackKeyRanges(sess sessionctx.Context) ([]kv.KeyRange, error) {
})
}

// The meta data key ranges.
metaStartKey := tablecodec.EncodeMetaKey(meta.DBkey(0), meta.TableKey(0))
metaEndKey := tablecodec.EncodeMetaKey(meta.DBkey(math.MaxInt64), meta.TableKey(math.MaxInt64))
keyRanges = append(keyRanges, kv.KeyRange{
StartKey: metaStartKey,
EndKey: metaEndKey,
})

return keyRanges, nil
}

Expand Down Expand Up @@ -633,7 +681,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
asyncNotifyEvent(d, &util.Event{Tp: model.ActionFlashbackCluster})
job.State = model.JobStateDone
job.SchemaState = model.StatePublic
return ver, nil
return updateSchemaVersion(d, t, job)
}
return ver, nil
}
Expand Down
Loading

0 comments on commit d427935

Please sign in to comment.