redo(ticdc): use memory backend in writer #8107

Merged: 7 commits, Mar 2, 2023
7 changes: 6 additions & 1 deletion cdc/api/v2/model.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/security"
 )

@@ -229,6 +230,7 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
 			MaxLogSize:        c.Consistent.MaxLogSize,
 			FlushIntervalInMs: c.Consistent.FlushIntervalInMs,
 			Storage:           c.Consistent.Storage,
+			UseFileBackend:    c.Consistent.UseFileBackend,
 		}
 	}
 	if c.Sink != nil {
@@ -387,6 +389,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
 			MaxLogSize:        cloned.Consistent.MaxLogSize,
 			FlushIntervalInMs: cloned.Consistent.FlushIntervalInMs,
 			Storage:           cloned.Consistent.Storage,
+			UseFileBackend:    cloned.Consistent.UseFileBackend,
 		}
 	}
 	if cloned.Mounter != nil {
@@ -419,8 +422,9 @@ func GetDefaultReplicaConfig() *ReplicaConfig {
 		Consistent: &ConsistentConfig{
 			Level:             "none",
 			MaxLogSize:        64,
-			FlushIntervalInMs: config.DefaultFlushIntervalInMs,
+			FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
 			Storage:           "",
+			UseFileBackend:    true,
 		},
 		Scheduler: &ChangefeedSchedulerConfig{
 			EnableTableAcrossNodes: config.GetDefaultReplicaConfig().
@@ -569,6 +573,7 @@ type ConsistentConfig struct {
 	MaxLogSize        int64  `json:"max_log_size"`
 	FlushIntervalInMs int64  `json:"flush_interval"`
 	Storage           string `json:"storage"`
+	UseFileBackend    bool   `json:"use_file_backend"`
 }

 // ChangefeedSchedulerConfig is per changefeed scheduler settings.
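The net effect on the v2 API surface: ConsistentConfig gains a use_file_backend knob that defaults to true, so the file backend stays the default and the memory backend is opt-in. A minimal sketch of how an API client might flip it (illustrative, not code from this PR; the storage URI is a placeholder):

package main

import (
	"encoding/json"
	"fmt"

	v2 "github.com/pingcap/tiflow/cdc/api/v2"
)

func main() {
	// GetDefaultReplicaConfig (above) keeps UseFileBackend: true, so
	// existing changefeeds are unaffected unless they opt out.
	cfg := v2.GetDefaultReplicaConfig()
	cfg.Consistent.Level = "eventual"
	cfg.Consistent.Storage = "s3://bucket/redo" // placeholder URI
	cfg.Consistent.UseFileBackend = false       // opt in to the memory backend

	out, _ := json.MarshalIndent(cfg.Consistent, "", "  ")
	fmt.Println(string(out)) // note the new "use_file_backend" key
}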
41 changes: 35 additions & 6 deletions cdc/model/sink.go
@@ -241,13 +241,48 @@ type RedoLog struct {
 	Type    RedoLogType         `msg:"type"`
 }

+// GetCommitTs returns the commit ts of the redo log.
+func (r *RedoLog) GetCommitTs() Ts {
+	switch r.Type {
+	case RedoLogTypeRow:
+		return r.RedoRow.Row.CommitTs
+	case RedoLogTypeDDL:
+		return r.RedoDDL.DDL.CommitTs
+	default:
+		log.Panic("invalid redo log type", zap.Any("type", r.Type))
+	}
+	return 0
+}
+
 // RedoRowChangedEvent represents the DML event used in RedoLog
 type RedoRowChangedEvent struct {
 	Row        *RowChangedEvent `msg:"row"`
 	Columns    []RedoColumn     `msg:"columns"`
 	PreColumns []RedoColumn     `msg:"pre-columns"`
 }

+// RedoDDLEvent represents DDL event used in redo log persistent
+type RedoDDLEvent struct {
+	DDL  *DDLEvent `msg:"ddl"`
+	Type byte      `msg:"type"`
+}
+
+// ToRedoLog converts row changed event to redo log
+func (row *RowChangedEvent) ToRedoLog() *RedoLog {
+	return &RedoLog{
+		RedoRow: RedoRowChangedEvent{Row: row},
+		Type:    RedoLogTypeRow,
+	}
+}
+
+// ToRedoLog converts ddl event to redo log
+func (ddl *DDLEvent) ToRedoLog() *RedoLog {
+	return &RedoLog{
+		RedoDDL: RedoDDLEvent{DDL: ddl},
+		Type:    RedoLogTypeDDL,
+	}
+}
+
 // RowChangedEvent represents a row changed event
 type RowChangedEvent struct {
 	StartTs uint64 `json:"start-ts" msg:"start-ts"`
@@ -598,12 +633,6 @@ type DDLEvent struct {
 	Done bool `msg:"-"`
 }

-// RedoDDLEvent represents DDL event used in redo log persistent
-type RedoDDLEvent struct {
-	DDL  *DDLEvent `msg:"ddl"`
-	Type byte      `msg:"type"`
-}
-
 // FromJob fills the values with DDLEvent from DDL job
 func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo, tableInfo *TableInfo) {
 	// populating DDLEvent of an `rename tables` job is handled in `FromRenameTablesJob()`
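With the ToRedoLog helpers and GetCommitTs in place, row and DDL events share one entry point into the redo writer. A small usage sketch (illustrative, not from the PR; most event fields elided):

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

func main() {
	// Both event kinds convert to a model.RedoLog, and GetCommitTs
	// dispatches on the log type, so writer code can order mixed
	// row/DDL entries by commit ts without per-type switches.
	row := &model.RowChangedEvent{StartTs: 100, CommitTs: 105} // other fields elided
	ddl := &model.DDLEvent{CommitTs: 110}                      // other fields elided

	logs := []*model.RedoLog{row.ToRedoLog(), ddl.ToRedoLog()}
	for _, l := range logs {
		fmt.Println(l.GetCommitTs()) // prints 105, then 110
	}
}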
62 changes: 40 additions & 22 deletions cdc/owner/changefeed.go
@@ -79,7 +79,8 @@ type changefeed struct {
 	// and will be destroyed when a changefeed is closed.
 	barriers         *barriers
 	feedStateManager *feedStateManager
-	redoManager      redo.LogManager
+	redoDDLMgr       redo.DDLManager
+	redoMetaMgr      redo.MetaManager

 	schema *schemaWrap4Owner
 	sink   DDLSink
@@ -108,9 +109,9 @@ type changefeed struct {
 	// cancel the running goroutine start by `DDLPuller`
 	cancel context.CancelFunc

-	// The changefeed will start a backend goroutine in the function `initialize` for DDLPuller
-	// `ddlWg` is used to manage this backend goroutine.
-	ddlWg sync.WaitGroup
+	// The changefeed will start a backend goroutine in the function `initialize`
+	// for DDLPuller and redo manager. `wg` is used to manage this backend goroutine.
+	wg sync.WaitGroup

 	metricsChangefeedCheckpointTsGauge    prometheus.Gauge
 	metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
@@ -370,13 +371,13 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
 		newCheckpointTs = barrierTs
 	}
 	prevResolvedTs := c.state.Status.ResolvedTs
-	if c.redoManager.Enabled() {
-		var flushedCheckpointTs, flushedResolvedTs model.Ts
+	if c.redoMetaMgr.Enabled() {
 		// newResolvedTs can never exceed the barrier timestamp boundary. If redo is enabled,
 		// we can only upload it to etcd after it has been flushed into redo meta.
 		// NOTE: `UpdateMeta` handles regressed checkpointTs and resolvedTs internally.
-		c.redoManager.UpdateMeta(newCheckpointTs, newResolvedTs)
-		c.redoManager.GetFlushedMeta(&flushedCheckpointTs, &flushedResolvedTs)
+		c.redoMetaMgr.UpdateMeta(newCheckpointTs, newResolvedTs)
+		flushedMeta := c.redoMetaMgr.GetFlushedMeta()
+		flushedCheckpointTs, flushedResolvedTs := flushedMeta.CheckpointTs, flushedMeta.ResolvedTs
 		log.Debug("owner gets flushed meta",
 			zap.Uint64("flushedResolvedTs", flushedResolvedTs),
 			zap.Uint64("flushedCheckpointTs", flushedCheckpointTs),
@@ -546,9 +547,9 @@ LOOP:
 		return errors.Trace(err)
 	}

-	c.ddlWg.Add(1)
+	c.wg.Add(1)
 	go func() {
-		defer c.ddlWg.Done()
+		defer c.wg.Done()
 		ctx.Throw(c.ddlPuller.Run(cancelCtx))
 	}()

@@ -560,15 +561,33 @@
 	c.observerLastTick = atomic.NewTime(time.Time{})

 	stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
-	redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh)
-	mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
-	c.redoManager = mgr
+	c.redoDDLMgr, err = redo.NewDDLManager(stdCtx, c.state.Info.Config.Consistent, ddlStartTs)
 	failpoint.Inject("ChangefeedNewRedoManagerError", func() {
 		err = errors.New("changefeed new redo manager injected error")
 	})
 	if err != nil {
 		return err
 	}
+	if c.redoDDLMgr.Enabled() {
+		c.wg.Add(1)
+		go func() {
+			defer c.wg.Done()
+			ctx.Throw(c.redoDDLMgr.Run(stdCtx))
+		}()
+	}
+
+	c.redoMetaMgr, err = redo.NewMetaManagerWithInit(stdCtx,
+		c.state.Info.Config.Consistent, checkpointTs)
+	if err != nil {
+		return err
+	}
+	if c.redoMetaMgr.Enabled() {
+		c.wg.Add(1)
+		go func() {
+			defer c.wg.Done()
+			ctx.Throw(c.redoMetaMgr.Run(stdCtx))
+		}()
+	}
 	log.Info("owner creates redo manager",
 		zap.String("namespace", c.id.Namespace),
 		zap.String("changefeed", c.id.ID))
@@ -634,7 +653,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
 	if c.ddlPuller != nil {
 		c.ddlPuller.Close()
 	}
-	c.ddlWg.Wait()
+	c.wg.Wait()

 	if c.sink != nil {
 		canceledCtx, cancel := context.WithCancel(context.Background())
@@ -695,7 +714,7 @@ func (c *changefeed) cleanupMetrics() {
 	c.metricsChangefeedBarrierTsGauge = nil
 }

-// redoManagerCleanup cleanups redo logs if changefeed is removed and redo log is enabled
+// cleanup redo logs if changefeed is removed and redo log is enabled
 func (c *changefeed) cleanupRedoManager(ctx context.Context) {
 	if c.isRemoved {
 		if c.state == nil || c.state.Info == nil || c.state.Info.Config == nil ||
@@ -707,19 +726,18 @@ func (c *changefeed) cleanupRedoManager(ctx context.Context) {
 			return
 		}
 		// when removing a paused changefeed, the redo manager is nil, create a new one
-		if c.redoManager == nil {
-			redoManagerOpts := redo.NewManagerOptionsForClean()
-			redoManager, err := redo.NewManager(ctx, c.state.Info.Config.Consistent, redoManagerOpts)
+		if c.redoMetaMgr == nil {
+			redoMetaMgr, err := redo.NewMetaManager(ctx, c.state.Info.Config.Consistent)
 			if err != nil {
 				log.Info("owner creates redo manager for clean fail",
 					zap.String("namespace", c.id.Namespace),
 					zap.String("changefeed", c.id.ID),
 					zap.Error(err))
 				return
 			}
-			c.redoManager = redoManager
+			c.redoMetaMgr = redoMetaMgr
 		}
-		err := c.redoManager.Cleanup(ctx)
+		err := c.redoMetaMgr.Cleanup(ctx)
 		if err != nil {
 			log.Error("cleanup redo logs failed", zap.String("changefeed", c.id.ID), zap.Error(err))
 		}
@@ -906,12 +924,12 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
 		if err != nil {
 			return false, errors.Trace(err)
 		}
-		if c.redoManager.Enabled() {
+		if c.redoDDLMgr.Enabled() {
 			for _, ddlEvent := range c.ddlEventCache {
 				// FIXME: seems it's not necessary to emit DDL to redo storage,
 				// because for a given redo meta with range (checkpointTs, resolvedTs],
 				// there must be no pending DDLs not flushed into DDL sink.
-				err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
+				err = c.redoDDLMgr.EmitDDLEvent(ctx, ddlEvent)
 				if err != nil {
 					return false, err
 				}
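The owner now runs up to three background goroutines (DDL puller, redo DDL manager, redo meta manager) under one WaitGroup, with a single wg.Wait() in releaseResources. The lifecycle pattern, reduced to a runnable skeleton (a sketch, not the PR's code):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runManaged mirrors the shape used above: Add before the goroutine
// starts, Done via defer, and the terminal error surfaced through a
// throw callback (ctx.Throw in the changefeed code).
func runManaged(ctx context.Context, wg *sync.WaitGroup, throw func(error), run func(context.Context) error) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		throw(run(ctx))
	}()
}

func main() {
	var wg sync.WaitGroup
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	throw := func(err error) { fmt.Println("task exited:", err) }
	task := func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() }

	runManaged(ctx, &wg, throw, task) // e.g. the DDL puller
	runManaged(ctx, &wg, throw, task) // e.g. a redo manager
	wg.Wait()                         // one Wait for all managed goroutines
}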
23 changes: 16 additions & 7 deletions cdc/owner/changefeed_test.go
@@ -35,6 +35,7 @@ import (
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	"github.com/pingcap/tiflow/pkg/orchestrator"
+	"github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/sink/observer"
 	"github.com/pingcap/tiflow/pkg/txnutil/gc"
 	"github.com/pingcap/tiflow/pkg/upstream"
@@ -501,7 +502,7 @@ func TestRemoveChangefeed(t *testing.T) {
 	info.Config.Consistent = &config.ConsistentConfig{
 		Level:             "eventual",
 		Storage:           filepath.Join("nfs://", dir),
-		FlushIntervalInMs: config.DefaultFlushIntervalInMs,
+		FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
 	}
 	ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
 		ID: ctx.ChangefeedVars().ID,
@@ -517,8 +518,9 @@ func TestRemovePausedChangefeed(t *testing.T) {
 	info.State = model.StateStopped
 	dir := t.TempDir()
 	info.Config.Consistent = &config.ConsistentConfig{
-		Level:   "eventual",
-		Storage: filepath.Join("nfs://", dir),
+		Level:             "eventual",
+		Storage:           filepath.Join("nfs://", dir),
+		FlushIntervalInMs: redo.DefaultFlushIntervalInMs,
 	}
 	ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
 		ID: ctx.ChangefeedVars().ID,
@@ -555,10 +557,17 @@ func testChangefeedReleaseResource(
 	err := cf.tick(ctx, captures)
 	require.Nil(t, err)
 	cancel()
-	// check redo log dir is deleted
-	_, err = os.Stat(redoLogDir)
-	log.Error(err)
-	require.True(t, os.IsNotExist(err))
+
+	if cf.state.Info.Config.Consistent.UseFileBackend {
+		// check redo log dir is deleted
+		_, err = os.Stat(redoLogDir)
+		log.Error(err)
+		require.True(t, os.IsNotExist(err))
+	} else {
+		files, err := os.ReadDir(redoLogDir)
+		require.NoError(t, err)
+		require.Len(t, files, 1) // only delete mark
+	}
 }

 func TestExecRenameTablesDDL(t *testing.T) {
4 changes: 2 additions & 2 deletions cdc/owner/ddl_manager.go
@@ -56,7 +56,7 @@ type ddlManager struct {
 	// schema store multiple version of schema, it is used by scheduler
 	schema *schemaWrap4Owner
 	// redoManager is used to send DDL events to redo log and get redo resolvedTs.
-	redoManager redo.LogManager
+	redoManager redo.DDLManager
 	// ddlSink is used to ddlSink DDL events to the downstream
 	ddlSink DDLSink
 	// tableCheckpoint store the tableCheckpoint of each table. We need to wait
@@ -89,7 +89,7 @@ func newDDLManager(
 	ddlSink DDLSink,
 	ddlPuller puller.DDLPuller,
 	schema *schemaWrap4Owner,
-	redoManager redo.LogManager,
+	redoManager redo.DDLManager,
 	sinkType model.DownStreamType,
 	bdrMode bool,
 ) *ddlManager {
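For orientation, the manager surface implied by the call sites in this PR looks roughly as follows. This is inferred from usage only, not copied from pkg/redo, so the real interfaces may carry more methods and different signatures:

package redosketch

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
)

// LogMeta is assumed to expose the two fields read in tick() above; the
// real flushed-meta type lives in pkg/redo and may differ.
type LogMeta struct {
	CheckpointTs model.Ts
	ResolvedTs   model.Ts
}

// DDLManager matches the owner-side calls: Enabled, Run, EmitDDLEvent.
type DDLManager interface {
	Enabled() bool
	Run(ctx context.Context) error
	EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
}

// MetaManager matches UpdateMeta/GetFlushedMeta/Cleanup as used above.
type MetaManager interface {
	Enabled() bool
	Run(ctx context.Context) error
	UpdateMeta(checkpointTs, resolvedTs model.Ts)
	GetFlushedMeta() LogMeta
	Cleanup(ctx context.Context) error
}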