Skip to content

Commit

Permalink
remove Initialize method from the sink.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Dec 1, 2021
1 parent 30d77ea commit 6a409e9
Show file tree
Hide file tree
Showing 13 changed files with 5 additions and 117 deletions.
5 changes: 0 additions & 5 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
// Other functions are still synchronization
type AsyncSink interface {
Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
// EmitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
Expand Down Expand Up @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
return asyncSink, nil
}

func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return s.sink.Initialize(ctx, tableInfo)
}

func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
defer s.wg.Done()
// TODO make the tick duration configurable
Expand Down
25 changes: 4 additions & 21 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,10 @@ type asyncSinkSuite struct{}

type mockSink struct {
sink.Sink
initTableInfo []*model.SimpleTableInfo
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
m.initTableInfo = tableInfo
return nil
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand Down Expand Up @@ -87,17 +81,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context,
return ctx, sink, mockSink
}

func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo)
}

func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos())
if err != nil {
return errors.Trace(err)
}

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e
return nil
}

// Initialize is no-op for blackhole
func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}
Expand Down
33 changes: 0 additions & 33 deletions cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,39 +312,6 @@ func (f *fileSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return nil
}

func (f *fileSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
if tableInfo != nil {
for _, table := range tableInfo {
if table != nil {
name := makeTableDirectoryName(table.TableID)
err := os.MkdirAll(filepath.Join(f.logPath.root, name), defaultDirMode)
if err != nil {
return cerror.WrapError(cerror.ErrFileSinkCreateDir, err)
}
}
}
// update log meta to record the relationship about tableName and tableID
f.logMeta = makeLogMetaContent(tableInfo)
data, err := f.logMeta.Marshal()
if err != nil {
return cerror.WrapError(cerror.ErrMarshalFailed, err)
}
filePath := f.logPath.meta
if _, err := os.Stat(filePath); !os.IsNotExist(err) {
return cerror.WrapError(cerror.ErrFileSinkMetaAlreadyExists, err)
}
file, err := os.Create(filePath)
if err != nil {
return cerror.WrapError(cerror.ErrFileSinkCreateDir, err)
}
_, err = file.Write(data)
if err != nil {
return cerror.WrapError(cerror.ErrFileSinkFileOp, err)
}
}
return nil
}

func (f *fileSink) Close(ctx context.Context) error {
return nil
}
Expand Down
14 changes: 0 additions & 14 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,20 +329,6 @@ func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return s.storage.WriteFile(ctx, name, fileData)
}

func (s *s3Sink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
if tableInfo != nil {
// update log meta to record the relationship about tableName and tableID
s.logMeta = makeLogMetaContent(tableInfo)

data, err := s.logMeta.Marshal()
if err != nil {
return cerror.WrapError(cerror.ErrMarshalFailed, err)
}
return s.storage.WriteFile(ctx, logMetaFile, data)
}
return nil
}

func (s *s3Sink) Close(ctx context.Context) error {
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func newCheckSink(c *check.C) *checkSink {
}
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -344,10 +340,6 @@ type errorSink struct {
*check.C
}

func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return errors.Trace(err)
}

// Initialize registers Avro schemas for all tables
func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// No longer need it for now
return nil
}

func (k *mqSink) Close(ctx context.Context) error {
err := k.mqProducer.Close()
return errors.Trace(err)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return errors.Trace(err)
}

// Initialize is no-op for Mysql sink
func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re
return sink, nil
}

func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// do nothing
return nil
}

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down
2 changes: 0 additions & 2 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const (

// Sink is an abstraction for anything that a changefeed may emit into.
type Sink interface {
Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ type tableSink struct {
redoManager redo.LogManager
}

func (t *tableSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// do nothing
return nil
}

func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
t.buffer = append(t.buffer, rows...)
t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
Expand Down

0 comments on commit 6a409e9

Please sign in to comment.