Skip to content

Commit

Permalink
Merge branch 'master' into fix-40584
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 authored Jan 16, 2023
2 parents ec518cf + dbce2cb commit cbd1855
Show file tree
Hide file tree
Showing 35 changed files with 750 additions and 134 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3582,8 +3582,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=",
version = "v2.0.5-0.20230110071533-f313ddf58d73",
sum = "h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers=",
version = "v2.0.5-0.20230112062023-fe5b35c5f5dc",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type tidbSession struct {

// GetDomain implements glue.Glue.
func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
initStatsSe, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
se, err := session.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -74,7 +78,7 @@ func (Glue) GetDomain(store kv.Storage) (*domain.Domain, error) {
return nil, err
}
// create stats handler for backup and restore.
err = dom.UpdateTableStatsLoop(se)
err = dom.UpdateTableStatsLoop(se, initStatsSe)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error {
return nil
}

func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
func (t AdvancerExt) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
key := GlobalCheckpointOf(taskName)
resp, err := t.KV.Get(ctx, key)
if err != nil {
Expand All @@ -211,7 +211,7 @@ func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName st
func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
oldValue, err := t.getGlobalCheckpointForTask(ctx, taskName)
oldValue, err := t.GetGlobalCheckpointForTask(ctx, taskName)
if err != nil {
return err
}
Expand Down
34 changes: 2 additions & 32 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error
clientv3.OpDelete(CheckPointsOf(taskName), clientv3.WithPrefix()),
clientv3.OpDelete(Pause(taskName)),
clientv3.OpDelete(LastErrorPrefixOf(taskName), clientv3.WithPrefix()),
clientv3.OpDelete(GlobalCheckpointOf(taskName)),
clientv3.OpDelete(StorageCheckpointOf(taskName), clientv3.WithPrefix()),
).
Commit()
if err != nil {
Expand Down Expand Up @@ -372,28 +374,6 @@ func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error) {
return storageCheckpoint, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
key := CheckPointOf(t.Info.Name, store)
resp, err := t.cli.KV.Get(ctx, key)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}
if resp.Count != 1 {
return 0, nil
}
kv := resp.Kvs[0]
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the next backup ts of store %d isn't 64bits (it is %d bytes, value = %s)",
store,
len(kv.Value),
redact.Key(kv.Value))
}
nextBackupTS := binary.BigEndian.Uint64(kv.Value)
return nextBackupTS, nil
}

// GetGlobalCheckPointTS gets the global checkpoint timestamp according to log task.
func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error) {
checkPointMap, err := t.NextBackupTSList(ctx)
Expand Down Expand Up @@ -422,16 +402,6 @@ func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error) {
return mathutil.Max(checkpoint, ts), nil
}

// Step forwards the progress (next_backup_ts) of some region.
// The task should be done by TiKV. This function should only be used for test cases.
func (t *Task) Step(ctx context.Context, store uint64, ts uint64) error {
_, err := t.cli.KV.Put(ctx, CheckPointOf(t.Info.Name, store), string(encodeUint64(ts)))
if err != nil {
return errors.Annotatef(err, "failed forward the progress of %s to %d", t.Info.Name, ts)
}
return nil
}

