Merge branch 'master' into fix_batch_delete_binary_args
zhaoxinyu authored Mar 3, 2023
2 parents 4f3aaef + 4f4a2ba commit 48b588d
Showing 11 changed files with 390 additions and 179 deletions.
8 changes: 0 additions & 8 deletions cdc/model/changefeed.go
@@ -437,14 +437,6 @@ func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProt
info.Config.Sink.Protocol = newProtocol
}

// HasFastFailError returns true if the error in changefeed is fast-fail
func (info *ChangeFeedInfo) HasFastFailError() bool {
if info.Error == nil {
return false
}
return cerror.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}

func (info *ChangeFeedInfo) fixMemoryQuota() {
info.Config.FixMemoryQuota()
}
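
With HasFastFailError removed from ChangeFeedInfo, callers that still need the check can perform it inline. A minimal sketch, assuming the import paths follow the package names (errors, cerror) used in the removed code:

package example

import (
	"github.com/pingcap/errors"

	"github.com/pingcap/tiflow/cdc/model"
	cerror "github.com/pingcap/tiflow/pkg/errors"
)

// isFastFail mirrors the removed ChangeFeedInfo.HasFastFailError method as a
// standalone helper; the import paths above are assumptions, not taken from this diff.
func isFastFail(info *model.ChangeFeedInfo) bool {
	if info.Error == nil {
		return false
	}
	return cerror.IsChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code))
}
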
42 changes: 0 additions & 42 deletions cdc/model/owner.go
@@ -282,45 +282,3 @@ type ProcInfoSnap struct {
CfID ChangeFeedID `json:"changefeed-id"`
CaptureID string `json:"capture-id"`
}

// TableSet maintains a set of TableID.
type TableSet struct {
memo map[TableID]struct{}
}

// NewTableSet creates a TableSet.
func NewTableSet() *TableSet {
return &TableSet{
memo: make(map[TableID]struct{}),
}
}

// Add adds a tableID to TableSet.
func (s *TableSet) Add(tableID TableID) {
s.memo[tableID] = struct{}{}
}

// Remove removes a tableID from a TableSet.
func (s *TableSet) Remove(tableID TableID) {
delete(s.memo, tableID)
}

// Keys returns a collection of TableID.
func (s *TableSet) Keys() []TableID {
result := make([]TableID, 0, len(s.memo))
for k := range s.memo {
result = append(result, k)
}
return result
}

// Contain checks whether a TableID is in TableSet.
func (s *TableSet) Contain(tableID TableID) bool {
_, ok := s.memo[tableID]
return ok
}

// Size returns the size of TableSet.
func (s *TableSet) Size() int {
return len(s.memo)
}
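
The removed TableSet only wrapped a map[TableID]struct{}, so a plain map covers the same operations. A minimal sketch with illustrative names:

package example

import "github.com/pingcap/tiflow/cdc/model"

// uniqueTableIDs deduplicates table IDs with a plain map, which covers the
// Add/Contain/Keys/Size behaviour of the removed TableSet type.
func uniqueTableIDs(ids []model.TableID) []model.TableID {
	set := make(map[model.TableID]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{} // Add
	}
	keys := make([]model.TableID, 0, len(set)) // Keys
	for id := range set {
		keys = append(keys, id)
	}
	// A map lookup replaces Contain, and len(set) plays the role of Size.
	return keys
}
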
42 changes: 0 additions & 42 deletions cdc/model/sink.go
@@ -348,48 +348,6 @@ func (r *RowChangedEvent) PrimaryKeyColumnNames() []string {
return result
}

// PrimaryKeyColumns returns the column(s) corresponding to the handle key(s)
func (r *RowChangedEvent) PrimaryKeyColumns() []*Column {
pkeyCols := make([]*Column, 0)

var cols []*Column
if r.IsDelete() {
cols = r.PreColumns
} else {
cols = r.Columns
}

for _, col := range cols {
if col != nil && (col.Flag.IsPrimaryKey()) {
pkeyCols = append(pkeyCols, col)
}
}

// It is okay not to have primary keys, so the empty array is an acceptable result
return pkeyCols
}

// HandleKeyColumns returns the column(s) corresponding to the handle key(s)
func (r *RowChangedEvent) HandleKeyColumns() []*Column {
pkeyCols := make([]*Column, 0)

var cols []*Column
if r.IsDelete() {
cols = r.PreColumns
} else {
cols = r.Columns
}

for _, col := range cols {
if col != nil && col.Flag.IsHandleKey() {
pkeyCols = append(pkeyCols, col)
}
}

// It is okay not to have handle keys, so the empty array is an acceptable result
return pkeyCols
}

// HandleKeyColInfos returns the column(s) and colInfo(s) corresponding to the handle key(s)
func (r *RowChangedEvent) HandleKeyColInfos() ([]*Column, []rowcodec.ColInfo) {
pkeyCols := make([]*Column, 0)
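
PrimaryKeyColumns and HandleKeyColumns are gone from RowChangedEvent, but the same filtering can be done at the call site with the fields and flag helpers still visible in this hunk. A minimal sketch; the helper name is illustrative:

package example

import "github.com/pingcap/tiflow/cdc/model"

// handleKeyColumns reproduces the removed RowChangedEvent.HandleKeyColumns helper
// at the call site, using only IsDelete, PreColumns/Columns and Flag.IsHandleKey.
func handleKeyColumns(r *model.RowChangedEvent) []*model.Column {
	cols := r.Columns
	if r.IsDelete() {
		cols = r.PreColumns
	}
	keys := make([]*model.Column, 0, len(cols))
	for _, col := range cols {
		if col != nil && col.Flag.IsHandleKey() {
			keys = append(keys, col)
		}
	}
	// It is okay not to have handle keys, so an empty slice is an acceptable result.
	return keys
}
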
58 changes: 0 additions & 58 deletions cdc/model/sink_test.go
@@ -102,65 +102,7 @@ func TestRowChangedEventFuncs(t *testing.T) {
},
},
}
expectedKeyCols := []*Column{
{
Name: "a",
Value: 1,
Flag: HandleKeyFlag | PrimaryKeyFlag,
},
}
require.True(t, deleteRow.IsDelete())
require.Equal(t, expectedKeyCols, deleteRow.PrimaryKeyColumns())
require.Equal(t, expectedKeyCols, deleteRow.HandleKeyColumns())

insertRow := &RowChangedEvent{
Table: &TableName{
Schema: "test",
Table: "t1",
},
Columns: []*Column{
{
Name: "a",
Value: 1,
Flag: HandleKeyFlag,
}, {
Name: "b",
Value: 2,
Flag: 0,
},
},
}
expectedPrimaryKeyCols := []*Column{}
expectedHandleKeyCols := []*Column{
{
Name: "a",
Value: 1,
Flag: HandleKeyFlag,
},
}
require.False(t, insertRow.IsDelete())
require.Equal(t, expectedPrimaryKeyCols, insertRow.PrimaryKeyColumns())
require.Equal(t, expectedHandleKeyCols, insertRow.HandleKeyColumns())

forceReplicaRow := &RowChangedEvent{
Table: &TableName{
Schema: "test",
Table: "t1",
},
Columns: []*Column{
{
Name: "a",
Value: 1,
Flag: 0,
}, {
Name: "b",
Value: 2,
Flag: 0,
},
},
}
require.Empty(t, forceReplicaRow.PrimaryKeyColumns())
require.Empty(t, forceReplicaRow.HandleKeyColumns())
}

func TestColumnValueString(t *testing.T) {
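
For completeness, a hedged test sketch for the hypothetical call-site helper drafted after the sink.go hunk above, reusing the insertRow fixture from the removed test:

package example

import (
	"testing"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/stretchr/testify/require"
)

// TestHandleKeyColumnsSketch exercises the illustrative handleKeyColumns helper;
// the row mirrors the insertRow fixture removed from TestRowChangedEventFuncs.
func TestHandleKeyColumnsSketch(t *testing.T) {
	row := &model.RowChangedEvent{
		Table: &model.TableName{Schema: "test", Table: "t1"},
		Columns: []*model.Column{
			{Name: "a", Value: 1, Flag: model.HandleKeyFlag},
			{Name: "b", Value: 2, Flag: 0},
		},
	}
	require.False(t, row.IsDelete())
	require.Equal(t, []*model.Column{row.Columns[0]}, handleKeyColumns(row))
}
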
24 changes: 12 additions & 12 deletions cdc/owner/feed_state_manager.go
@@ -358,18 +358,18 @@ func (m *feedStateManager) patchState(feedState model.FeedState) {
if info.AdminJobType != adminJobType {
info.AdminJobType = adminJobType
changed = true
}
if updateEpoch {
previous := info.Epoch
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
info.Epoch = GenerateChangefeedEpoch(ctx, m.upstream.PDClient)
changed = true
log.Info("update changefeed epoch",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Uint64("perviousEpoch", previous),
zap.Uint64("currentEpoch", info.Epoch))

if updateEpoch {
previous := info.Epoch
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
info.Epoch = GenerateChangefeedEpoch(ctx, m.upstream.PDClient)
log.Info("update changefeed epoch",
zap.String("namespace", m.state.ID.Namespace),
zap.String("changefeed", m.state.ID.ID),
zap.Uint64("perviousEpoch", previous),
zap.Uint64("currentEpoch", info.Epoch))
}
}
return info, changed, nil
})
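
Because the +/- markers and indentation are lost in this view, the hunk above is hard to read. A condensed, standalone sketch of what the patched closure appears to do after this change (the nesting is inferred, not authoritative): the epoch is regenerated only when AdminJobType actually changes, which matches the "Epoch only changes when State changes" check in the new test below.

package example

import "github.com/pingcap/tiflow/cdc/model"

// patchAdminJobAndEpoch condenses the inferred post-change logic; regenerateEpoch
// stands in for GenerateChangefeedEpoch and is an illustrative parameter.
func patchAdminJobAndEpoch(
	info *model.ChangeFeedInfo,
	adminJobType model.AdminJobType,
	updateEpoch bool,
	regenerateEpoch func() uint64,
) (changed bool) {
	if info.AdminJobType != adminJobType {
		info.AdminJobType = adminJobType
		changed = true

		if updateEpoch {
			info.Epoch = regenerateEpoch()
		}
	}
	return changed
}
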
58 changes: 58 additions & 0 deletions cdc/owner/feed_state_manager_test.go
@@ -33,9 +33,14 @@ import (

type mockPD struct {
pd.Client

getTs func() (int64, int64, error)
}

func (p *mockPD) GetTS(_ context.Context) (int64, int64, error) {
if p.getTs != nil {
return p.getTs()
}
return 1, 2, nil
}

@@ -638,3 +643,56 @@ func TestBackoffNeverStops(t *testing.T) {
tester.MustApplyPatches()
}
}

func TestUpdateChangefeedEpoch(t *testing.T) {
ctx := cdcContext.NewBackendContext4Test(true)
// Set a long backoff time
manager := newFeedStateManager4Test(time.Hour, time.Hour, 0, 1.0)
state := orchestrator.NewChangefeedReactorState(etcd.DefaultCDCClusterID,
ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
require.Nil(t, info)
return &model.ChangeFeedInfo{SinkURI: "123", Config: &config.ReplicaConfig{}}, true, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
require.Nil(t, status)
return &model.ChangeFeedStatus{}, true, nil
})

tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
require.Equal(t, state.Info.State, model.StateNormal)
require.True(t, manager.ShouldRunning())

for i := 1; i <= 30; i++ {
manager.upstream.PDClient.(*mockPD).getTs = func() (int64, int64, error) {
return int64(i), 0, nil
}
previousEpoch := state.Info.Epoch
previousState := state.Info.State
state.PatchTaskPosition(ctx.GlobalVars().CaptureInfo.ID,
func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return &model.TaskPosition{Error: &model.RunningError{
Addr: ctx.GlobalVars().CaptureInfo.AdvertiseAddr,
Code: "[CDC:ErrEtcdSessionDone]",
Message: "fake error for test",
}}, true, nil
})
tester.MustApplyPatches()
manager.Tick(state)
tester.MustApplyPatches()
require.False(t, manager.ShouldRunning())
require.Equal(t, state.Info.State, model.StateError)
require.Equal(t, state.Info.AdminJobType, model.AdminStop)
require.Equal(t, state.Status.AdminJobType, model.AdminStop)

// Epoch only changes when State changes.
if previousState == state.Info.State {
require.Equal(t, previousEpoch, state.Info.Epoch)
} else {
require.NotEqual(t, previousEpoch, state.Info.Epoch)
}
}
}
7 changes: 0 additions & 7 deletions cdc/redo/main_test.go
@@ -17,15 +17,8 @@ import (
"testing"

"github.com/pingcap/tiflow/pkg/leakutil"
"github.com/pingcap/tiflow/pkg/redo"
)

func TestMain(m *testing.M) {
originValue := redo.DefaultGCIntervalInMs
redo.DefaultGCIntervalInMs = 1
defer func() {
redo.DefaultGCIntervalInMs = originValue
}()

leakutil.SetUpLeakTest(m)
}
23 changes: 15 additions & 8 deletions cdc/redo/meta_manager.go
@@ -367,7 +367,7 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error {
if err != nil {
return errors.WrapError(errors.ErrMarshalFailed, err)
}
metaFile := m.getMetafileName()
metaFile := getMetafileName(m.captureID, m.changeFeedID, m.uuidGenerator)
if err := m.extStorage.WriteFile(ctx, metaFile, data); err != nil {
return errors.WrapError(errors.ErrExternalStorageAPI, err)
}
@@ -391,12 +391,6 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error {
return nil
}

func (m *metaManager) getMetafileName() string {
return fmt.Sprintf(redo.RedoMetaFileFormat, m.captureID,
m.changeFeedID.Namespace, m.changeFeedID.ID,
redo.RedoMetaFileType, m.uuidGenerator.NewString(), redo.MetaEXT)
}

// Cleanup removes all redo logs of this manager, it is called when changefeed is removed
// only owner should call this method.
func (m *metaManager) Cleanup(ctx context.Context) error {
@@ -438,6 +432,8 @@ func (m *metaManager) bgFlushMeta(egCtx context.Context, flushIntervalInMs int64
func (m *metaManager) bgGC(egCtx context.Context) error {
ticker := time.NewTicker(time.Duration(redo.DefaultGCIntervalInMs) * time.Millisecond)
defer ticker.Stop()

preCkpt := uint64(0)
for {
select {
case <-egCtx.Done():
@@ -447,9 +443,10 @@ func (m *metaManager) bgGC(egCtx context.Context) error {
return errors.Trace(egCtx.Err())
case <-ticker.C:
ckpt := m.metaCheckpointTs.getFlushed()
if ckpt == 0 {
if ckpt == preCkpt {
continue
}
preCkpt = ckpt
log.Debug("redo manager GC is triggered",
zap.Uint64("checkpointTs", ckpt),
zap.String("namespace", m.changeFeedID.Namespace),
@@ -467,6 +464,16 @@
}
}

func getMetafileName(
captureID model.CaptureID,
changeFeedID model.ChangeFeedID,
uuidGenerator uuid.Generator,
) string {
return fmt.Sprintf(redo.RedoMetaFileFormat, captureID,
changeFeedID.Namespace, changeFeedID.ID,
redo.RedoMetaFileType, uuidGenerator.NewString(), redo.MetaEXT)
}

func getChangefeedMatcher(changeFeedID model.ChangeFeedID) string {
if changeFeedID.Namespace == "default" {
return fmt.Sprintf("_%s_", changeFeedID.ID)
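
The bgGC change above replaces the "ckpt == 0" guard with a comparison against the previously processed checkpoint, so GC work is skipped until the flushed checkpoint actually advances. A standalone sketch of that pattern, with illustrative parameter names:

package example

import (
	"context"
	"time"
)

// pollFlushedCheckpoint runs gc on a ticker but skips rounds in which the flushed
// checkpoint has not moved since the previous run, mirroring the preCkpt guard above.
func pollFlushedCheckpoint(
	ctx context.Context,
	interval time.Duration,
	flushed func() uint64,
	gc func(uint64) error,
) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	preCkpt := uint64(0)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			ckpt := flushed()
			if ckpt == preCkpt {
				continue // nothing new since the last GC round
			}
			preCkpt = ckpt
			if err := gc(ckpt); err != nil {
				return err
			}
		}
	}
}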