diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 805d1a56729..c8330db49b2 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -589,6 +589,24 @@ func (info *ChangeFeedInfo) DownstreamType() (DownstreamType, error) { return Unknown, nil } +// NeedSendBootstrapEvent returns true if the changefeed need to send bootstrap event. +func (info *ChangeFeedInfo) NeedSendBootstrapEvent() (bool, error) { + downStreamType, err := info.DownstreamType() + if err != nil { + return false, errors.Trace(err) + } + if downStreamType != MQ { + return false, nil + } + if info.Config.Sink.Protocol == nil { + return false, nil + } + if *info.Config.Sink.Protocol == config.ProtocolSimple.String() { + return true, nil + } + return false, nil +} + func (info *ChangeFeedInfo) fixMemoryQuota() { info.Config.FixMemoryQuota() } diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index a2399e870b5..afea98aecc7 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -652,6 +652,11 @@ LOOP2: return errors.Trace(err) } + needSendBootstrapEvent, err := c.latestInfo.NeedSendBootstrapEvent() + if err != nil { + return errors.Trace(err) + } + c.ddlManager = newDDLManager( c.id, ddlStartTs, @@ -663,6 +668,7 @@ LOOP2: c.redoMetaMgr, downstreamType, util.GetOrZero(c.latestInfo.Config.BDRMode), + needSendBootstrapEvent, ) // create scheduler diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go index c466f228c7d..833010440c1 100644 --- a/cdc/owner/ddl_manager.go +++ b/cdc/owner/ddl_manager.go @@ -140,9 +140,11 @@ type ddlManager struct { sinkType model.DownstreamType ddlResolvedTs model.Ts - needBootstrap bool - errCh chan error - bootstrapState int32 + // needBootstrap is true when the downstream is kafka + // and the protocol is simple protocol. + needSendBootstrapEvent bool + errCh chan error + bootstrapState int32 } func newDDLManager( @@ -156,6 +158,7 @@ func newDDLManager( redoMetaManager redo.MetaManager, sinkType model.DownstreamType, bdrMode bool, + needSendBootstrapEvent bool, ) *ddlManager { log.Info("create ddl manager", zap.String("namaspace", changefeedID.Namespace), @@ -177,10 +180,11 @@ func newDDLManager( ddlResolvedTs: startTs, BDRMode: bdrMode, // use the passed sinkType after we support get resolvedTs from sink - sinkType: model.DB, - tableCheckpoint: make(map[model.TableName]model.Ts), - pendingDDLs: make(map[model.TableName][]*model.DDLEvent), - errCh: make(chan error, 1), + sinkType: model.DB, + tableCheckpoint: make(map[model.TableName]model.Ts), + pendingDDLs: make(map[model.TableName][]*model.DDLEvent), + errCh: make(chan error, 1), + needSendBootstrapEvent: needSendBootstrapEvent, } } @@ -198,9 +202,7 @@ func (m *ddlManager) tick( checkpointTs model.Ts, tableCheckpoint map[model.TableName]model.Ts, ) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) { - // needBootstrap is true when the downstream is kafka - // and the protocol is simple protocol. - if m.needBootstrap { + if m.needSendBootstrapEvent { ok, err := m.checkAndBootstrap(ctx) if err != nil { return nil, nil, err diff --git a/cdc/owner/ddl_manager_test.go b/cdc/owner/ddl_manager_test.go index 3255608be32..0d7eb543ab2 100644 --- a/cdc/owner/ddl_manager_test.go +++ b/cdc/owner/ddl_manager_test.go @@ -48,7 +48,7 @@ func createDDLManagerForTest(t *testing.T) *ddlManager { schema, redo.NewDisabledDDLManager(), redo.NewDisabledMetaManager(), - model.DB, false) + model.DB, false, false) return res } diff --git a/cdc/sink/util/helper.go b/cdc/sink/util/helper.go index cec952cbe5c..afa45941d26 100644 --- a/cdc/sink/util/helper.go +++ b/cdc/sink/util/helper.go @@ -51,7 +51,7 @@ func GetProtocol(protocolStr string) (config.Protocol, error) { func GetFileExtension(protocol config.Protocol) string { switch protocol { case config.ProtocolAvro, config.ProtocolCanalJSON, config.ProtocolMaxwell, - config.ProtocolOpen: + config.ProtocolOpen, config.ProtocolSimple: return ".json" case config.ProtocolCraft: return ".craft" diff --git a/pkg/sink/codec/builder/encoder_builder.go b/pkg/sink/codec/builder/encoder_builder.go index 49e58543abc..ea9ab3b118b 100644 --- a/pkg/sink/codec/builder/encoder_builder.go +++ b/pkg/sink/codec/builder/encoder_builder.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/pkg/sink/codec/csv" "github.com/pingcap/tiflow/pkg/sink/codec/maxwell" "github.com/pingcap/tiflow/pkg/sink/codec/open" + "github.com/pingcap/tiflow/pkg/sink/codec/simple" ) // NewRowEventEncoderBuilder returns an RowEventEncoderBuilder @@ -45,7 +46,8 @@ func NewRowEventEncoderBuilder( return canal.NewJSONRowEventEncoderBuilder(ctx, cfg) case config.ProtocolCraft: return craft.NewBatchEncoderBuilder(cfg), nil - + case config.ProtocolSimple: + return simple.NewBuilder(cfg), nil default: return nil, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(cfg.Protocol) } diff --git a/pkg/sink/codec/simple/encoder.go b/pkg/sink/codec/simple/encoder.go index 47cc4efea35..53bfbb371b5 100644 --- a/pkg/sink/codec/simple/encoder.go +++ b/pkg/sink/codec/simple/encoder.go @@ -25,20 +25,31 @@ import ( ) //nolint:unused -type encoder struct{} +type encoder struct { + cfg *common.Config +} -type builder struct{} +type builder struct { + cfg *common.Config +} // NewBuilder returns a new builder -func NewBuilder() *builder { - return &builder{} +func NewBuilder(cfg *common.Config) *builder { + return &builder{ + cfg: cfg, + } } // Build implement the RowEventEncoderBuilder interface func (b *builder) Build() codec.RowEventEncoder { - return &encoder{} + return &encoder{ + cfg: b.cfg, + } } +// CleanMetrics implement the RowEventEncoderBuilder interface +func (b *builder) CleanMetrics() {} + // AppendRowChangedEvent implement the RowEventEncoder interface // //nolint:unused diff --git a/pkg/sink/codec/simple/encoder_test.go b/pkg/sink/codec/simple/encoder_test.go index 081a087e903..ce01f806f87 100644 --- a/pkg/sink/codec/simple/encoder_test.go +++ b/pkg/sink/codec/simple/encoder_test.go @@ -18,13 +18,15 @@ import ( "github.com/pingcap/tiflow/cdc/entry" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/stretchr/testify/require" ) func TestEncodeCheckpoint(t *testing.T) { t.Parallel() - enc := NewBuilder().Build() + cfg := &common.Config{} + enc := NewBuilder(cfg).Build() checkpoint := 23 m, err := enc.EncodeCheckpointEvent(uint64(checkpoint)) @@ -52,7 +54,8 @@ func TestEncodeDDLEvent(t *testing.T) { age int, email varchar(255) not null, key idx_name(name), key idx_name_email(name, email))` job := helper.DDL2Job(sql) tableInfo := model.WrapTableInfo(1, "test", 1, job.BinlogInfo.TableInfo) - enc := NewBuilder().Build() + cfg := &common.Config{} + enc := NewBuilder(cfg).Build() ddlEvent := &model.DDLEvent{ StartTs: 1, CommitTs: 2, @@ -92,7 +95,8 @@ func TestEncodeBootstrapEvent(t *testing.T) { age int, email varchar(255) not null, key idx_name(name), key idx_name_email(name, email))` job := helper.DDL2Job(sql) tableInfo := model.WrapTableInfo(1, "test", 1, job.BinlogInfo.TableInfo) - enc := NewBuilder().Build() + cfg := &common.Config{} + enc := NewBuilder(cfg).Build() ddlEvent := &model.DDLEvent{ StartTs: 1, CommitTs: 2,