func (t *Task) UploadGlobalCheckpoint(ctx context.Context, ts uint64) error {
_, err := t.cli.KV.Put(ctx, GlobalCheckpointOf(t.Info.Name), string(encodeUint64(ts)))
if err != nil {
Expand Down
122 changes: 81 additions & 41 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"testing"

"github.com/pingcap/errors"
backup "github.com/pingcap/kvproto/pkg/brpb"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand Down Expand Up @@ -138,11 +138,11 @@ func TestIntegration(t *testing.T) {
defer etcd.Server.Stop()
metaCli := streamhelper.MetaDataClient{Client: cli}
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli) })
t.Run("testGetGlobalCheckPointTS", func(t *testing.T) { testGetGlobalCheckPointTS(t, metaCli) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}

func TestChecking(t *testing.T) {
Expand Down Expand Up @@ -210,31 +210,6 @@ func testBasic(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Et
rangeIsEmpty(t, []byte(streamhelper.RangesOf(taskName)), etcd)
}

func testForwardProgress(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
ctx := context.Background()
taskName := "many_tables"
taskInfo := simpleTask(taskName, 65)
defer func() {
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
}()

require.NoError(t, metaCli.PutTask(ctx, taskInfo))
task, err := metaCli.GetTask(ctx, taskName)
require.NoError(t, err)
require.NoError(t, task.Step(ctx, 1, 41))
require.NoError(t, task.Step(ctx, 1, 42))
require.NoError(t, task.Step(ctx, 2, 40))
rs, err := task.Ranges(ctx)
require.NoError(t, err)
require.Equal(t, simpleRanges(65), rs)
store1Checkpoint, err := task.MinNextBackupTS(ctx, 1)
require.NoError(t, err)
require.Equal(t, store1Checkpoint, uint64(42))
store2Checkpoint, err := task.MinNextBackupTS(ctx, 2)
require.NoError(t, err)
require.Equal(t, store2Checkpoint, uint64(40))
}

func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient) {
var (
taskName = "my_task"
Expand Down Expand Up @@ -298,7 +273,7 @@ func testGetGlobalCheckPointTS(t *testing.T, metaCli streamhelper.MetaDataClient
require.NoError(t, err)
}

task := streamhelper.NewTask(&metaCli, backup.StreamBackupTaskInfo{Name: taskName})
task := streamhelper.NewTask(&metaCli, backuppb.StreamBackupTaskInfo{Name: taskName})
task.UploadGlobalCheckpoint(ctx, 1003)

globalTS, err := task.GetGlobalCheckPointTS(ctx)
Expand Down Expand Up @@ -343,21 +318,86 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx := context.Background()
task := "simple"
req := require.New(t)
getCheckpoint := func() uint64 {
resp, err := metaCli.KV.Get(ctx, streamhelper.GlobalCheckpointOf(task))
req.NoError(err)
if len(resp.Kvs) == 0 {
return 0
}
req.Len(resp.Kvs, 1)
return binary.BigEndian.Uint64(resp.Kvs[0].Value)
}

req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
req.EqualValues(5, getCheckpoint())
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(5, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))
req.EqualValues(18, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))
req.EqualValues(18, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
req.NoError(metaCli.ClearV3GlobalCheckpointForTask(ctx, task))
req.EqualValues(0, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(0, ts)
}

func testStoptask(t *testing.T, metaCli streamhelper.AdvancerExt) {
var (
ctx = context.Background()
taskName = "stop_task"
req = require.New(t)
taskInfo = streamhelper.TaskInfo{
PBInfo: backuppb.StreamBackupTaskInfo{
Name: taskName,
StartTs: 0,
},
}
storeID = "5"
storageCheckpoint = make([]byte, 8)
)

// put task
req.NoError(metaCli.PutTask(ctx, taskInfo))
t2, err := metaCli.GetTask(ctx, taskName)
req.NoError(err)
req.EqualValues(taskInfo.PBInfo.Name, t2.Info.Name)

// upload global checkpoint
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, taskName, 100))
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, taskName)
req.NoError(err)
req.EqualValues(100, ts)

//upload storage checkpoint
key := path.Join(streamhelper.StorageCheckpointOf(taskName), storeID)
binary.BigEndian.PutUint64(storageCheckpoint, 90)
_, err = metaCli.Put(ctx, key, string(storageCheckpoint))
req.NoError(err)

task := streamhelper.NewTask(&metaCli.MetaDataClient, taskInfo.PBInfo)
ts, err = task.GetStorageCheckpoint(ctx)
req.NoError(err)
req.EqualValues(ts, 90)

// pause task
req.NoError(metaCli.PauseTask(ctx, taskName))
resp, err := metaCli.KV.Get(ctx, streamhelper.Pause(taskName))
req.NoError(err)
req.EqualValues(1, len(resp.Kvs))

// stop task
err = metaCli.DeleteTask(ctx, taskName)
req.NoError(err)

// check task and other meta infomations not existed
_, err = metaCli.GetTask(ctx, taskName)
req.Error(err)

ts, err = task.GetStorageCheckpoint(ctx)
req.NoError(err)
req.EqualValues(ts, 0)

ts, err = metaCli.GetGlobalCheckpointForTask(ctx, taskName)
req.NoError(err)
req.EqualValues(0, ts)

resp, err = metaCli.KV.Get(ctx, streamhelper.Pause(taskName))
req.NoError(err)
req.EqualValues(0, len(resp.Kvs))
}
18 changes: 2 additions & 16 deletions br/pkg/streamhelper/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ func RangeKeyOf(name string, startKey []byte) string {
return RangesOf(name) + string(startKey)
}

func writeUint64(buf *bytes.Buffer, num uint64) {
items := [8]byte{}
binary.BigEndian.PutUint64(items[:], num)
buf.Write(items[:])
}

func encodeUint64(num uint64) []byte {
items := [8]byte{}
binary.BigEndian.PutUint64(items[:], num)
Expand All @@ -83,25 +77,17 @@ func CheckPointsOf(task string) string {
}

// GlobalCheckpointOf returns the path to the "global" checkpoint of some task.
// Normally it would be <prefix>/checkpoint/<task-name>/central_globa.
func GlobalCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, taskCheckpointPath, task, checkpointTypeGlobal)
}

// StorageCheckpointOf get the prefix path of the `storage checkpoint status` of a task.
// Normally it would be <prefix>/storage-checkpoint/<task>.
func StorageCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, storageCheckPoint, task)
}

// CheckpointOf returns the checkpoint prefix of some store.
// Normally it would be <prefix>/checkpoint/<task-name>/<store-id(binary-u64)>.
func CheckPointOf(task string, store uint64) string {
buf := bytes.NewBuffer(nil)
buf.WriteString(strings.TrimSuffix(path.Join(streamKeyPrefix, taskCheckpointPath, task), "/"))
buf.WriteRune('/')
writeUint64(buf, store)
return buf.String()
}

