Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redo(ticdc): fix redo initialization block the owner (#9887) #9993

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 9 additions & 22 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
default:
}

if c.redoMetaMgr.Enabled() {
if !c.redoMetaMgr.Running() {
return nil
}
}

// TODO: pass table checkpointTs when we support concurrent process ddl
allPhysicalTables, barrier, err := c.ddlManager.tick(ctx, preCheckpointTs, nil)
if err != nil {
Expand Down Expand Up @@ -582,13 +588,7 @@ LOOP2:
}
c.observerLastTick = atomic.NewTime(time.Time{})

c.redoDDLMgr, err = redo.NewDDLManager(cancelCtx, c.id, c.state.Info.Config.Consistent, ddlStartTs)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
c.redoDDLMgr = redo.NewDDLManager(c.id, c.state.Info.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand All @@ -597,12 +597,7 @@ LOOP2:
}()
}

c.redoMetaMgr, err = redo.NewMetaManagerWithInit(cancelCtx,
c.id,
c.state.Info.Config.Consistent, checkpointTs)
if err != nil {
return err
}
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand Down Expand Up @@ -771,15 +766,7 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
}
// when removing a paused changefeed, the redo manager is nil, create a new one
if c.redoMetaMgr == nil {
redoMetaMgr, err := redo.NewMetaManager(ctx, c.id, c.state.Info.Config.Consistent)
if err != nil {
log.Info("owner creates redo manager for clean fail",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
return
}
c.redoMetaMgr = redoMetaMgr
c.redoMetaMgr = redo.NewMetaManager(c.id, c.state.Info.Config.Consistent, 0)
}
err := c.redoMetaMgr.Cleanup(ctx)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,10 +605,7 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
}
p.latestInfo.Config.Sink.TiDBSourceID = sourceID

