Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo(ticdc): use uuid in s3 log file to avoid name conflict #5595

Merged
merged 5 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions cdc/redo/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
)

const (
// RedoLogFileFormatV1 is the redo log file name layout that was used before
// v6.1.0. It does not contain namespace information.
// layout: captureID_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
// The last two verbs ("%s%s") are the uuid immediately followed by the file
// extension; there is no "_" separator between them.
// NOTE(review): the format has no literal ".", so the extension argument
// presumably carries its own leading dot (e.g. LogEXT) — confirm.
RedoLogFileFormatV1 = "%s_%s_%s_%d_%s%s"
// RedoLogFileFormatV2 is the redo log file name layout available since v6.1.0.
// It is the V1 layout with a namespace field inserted after the capture ID.
// layout: captureID_namespace_changefeedID_fileType_maxEventCommitTs_uuid.fileExtName
RedoLogFileFormatV2 = "%s_%s_%s_%s_%d_%s%s"
)

// InitS3storage init a storage used for s3,
// s3URI should be like s3URI="s3://logbucket/test-changefeed?endpoint=http://$S3_ENDPOINT/"
var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStorage, error) {
Expand Down Expand Up @@ -57,6 +66,13 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor
return s3storage, nil
}

// logFormat2ParseFormat derives an Sscanf-compatible pattern from a redo log
// file name format. Every "_" field separator is replaced by a single space so
// that each verb scans one whitespace-delimited token, and one trailing "%s"
// (the file extension verb) is dropped, because the extension is parsed
// separately before the remaining fields are scanned.
func logFormat2ParseFormat(fmtStr string) string {
	// extVerb is the verb that stands for the file name extension at the very
	// end of the log file name formats.
	const extVerb = "%s"

	spaceSeparated := strings.ReplaceAll(fmtStr, "_", " ")
	return strings.TrimSuffix(spaceSeparated, extVerb)
}