// Pause returns the path for pausing the task.
// Normally it would be <prefix>/pause/<task-name>.
func Pause(task string) string {
Expand Down
12 changes: 6 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1836,8 +1836,8 @@ func (do *Domain) StatsHandle() *handle.Handle {
}

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
func (do *Domain) CreateStatsHandle(ctx, initStatsCtx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
if err != nil {
return err
}
Expand Down Expand Up @@ -1900,8 +1900,8 @@ func (do *Domain) SetupAnalyzeExec(ctxs []sessionctx.Context) {
}

// LoadAndUpdateStatsLoop loads and updates stats info.
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0]); err != nil {
func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context, initStatsCtx sessionctx.Context) error {
if err := do.UpdateTableStatsLoop(ctxs[0], initStatsCtx); err != nil {
return err
}
do.StartLoadStatsSubWorkers(ctxs[1:])
Expand All @@ -1911,9 +1911,9 @@ func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context) error {
// UpdateTableStatsLoop creates a goroutine loads stats info and updates stats info in a loop.
// It will also start a goroutine to analyze tables automatically.
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx sessionctx.Context) error {
func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle, err := handle.NewHandle(ctx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,16 @@ func TestShow(t *testing.T) {
tk.MustQuery("show create database test_show").Check(testkit.Rows("test_show CREATE DATABASE `test_show` /*!40100 DEFAULT CHARACTER SET utf8mb4 */"))
tk.MustQuery("show privileges").Check(testkit.Rows("Alter Tables To alter the table",
"Alter routine Functions,Procedures To alter or drop stored functions/procedures",
"Config Server Admin To use SHOW CONFIG and SET CONFIG statements",
"Create Databases,Tables,Indexes To create new databases and tables",
"Create routine Databases To use CREATE FUNCTION/PROCEDURE",
"Create role Server Admin To create new roles",
"Create temporary tables Databases To use CREATE TEMPORARY TABLE",
"Create view Tables To create new views",
"Create user Server Admin To create new users",
"Delete Tables To delete existing rows",
"Drop Databases,Tables To drop databases, tables, and views",
"Drop role Server Admin To drop roles",
"Event Server Admin To create, alter, drop and execute events",
"Execute Functions,Procedures To execute stored routines",
"File File access on server To read and write files on the server",
Expand Down
19 changes: 19 additions & 0 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,3 +787,22 @@ func TestLimitUnsupportedCase(t *testing.T) {
tk.MustExec("set @a = 1_2")
tk.MustGetErrMsg("execute stmt using @a", "[planner:1210]Incorrect arguments to LIMIT")
}

func TestIssue38323(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int, k int);")

tk.MustExec("prepare stmt from 'explain select * from t where id = ? and k = ? group by id, k';")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: not a SELECT/UPDATE/INSERT/DELETE/SET statement"))
tk.MustExec("set @a = 1;")
tk.MustExec("execute stmt using @a, @a")
tk.MustQuery("execute stmt using @a, @a").Check(tk.MustQuery("explain select * from t where id = 1 and k = 1 group by id, k").Rows())

tk.MustExec("prepare stmt from 'explain select * from t where ? = id and ? = k group by id, k';")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: not a SELECT/UPDATE/INSERT/DELETE/SET statement"))
tk.MustExec("set @a = 1;")
tk.MustQuery("execute stmt using @a, @a").Check(tk.MustQuery("explain select * from t where 1 = id and 1 = k group by id, k").Rows())
}
3 changes: 3 additions & 0 deletions executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -1696,13 +1696,16 @@ func (e *ShowExec) fetchShowGrants() error {
func (e *ShowExec) fetchShowPrivileges() error {
e.appendRow([]interface{}{"Alter", "Tables", "To alter the table"})
e.appendRow([]interface{}{"Alter routine", "Functions,Procedures", "To alter or drop stored functions/procedures"})
e.appendRow([]interface{}{"Config", "Server Admin", "To use SHOW CONFIG and SET CONFIG statements"})
e.appendRow([]interface{}{"Create", "Databases,Tables,Indexes", "To create new databases and tables"})
e.appendRow([]interface{}{"Create routine", "Databases", "To use CREATE FUNCTION/PROCEDURE"})
e.appendRow([]interface{}{"Create role", "Server Admin", "To create new roles"})
e.appendRow([]interface{}{"Create temporary tables", "Databases", "To use CREATE TEMPORARY TABLE"})
e.appendRow([]interface{}{"Create view", "Tables", "To create new views"})
e.appendRow([]interface{}{"Create user", "Server Admin", "To create new users"})
e.appendRow([]interface{}{"Delete", "Tables", "To delete existing rows"})
e.appendRow([]interface{}{"Drop", "Databases,Tables", "To drop databases, tables, and views"})
e.appendRow([]interface{}{"Drop role", "Server Admin", "To drop roles"})
e.appendRow([]interface{}{"Event", "Server Admin", "To create, alter, drop and execute events"})
e.appendRow([]interface{}{"Execute", "Functions,Procedures", "To execute stored routines"})
e.appendRow([]interface{}{"File", "File access on server", "To read and write files on the server"})
Expand Down
Loading

0 comments on commit cbd1855

Please sign in to comment.