Skip to content

Commit

Permalink
api(ticdc): support create simple protocol changefeed (#10116)
Browse files Browse the repository at this point in the history
ref #9898, close #10108
  • Loading branch information
asddongmen authored Nov 27, 2023
1 parent d260fdd commit 13499f4
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 21 deletions.
18 changes: 18 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,24 @@ func (info *ChangeFeedInfo) DownstreamType() (DownstreamType, error) {
return Unknown, nil
}

// NeedSendBootstrapEvent returns true if the changefeed need to send bootstrap event.
func (info *ChangeFeedInfo) NeedSendBootstrapEvent() (bool, error) {
downStreamType, err := info.DownstreamType()
if err != nil {
return false, errors.Trace(err)
}
if downStreamType != MQ {
return false, nil
}
if info.Config.Sink.Protocol == nil {
return false, nil
}
if *info.Config.Sink.Protocol == config.ProtocolSimple.String() {
return true, nil
}
return false, nil
}

func (info *ChangeFeedInfo) fixMemoryQuota() {
info.Config.FixMemoryQuota()
}
Expand Down
6 changes: 6 additions & 0 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,11 @@ LOOP2:
return errors.Trace(err)
}

needSendBootstrapEvent, err := c.latestInfo.NeedSendBootstrapEvent()
if err != nil {
return errors.Trace(err)
}

c.ddlManager = newDDLManager(
c.id,
ddlStartTs,
Expand All @@ -663,6 +668,7 @@ LOOP2:
c.redoMetaMgr,
downstreamType,
util.GetOrZero(c.latestInfo.Config.BDRMode),
needSendBootstrapEvent,
)

// create scheduler
Expand Down
22 changes: 12 additions & 10 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ type ddlManager struct {
sinkType model.DownstreamType
ddlResolvedTs model.Ts

needBootstrap bool
errCh chan error
bootstrapState int32
// needBootstrap is true when the downstream is kafka
// and the protocol is simple protocol.
needSendBootstrapEvent bool
errCh chan error
bootstrapState int32
}

func newDDLManager(
Expand All @@ -156,6 +158,7 @@ func newDDLManager(
redoMetaManager redo.MetaManager,
sinkType model.DownstreamType,
bdrMode bool,
needSendBootstrapEvent bool,
) *ddlManager {
log.Info("create ddl manager",
zap.String("namaspace", changefeedID.Namespace),
Expand All @@ -177,10 +180,11 @@ func newDDLManager(
ddlResolvedTs: startTs,
BDRMode: bdrMode,
// use the passed sinkType after we support get resolvedTs from sink
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
errCh: make(chan error, 1),
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
errCh: make(chan error, 1),
needSendBootstrapEvent: needSendBootstrapEvent,
}
}

Expand All @@ -198,9 +202,7 @@ func (m *ddlManager) tick(
checkpointTs model.Ts,
tableCheckpoint map[model.TableName]model.Ts,
) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
// needBootstrap is true when the downstream is kafka
// and the protocol is simple protocol.
if m.needBootstrap {
if m.needSendBootstrapEvent {
ok, err := m.checkAndBootstrap(ctx)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/ddl_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func createDDLManagerForTest(t *testing.T) *ddlManager {
schema,
redo.NewDisabledDDLManager(),
redo.NewDisabledMetaManager(),
model.DB, false)
model.DB, false, false)
return res
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetProtocol(protocolStr string) (config.Protocol, error) {
func GetFileExtension(protocol config.Protocol) string {
switch protocol {
case config.ProtocolAvro, config.ProtocolCanalJSON, config.ProtocolMaxwell,
config.ProtocolOpen:
config.ProtocolOpen, config.ProtocolSimple:
return ".json"
case config.ProtocolCraft:
return ".craft"
Expand Down
4 changes: 3 additions & 1 deletion pkg/sink/codec/builder/encoder_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/sink/codec/csv"
"github.com/pingcap/tiflow/pkg/sink/codec/maxwell"
"github.com/pingcap/tiflow/pkg/sink/codec/open"
"github.com/pingcap/tiflow/pkg/sink/codec/simple"
)

// NewRowEventEncoderBuilder returns an RowEventEncoderBuilder
Expand All @@ -45,7 +46,8 @@ func NewRowEventEncoderBuilder(
return canal.NewJSONRowEventEncoderBuilder(ctx, cfg)
case config.ProtocolCraft:
return craft.NewBatchEncoderBuilder(cfg), nil

case config.ProtocolSimple:
return simple.NewBuilder(cfg), nil
default:
return nil, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(cfg.Protocol)
}
Expand Down
21 changes: 16 additions & 5 deletions pkg/sink/codec/simple/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,31 @@ import (
)

//nolint:unused
type encoder struct{}
type encoder struct {
cfg *common.Config
}

type builder struct{}
type builder struct {
cfg *common.Config
}

// NewBuilder returns a new builder
func NewBuilder() *builder {
return &builder{}
func NewBuilder(cfg *common.Config) *builder {
return &builder{
cfg: cfg,
}
}

// Build implement the RowEventEncoderBuilder interface
func (b *builder) Build() codec.RowEventEncoder {
return &encoder{}
return &encoder{
cfg: b.cfg,
}
}

// CleanMetrics implement the RowEventEncoderBuilder interface
func (b *builder) CleanMetrics() {}

// AppendRowChangedEvent implement the RowEventEncoder interface
//
//nolint:unused
Expand Down
10 changes: 7 additions & 3 deletions pkg/sink/codec/simple/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (

"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/stretchr/testify/require"
)

func TestEncodeCheckpoint(t *testing.T) {
t.Parallel()

enc := NewBuilder().Build()
cfg := &common.Config{}
enc := NewBuilder(cfg).Build()

checkpoint := 23
m, err := enc.EncodeCheckpointEvent(uint64(checkpoint))
Expand Down Expand Up @@ -52,7 +54,8 @@ func TestEncodeDDLEvent(t *testing.T) {
age int, email varchar(255) not null, key idx_name(name), key idx_name_email(name, email))`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(1, "test", 1, job.BinlogInfo.TableInfo)
enc := NewBuilder().Build()
cfg := &common.Config{}
enc := NewBuilder(cfg).Build()
ddlEvent := &model.DDLEvent{
StartTs: 1,
CommitTs: 2,
Expand Down Expand Up @@ -92,7 +95,8 @@ func TestEncodeBootstrapEvent(t *testing.T) {
age int, email varchar(255) not null, key idx_name(name), key idx_name_email(name, email))`
job := helper.DDL2Job(sql)
tableInfo := model.WrapTableInfo(1, "test", 1, job.BinlogInfo.TableInfo)
enc := NewBuilder().Build()
cfg := &common.Config{}
enc := NewBuilder(cfg).Build()
ddlEvent := &model.DDLEvent{
StartTs: 1,
CommitTs: 2,
Expand Down

0 comments on commit 13499f4

Please sign in to comment.