Skip to content

Commit

Permalink
log-backup: remove k-v record in etcd when stop log-backup task (#40431
Browse files Browse the repository at this point in the history
…) (#40590)

close #40403
  • Loading branch information
ti-chi-bot authored Feb 15, 2023
1 parent ba22f43 commit 045afd2
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 91 deletions.
4 changes: 2 additions & 2 deletions br/pkg/streamhelper/advancer_cliext.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (t AdvancerExt) Begin(ctx context.Context, ch chan<- TaskEvent) error {
return nil
}

func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
func (t AdvancerExt) GetGlobalCheckpointForTask(ctx context.Context, taskName string) (uint64, error) {
key := GlobalCheckpointOf(taskName)
resp, err := t.KV.Get(ctx, key)
if err != nil {
Expand All @@ -229,7 +229,7 @@ func (t AdvancerExt) getGlobalCheckpointForTask(ctx context.Context, taskName st
func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error {
key := GlobalCheckpointOf(taskName)
value := string(encodeUint64(checkpoint))
oldValue, err := t.getGlobalCheckpointForTask(ctx, taskName)
oldValue, err := t.GetGlobalCheckpointForTask(ctx, taskName)
if err != nil {
return err
}
Expand Down
34 changes: 2 additions & 32 deletions br/pkg/streamhelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ func (c *MetaDataClient) DeleteTask(ctx context.Context, taskName string) error
clientv3.OpDelete(CheckPointsOf(taskName), clientv3.WithPrefix()),
clientv3.OpDelete(Pause(taskName)),
clientv3.OpDelete(LastErrorPrefixOf(taskName), clientv3.WithPrefix()),
clientv3.OpDelete(GlobalCheckpointOf(taskName)),
clientv3.OpDelete(StorageCheckpointOf(taskName), clientv3.WithPrefix()),
).
Commit()
if err != nil {
Expand Down Expand Up @@ -372,28 +374,6 @@ func (t *Task) GetStorageCheckpoint(ctx context.Context) (uint64, error) {
return storageCheckpoint, nil
}

// MinNextBackupTS query the all next backup ts of a store, returning the minimal next backup ts of the store.
func (t *Task) MinNextBackupTS(ctx context.Context, store uint64) (uint64, error) {
key := CheckPointOf(t.Info.Name, store)
resp, err := t.cli.KV.Get(ctx, key)
if err != nil {
return 0, errors.Annotatef(err, "failed to get checkpoints of %s", t.Info.Name)
}
if resp.Count != 1 {
return 0, nil
}
kv := resp.Kvs[0]
if len(kv.Value) != 8 {
return 0, errors.Annotatef(berrors.ErrPiTRMalformedMetadata,
"the next backup ts of store %d isn't 64bits (it is %d bytes, value = %s)",
store,
len(kv.Value),
redact.Key(kv.Value))
}
nextBackupTS := binary.BigEndian.Uint64(kv.Value)
return nextBackupTS, nil
}

// GetGlobalCheckPointTS gets the global checkpoint timestamp according to log task.
func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error) {
checkPointMap, err := t.NextBackupTSList(ctx)
Expand Down Expand Up @@ -422,16 +402,6 @@ func (t *Task) GetGlobalCheckPointTS(ctx context.Context) (uint64, error) {
return mathutil.Max(checkpoint, ts), nil
}

// Step forwards the progress (next_backup_ts) of some region.
// The task should be done by TiKV. This function should only be used for test cases.
func (t *Task) Step(ctx context.Context, store uint64, ts uint64) error {
_, err := t.cli.KV.Put(ctx, CheckPointOf(t.Info.Name, store), string(encodeUint64(ts)))
if err != nil {
return errors.Annotatef(err, "failed forward the progress of %s to %d", t.Info.Name, ts)
}
return nil
}

func (t *Task) UploadGlobalCheckpoint(ctx context.Context, ts uint64) error {
_, err := t.cli.KV.Put(ctx, GlobalCheckpointOf(t.Info.Name), string(encodeUint64(ts)))
if err != nil {
Expand Down
122 changes: 81 additions & 41 deletions br/pkg/streamhelper/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backup "github.com/pingcap/kvproto/pkg/brpb"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand Down Expand Up @@ -140,11 +140,11 @@ func TestIntegration(t *testing.T) {
defer etcd.Server.Stop()
metaCli := streamhelper.MetaDataClient{Client: cli}
t.Run("TestBasic", func(t *testing.T) { testBasic(t, metaCli, etcd) })
t.Run("TestForwardProgress", func(t *testing.T) { testForwardProgress(t, metaCli, etcd) })
t.Run("testGetStorageCheckpoint", func(t *testing.T) { testGetStorageCheckpoint(t, metaCli) })
t.Run("testGetGlobalCheckPointTS", func(t *testing.T) { testGetGlobalCheckPointTS(t, metaCli) })
t.Run("TestStreamListening", func(t *testing.T) { testStreamListening(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamCheckpoint", func(t *testing.T) { testStreamCheckpoint(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("testStoptask", func(t *testing.T) { testStoptask(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
t.Run("TestStreamClose", func(t *testing.T) { testStreamClose(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) })
}

Expand Down Expand Up @@ -213,31 +213,6 @@ func testBasic(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Et
rangeIsEmpty(t, []byte(streamhelper.RangesOf(taskName)), etcd)
}

func testForwardProgress(t *testing.T, metaCli streamhelper.MetaDataClient, etcd *embed.Etcd) {
ctx := context.Background()
taskName := "many_tables"
taskInfo := simpleTask(taskName, 65)
defer func() {
require.NoError(t, metaCli.DeleteTask(ctx, taskName))
}()

require.NoError(t, metaCli.PutTask(ctx, taskInfo))
task, err := metaCli.GetTask(ctx, taskName)
require.NoError(t, err)
require.NoError(t, task.Step(ctx, 1, 41))
require.NoError(t, task.Step(ctx, 1, 42))
require.NoError(t, task.Step(ctx, 2, 40))
rs, err := task.Ranges(ctx)
require.NoError(t, err)
require.Equal(t, simpleRanges(65), rs)
store1Checkpoint, err := task.MinNextBackupTS(ctx, 1)
require.NoError(t, err)
require.Equal(t, store1Checkpoint, uint64(42))
store2Checkpoint, err := task.MinNextBackupTS(ctx, 2)
require.NoError(t, err)
require.Equal(t, store2Checkpoint, uint64(40))
}

func testGetStorageCheckpoint(t *testing.T, metaCli streamhelper.MetaDataClient) {
var (
taskName = "my_task"
Expand Down Expand Up @@ -301,7 +276,7 @@ func testGetGlobalCheckPointTS(t *testing.T, metaCli streamhelper.MetaDataClient
require.NoError(t, err)
}

task := streamhelper.NewTask(&metaCli, backup.StreamBackupTaskInfo{Name: taskName})
task := streamhelper.NewTask(&metaCli, backuppb.StreamBackupTaskInfo{Name: taskName})
task.UploadGlobalCheckpoint(ctx, 1003)

globalTS, err := task.GetGlobalCheckPointTS(ctx)
Expand Down Expand Up @@ -383,21 +358,86 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) {
ctx := context.Background()
task := "simple"
req := require.New(t)
getCheckpoint := func() uint64 {
resp, err := metaCli.KV.Get(ctx, streamhelper.GlobalCheckpointOf(task))
req.NoError(err)
if len(resp.Kvs) == 0 {
return 0
}
req.Len(resp.Kvs, 1)
return binary.BigEndian.Uint64(resp.Kvs[0].Value)
}

req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 5))
req.EqualValues(5, getCheckpoint())
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(5, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 18))
req.EqualValues(18, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 16))
req.EqualValues(18, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(18, ts)
req.NoError(metaCli.ClearV3GlobalCheckpointForTask(ctx, task))
req.EqualValues(0, getCheckpoint())
ts, err = metaCli.GetGlobalCheckpointForTask(ctx, task)
req.NoError(err)
req.EqualValues(0, ts)
}

func testStoptask(t *testing.T, metaCli streamhelper.AdvancerExt) {
var (
ctx = context.Background()
taskName = "stop_task"
req = require.New(t)
taskInfo = streamhelper.TaskInfo{
PBInfo: backuppb.StreamBackupTaskInfo{
Name: taskName,
StartTs: 0,
},
}
storeID = "5"
storageCheckpoint = make([]byte, 8)
)

// put task
req.NoError(metaCli.PutTask(ctx, taskInfo))
t2, err := metaCli.GetTask(ctx, taskName)
req.NoError(err)
req.EqualValues(taskInfo.PBInfo.Name, t2.Info.Name)

// upload global checkpoint
req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, taskName, 100))
ts, err := metaCli.GetGlobalCheckpointForTask(ctx, taskName)
req.NoError(err)
req.EqualValues(100, ts)

//upload storage checkpoint
key := path.Join(streamhelper.StorageCheckpointOf(taskName), storeID)
binary.BigEndian.PutUint64(storageCheckpoint, 90)
_, err = metaCli.Put(ctx, key, string(storageCheckpoint))
req.NoError(err)

task := streamhelper.NewTask(&metaCli.MetaDataClient, taskInfo.PBInfo)
ts, err = task.GetStorageCheckpoint(ctx)
req.NoError(err)
req.EqualValues(ts, 90)

// pause task
req.NoError(metaCli.PauseTask(ctx, taskName))
resp, err := metaCli.KV.Get(ctx, streamhelper.Pause(taskName))
req.NoError(err)
req.EqualValues(1, len(resp.Kvs))

// stop task
err = metaCli.DeleteTask(ctx, taskName)
req.NoError(err)

// check task and other meta infomations not existed
_, err = metaCli.GetTask(ctx, taskName)
req.Error(err)

ts, err = task.GetStorageCheckpoint(ctx)
req.NoError(err)
req.EqualValues(ts, 0)

ts, err = metaCli.GetGlobalCheckpointForTask(ctx, taskName)
req.NoError(err)
req.EqualValues(0, ts)

resp, err = metaCli.KV.Get(ctx, streamhelper.Pause(taskName))
req.NoError(err)
req.EqualValues(0, len(resp.Kvs))
}
18 changes: 2 additions & 16 deletions br/pkg/streamhelper/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ func RangeKeyOf(name string, startKey []byte) string {
return RangesOf(name) + string(startKey)
}

func writeUint64(buf *bytes.Buffer, num uint64) {
items := [8]byte{}
binary.BigEndian.PutUint64(items[:], num)
buf.Write(items[:])
}

func encodeUint64(num uint64) []byte {
items := [8]byte{}
binary.BigEndian.PutUint64(items[:], num)
Expand All @@ -83,25 +77,17 @@ func CheckPointsOf(task string) string {
}

// GlobalCheckpointOf returns the path to the "global" checkpoint of some task.
// Normally it would be <prefix>/checkpoint/<task-name>/central_globa.
func GlobalCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, taskCheckpointPath, task, checkpointTypeGlobal)
}

// StorageCheckpointOf get the prefix path of the `storage checkpoint status` of a task.
// Normally it would be <prefix>/storage-checkpoint/<task>.
func StorageCheckpointOf(task string) string {
return path.Join(streamKeyPrefix, storageCheckPoint, task)
}

// CheckpointOf returns the checkpoint prefix of some store.
// Normally it would be <prefix>/checkpoint/<task-name>/<store-id(binary-u64)>.
func CheckPointOf(task string, store uint64) string {
buf := bytes.NewBuffer(nil)
buf.WriteString(strings.TrimSuffix(path.Join(streamKeyPrefix, taskCheckpointPath, task), "/"))
buf.WriteRune('/')
writeUint64(buf, store)
return buf.String()
}

// Pause returns the path for pausing the task.
// Normally it would be <prefix>/pause/<task-name>.
func Pause(task string) string {
Expand Down

0 comments on commit 045afd2

Please sign in to comment.