Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/sink: adjust kafka initialization logic (#3192) #3568

Merged
5 changes: 0 additions & 5 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
// Other functions are still synchronization
type AsyncSink interface {
Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
// EmitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
Expand Down Expand Up @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
return asyncSink, nil
}

func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return s.sink.Initialize(ctx, tableInfo)
}

func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
defer s.wg.Done()
// TODO make the tick duration configurable
Expand Down
25 changes: 4 additions & 21 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,10 @@ type asyncSinkSuite struct {

type mockSink struct {
sink.Sink
initTableInfo []*model.SimpleTableInfo
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
m.initTableInfo = tableInfo
return nil
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand Down Expand Up @@ -88,17 +82,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context,
return ctx, sink, mockSink
}

func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo)
}

func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos())
if err != nil {
return errors.Trace(err)
}

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e
return nil
}

// Initialize is no-op for blackhole
func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func newCheckSink(c *check.C) *checkSink {
}
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -342,10 +338,6 @@ type errorSink struct {
*check.C
}

func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down
16 changes: 7 additions & 9 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return errors.Trace(err)
}

// Initialize registers Avro schemas for all tables
func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// No longer need it for now
return nil
}

func (k *mqSink) Close(ctx context.Context) error {
err := k.mqProducer.Close()
return errors.Trace(err)
Expand Down Expand Up @@ -414,7 +408,11 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, errCh)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -444,8 +442,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
opts["max-batch-size"] = s
}
// For now, it's a place holder. Avro format have to make connection to Schema Registery,
// and it may needs credential.
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

Expand Down Expand Up @@ -161,6 +167,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return errors.Trace(err)
}

// Initialize is no-op for Mysql sink
func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
Expand Down
Loading