// ParseLogFileName extracts the commitTs and fileType from a log file name
func ParseLogFileName(name string) (uint64, string, error) {
ext := filepath.Ext(name)
Expand All @@ -67,7 +83,7 @@ func ParseLogFileName(name string) (uint64, string, error) {
// if .sort, the name should be like
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), LogEXT)+SortLogEXT
// w.cfg.fileType, w.commitTS.Load(), uuid, LogEXT)+SortLogEXT
if ext == SortLogEXT {
name = strings.TrimSuffix(name, SortLogEXT)
ext = filepath.Ext(name)
Expand All @@ -76,31 +92,28 @@ func ParseLogFileName(name string) (uint64, string, error) {
return 0, "", nil
}

var commitTs, d1 uint64
var s1, namespace, s2, fileType string
var commitTs uint64
var s1, namespace, s2, fileType, uid string
// if the namespace is not default, the log looks like:
// fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.Namespace,w.cfg.changeFeedID.ID,
// w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
// otherwise it looks like:
// fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID,
// w.cfg.changeFeedID.ID,
// w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
// w.cfg.fileType, w.commitTS.Load(), uuid, redo.LogEXT)
var (
vars []any
formatStr string
)
if len(strings.Split(name, "_")) == 6 {
formatStr = "%s %s %s %d %s %d" + LogEXT
vars = []any{&s1, &namespace, &s2, &d1, &fileType, &commitTs}
formatStr = logFormat2ParseFormat(RedoLogFileFormatV2)
vars = []any{&s1, &namespace, &s2, &fileType, &commitTs, &uid}
} else {
formatStr = "%s %s %d %s %d" + LogEXT
vars = []any{&s1, &s2, &d1, &fileType, &commitTs}
formatStr = logFormat2ParseFormat(RedoLogFileFormatV1)
vars = []any{&s1, &s2, &fileType, &commitTs, &uid}
}
name = strings.ReplaceAll(name, "_", " ")
if ext == TmpEXT {
formatStr += TmpEXT
}
_, err := fmt.Sscanf(name, formatStr, vars...)
if err != nil {
return 0, "", errors.Annotatef(err, "bad log name: %s", name)
Expand Down
49 changes: 24 additions & 25 deletions cdc/redo/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package common
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestParseLogFileName(t *testing.T) {
type arg struct {
name string
}
// the log looks like: fmt.Sprintf("%s_%s_%d_%s_%d%s", w.cfg.captureID, w.cfg.changeFeedID, w.cfg.createTime.Unix(), w.cfg.fileType, w.commitTS.Load(), redo.LogEXT)
tests := []struct {
name string
args arg
Expand All @@ -36,99 +35,99 @@ func TestParseLogFileName(t *testing.T) {
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT),
DefaultRowLogFileType, 1, uuid.NewString(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .log",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT),
DefaultRowLogFileType, 1, uuid.NewString(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT,
DefaultRowLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy row .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultRowLogFileType, 1, LogEXT) + TmpEXT,
DefaultRowLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultRowLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT),
DefaultDDLLogFileType, 1, uuid.NewString(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .log",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT),
DefaultDDLLogFileType, 1, uuid.NewString(), LogEXT),
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"default", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT,
DefaultDDLLogFileType, 1, uuid.NewString(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .sort",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + SortLogEXT,
DefaultDDLLogFileType, 1, uuid.NewString(), LogEXT) + SortLogEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV1, "cp",
"test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
DefaultDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
},
{
name: "happy ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
name: fmt.Sprintf(RedoLogFileFormatV2, "cp",
"namespace", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
DefaultDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
},
wantTs: 1,
wantFileType: DefaultDDLLogFileType,
Expand All @@ -150,9 +149,9 @@ func TestParseLogFileName(t *testing.T) {
{
name: "err wrong format ddl .tmp",
args: arg{
name: fmt.Sprintf("%s_%s_%s_%d_%s%d%s", "cp",
"default", "test",
time.Now().Unix(), DefaultDDLLogFileType, 1, LogEXT) + TmpEXT,
name: fmt.Sprintf("%s_%s_%s_%s_%d%s%s", /* a wrong format */
"cp", "default", "test",
DefaultDDLLogFileType, 1, uuid.NewString(), LogEXT) + TmpEXT,
},
wantErr: ".*bad log name*.",
},
Expand Down
34 changes: 19 additions & 15 deletions cdc/redo/reader/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/redo/common"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/pkg/leakutil"
"github.com/pingcap/tiflow/pkg/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -62,7 +63,10 @@ func TestReaderRead(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

w, err := writer.NewWriter(ctx, cfg)
uuidGen := uuid.NewConstGenerator("const-uuid")
w, err := writer.NewWriter(ctx, cfg,
writer.WithUUIDGenerator(func() uuid.Generator { return uuidGen }),
)
require.Nil(t, err)
log := &model.RedoLog{
RedoRow: &model.RedoRowChangedEvent{Row: &model.RowChangedEvent{CommitTs: 1123}},
Expand All @@ -75,9 +79,9 @@ func TestReaderRead(t *testing.T) {
err = w.Close()
require.Nil(t, err)
require.True(t, !w.IsRunning())
fileName := fmt.Sprintf("%s_%s_%d_%s_%d%s", cfg.CaptureID,
fileName := fmt.Sprintf(common.RedoLogFileFormatV1, cfg.CaptureID,
cfg.ChangeFeedID.ID,
cfg.CreateTime.Unix(), cfg.FileType, 11, common.LogEXT)
cfg.FileType, 11, uuidGen.NewString(), common.LogEXT)
path := filepath.Join(cfg.Dir, fileName)
info, err := os.Stat(path)
require.Nil(t, err)
Expand Down Expand Up @@ -110,9 +114,10 @@ func TestReaderOpenSelectedFiles(t *testing.T) {
MaxLogSize: 100000,
Dir: dir,
}
fileName := fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf",
time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+common.TmpEXT)
uuidGen := uuid.NewGenerator()
fileName := fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf", common.DefaultDDLLogFileType, 11,
uuidGen.NewString(), common.LogEXT+common.TmpEXT)
w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand All @@ -138,27 +143,26 @@ func TestReaderOpenSelectedFiles(t *testing.T) {
require.Nil(t, err)

// no data, will not open
fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf11",
time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT)
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf11", common.DefaultDDLLogFileType, 10,
uuidGen.NewString(), common.LogEXT)
path = filepath.Join(dir, fileName)
_, err = os.Create(path)
require.Nil(t, err)

// SortLogEXT, will open
fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf111",
time.Now().Unix(), common.DefaultDDLLogFileType, 10, common.LogEXT) + common.SortLogEXT
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp", "default",
"test-cf111", common.DefaultDDLLogFileType, 10, uuidGen.NewString(),
common.LogEXT) + common.SortLogEXT
path = filepath.Join(dir, fileName)
f1, err := os.Create(path)
require.Nil(t, err)

dir1, err := ioutil.TempDir("", "redo-openSelectedFiles1")
require.Nil(t, err)
defer os.RemoveAll(dir1) //nolint:errcheck
fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
"default", "test-cf",
time.Now().Unix(), common.DefaultDDLLogFileType, 11, common.LogEXT+"test")
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp", "default", "test-cf",
common.DefaultDDLLogFileType, 11, uuidGen.NewString(), common.LogEXT+"test")
path = filepath.Join(dir1, fileName)
_, err = os.Create(path)
require.Nil(t, err)
Expand Down
9 changes: 5 additions & 4 deletions cdc/redo/reader/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
"github.com/pingcap/errors"
mockstorage "github.com/pingcap/tidb/br/pkg/mock/storage"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -84,9 +85,9 @@ func TestLogReaderResetReader(t *testing.T) {
MaxLogSize: 100000,
Dir: dir,
}
fileName := fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
fileName := fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf100",
time.Now().Unix(), common.DefaultDDLLogFileType, 100, common.LogEXT)
common.DefaultDDLLogFileType, 100, uuid.NewString(), common.LogEXT)
w, err := writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand All @@ -105,9 +106,9 @@ func TestLogReaderResetReader(t *testing.T) {
f, err := os.Open(path)
require.Nil(t, err)

fileName = fmt.Sprintf("%s_%s_%s_%d_%s_%d%s", "cp",
fileName = fmt.Sprintf(common.RedoLogFileFormatV2, "cp",
"default", "test-cf10",
time.Now().Unix(), common.DefaultRowLogFileType, 10, common.LogEXT)
common.DefaultRowLogFileType, 10, uuid.NewString(), common.LogEXT)
w, err = writer.NewWriter(ctx, cfg, writer.WithLogFileName(func() string {
return fileName
}))
Expand Down
Loading