p.redo.r, err = redo.NewDMLManager(prcCtx, p.changefeedID, p.latestInfo.Config.Consistent)
if err != nil {
return err
}
p.redo.r = redo.NewDMLManager(p.changefeedID, p.latestInfo.Config.Consistent)
p.redo.name = "RedoManager"
p.redo.changefeedID = p.changefeedID
p.redo.spawn(prcCtx)
Expand Down
3 changes: 1 addition & 2 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,13 @@ func newProcessor4Test(
} else {
tmpDir := t.TempDir()
redoDir := fmt.Sprintf("%s/%s", tmpDir, changefeedID)
dmlMgr, err := redo.NewDMLManager(ctx, changefeedID, &config.ConsistentConfig{
dmlMgr := redo.NewDMLManager(changefeedID, &config.ConsistentConfig{
Level: string(redoPkg.ConsistentLevelEventual),
MaxLogSize: redoPkg.DefaultMaxLogSize,
FlushIntervalInMs: redoPkg.DefaultFlushIntervalInMs,
Storage: "file://" + redoDir,
UseFileBackend: false,
})
require.NoError(t, err)
p.redo.r = dmlMgr
}
p.redo.name = "RedoManager"
Expand Down
93 changes: 46 additions & 47 deletions cdc/redo/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/common"
Expand Down Expand Up @@ -65,20 +65,17 @@ func NewDisabledDDLManager() *ddlManager {

// NewDDLManager creates a new ddl Manager.
func NewDDLManager(
ctx context.Context, changefeedID model.ChangeFeedID,
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, ddlStartTs model.Ts,
) (*ddlManager, error) {
logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoDDLLogFileType)
if err != nil {
return nil, err
}
) *ddlManager {
m := newLogManager(changefeedID, cfg, redo.RedoDDLLogFileType)
span := spanz.TableIDToComparableSpan(0)
logManager.AddTable(span, ddlStartTs)
m.AddTable(span, ddlStartTs)
return &ddlManager{
logManager: logManager,
// The current fakeSpan is meaningless, find a meaningful sapn in the future.
logManager: m,
// The current fakeSpan is meaningless, find a meaningful span in the future.
fakeSpan: span,
}, nil
}
}

type ddlManager struct {
Expand Down Expand Up @@ -115,14 +112,12 @@ type DMLManager interface {
}

// NewDMLManager creates a new dml Manager.
func NewDMLManager(ctx context.Context, changefeedID model.ChangeFeedID,
func NewDMLManager(changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig,
) (*dmlManager, error) {
logManager, err := newLogManager(ctx, changefeedID, cfg, redo.RedoRowLogFileType)
if err != nil {
return nil, err
) *dmlManager {
return &dmlManager{
logManager: newLogManager(changefeedID, cfg, redo.RedoRowLogFileType),
}
return &dmlManager{logManager: logManager}, nil
}

// NewDisabledDMLManager creates a disabled dml Manager.
Expand Down Expand Up @@ -228,29 +223,22 @@ type logManager struct {
}

func newLogManager(
ctx context.Context,
changefeedID model.ChangeFeedID,
cfg *config.ConsistentConfig, logType string,
) (*logManager, error) {
) *logManager {
// return a disabled Manager if no consistent config or normal consistent level
if cfg == nil || !redo.IsConsistentEnabled(cfg.Level) {
return &logManager{enabled: false}, nil
return &logManager{enabled: false}
}

uri, err := storage.ParseRawURL(cfg.Storage)
if err != nil {
return nil, err
}
m := &logManager{
return &logManager{
enabled: true,
cfg: &writer.LogWriterConfig{
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
URI: *uri,
UseExternalStorage: redo.IsExternalStorage(uri.Scheme),
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
ConsistentConfig: *cfg,
LogType: logType,
CaptureID: config.GetGlobalServerConfig().AdvertiseAddr,
ChangeFeedID: changefeedID,
MaxLogSizeInBytes: cfg.MaxLogSize * redo.Megabyte,
},
logBuffer: chann.NewAutoDrainChann[cacheEvents](),
rtsMap: spanz.SyncMap{},
Expand All @@ -263,21 +251,30 @@ func newLogManager(
metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

m.writer, err = factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
return nil, err
}
return m, nil
}

// Run implements pkg/util.Runnable.
func (m *logManager) Run(ctx context.Context, _ ...chan<- error) error {
if m.Enabled() {
defer m.close()
return m.bgUpdateLog(ctx)
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
failpoint.Return(errors.New("changefeed new redo manager injected error"))
})
if !m.Enabled() {
return nil
}
return nil

defer m.close()
start := time.Now()
w, err := factory.NewRedoLogWriter(ctx, m.cfg)
if err != nil {
log.Error("redo: failed to create redo log writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Duration("duration", time.Since(start)),
zap.Error(err))
return err
}
m.writer = w
return m.bgUpdateLog(ctx)
}

// WaitForReady implements pkg/util.Runnable.
Expand Down Expand Up @@ -549,11 +546,13 @@ func (m *logManager) close() {
atomic.StoreInt32(&m.closed, 1)

m.logBuffer.CloseAndDrain()
if err := m.writer.Close(); err != nil {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
if m.writer != nil {
if err := m.writer.Close(); err != nil && errors.Cause(err) != context.Canceled {
log.Error("redo manager fails to close writer",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
zap.String("changefeed", m.cfg.ChangeFeedID.ID),
zap.Error(err))
}
}
log.Info("redo manager closed",
zap.String("namespace", m.cfg.ChangeFeedID.Namespace),
Expand Down
72 changes: 29 additions & 43 deletions cdc/redo/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/redo/writer/blackhole"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
Expand Down Expand Up @@ -121,15 +120,11 @@ func TestLogManagerInProcessor(t *testing.T) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
dmlMgr.Run(ctx)
}()

dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
// check emit row changed events can move forward resolved ts
spans := []tablepb.Span{
spanz.TableIDToComparableSpan(53),
Expand Down Expand Up @@ -202,7 +197,7 @@ func TestLogManagerInProcessor(t *testing.T) {
checkResolvedTs(t, dmlMgr.logManager, flushResolvedTs)

cancel()
wg.Wait()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}

testWriteDMLs("blackhole://", true)
Expand Down Expand Up @@ -233,26 +228,24 @@ func TestLogManagerInOwner(t *testing.T) {
UseFileBackend: useFileBackend,
}
startTs := model.Ts(10)
ddlMgr, err := NewDDLManager(ctx, model.DefaultChangeFeedID("test"), cfg, startTs)
require.NoError(t, err)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ddlMgr.Run(ctx)
}()
ddlMgr := NewDDLManager(model.DefaultChangeFeedID("test"), cfg, startTs)

var eg errgroup.Group
eg.Go(func() error {
return ddlMgr.Run(ctx)
})

require.Equal(t, startTs, ddlMgr.GetResolvedTs())
ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"}
err = ddlMgr.EmitDDLEvent(ctx, ddl)
err := ddlMgr.EmitDDLEvent(ctx, ddl)
require.NoError(t, err)
require.Equal(t, startTs, ddlMgr.GetResolvedTs())

ddlMgr.UpdateResolvedTs(ctx, ddl.CommitTs)
checkResolvedTs(t, ddlMgr.logManager, ddl.CommitTs)

cancel()
wg.Wait()
require.ErrorIs(t, eg.Wait(), context.Canceled)
}

testWriteDDLs("blackhole://", true)
Expand All @@ -275,23 +268,14 @@ func TestLogManagerError(t *testing.T) {
cfg := &config.ConsistentConfig{
Level: string(redo.ConsistentLevelEventual),
MaxLogSize: redo.DefaultMaxLogSize,
Storage: "blackhole://",
Storage: "blackhole-invalid://",
FlushIntervalInMs: redo.MinFlushIntervalInMs,
}
logMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.NoError(t, err)
err = logMgr.writer.Close()
require.NoError(t, err)
logMgr.writer = blackhole.NewInvalidLogWriter(logMgr.writer)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := logMgr.Run(ctx)
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}()
logMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return logMgr.Run(ctx)
})

testCases := []struct {
span tablepb.Span
Expand All @@ -310,7 +294,10 @@ func TestLogManagerError(t *testing.T) {
err := logMgr.emitRedoEvents(ctx, tc.span, nil, tc.rows...)
require.NoError(t, err)
}
wg.Wait()

err := eg.Wait()
require.Regexp(t, ".*invalid black hole writer.*", err)
require.Regexp(t, ".*WriteLog.*", err)
}

func BenchmarkBlackhole(b *testing.B) {
Expand All @@ -336,9 +323,8 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
FlushIntervalInMs: redo.MinFlushIntervalInMs,
UseFileBackend: useFileBackend,
}
dmlMgr, err := NewDMLManager(ctx, model.DefaultChangeFeedID("test"), cfg)
require.Nil(b, err)
eg := errgroup.Group{}
dmlMgr := NewDMLManager(model.DefaultChangeFeedID("test"), cfg)
var eg errgroup.Group
eg.Go(func() error {
return dmlMgr.Run(ctx)
})
Expand Down Expand Up @@ -366,7 +352,7 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
go func(span tablepb.Span) {
defer wg.Done()
maxCommitTs := maxTsMap.GetV(span)
rows := []*model.RowChangedEvent{}
var rows []*model.RowChangedEvent
for i := 0; i < maxRowCount; i++ {
if i%100 == 0 {
// prepare new row change events
Expand Down Expand Up @@ -409,6 +395,6 @@ func runBenchTest(b *testing.B, storage string, useFileBackend bool) {
time.Sleep(time.Millisecond * 500)
}
cancel()
err = eg.Wait()
require.ErrorIs(b, err, context.Canceled)

require.ErrorIs(b, eg.Wait(), context.Canceled)
}
Loading
Loading