diff --git a/cdc/owner/async_sink.go b/cdc/owner/async_sink.go index e26b0f047ed..3cb58593996 100644 --- a/cdc/owner/async_sink.go +++ b/cdc/owner/async_sink.go @@ -38,7 +38,6 @@ const ( // The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now // Other functions are still synchronization type AsyncSink interface { - Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error // EmitCheckpointTs emits the checkpoint Ts to downstream data source // this function will return after recording the checkpointTs specified in memory immediately // and the recorded checkpointTs will be sent and updated to downstream data source every second @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) { return asyncSink, nil } -func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error { - return s.sink.Initialize(ctx, tableInfo) -} - func (s *asyncSinkImpl) run(ctx cdcContext.Context) { defer s.wg.Done() // TODO make the tick duration configurable diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/async_sink_test.go index 3c7fc11a11e..bf3f43896db 100644 --- a/cdc/owner/async_sink_test.go +++ b/cdc/owner/async_sink_test.go @@ -36,16 +36,10 @@ type asyncSinkSuite struct{} type mockSink struct { sink.Sink - initTableInfo []*model.SimpleTableInfo - checkpointTs model.Ts - ddl *model.DDLEvent - ddlMu sync.Mutex - ddlError error -} - -func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - m.initTableInfo = tableInfo - return nil + checkpointTs model.Ts + ddl *model.DDLEvent + ddlMu sync.Mutex + ddlError error } func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { @@ -87,17 +81,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context, return ctx, sink, mockSink } -func (s *asyncSinkSuite) TestInitialize(c *check.C) { - defer testleak.AfterTest(c)() - ctx := cdcContext.NewBackendContext4Test(false) - ctx, sink, mockSink := newAsyncSink4Test(ctx, c) - defer sink.Close(ctx) - tableInfos := []*model.SimpleTableInfo{{Schema: "test"}} - err := sink.Initialize(ctx, tableInfos) - c.Assert(err, check.IsNil) - c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo) -} - func (s *asyncSinkSuite) TestCheckpoint(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 3074e548b77..984faf80744 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -249,10 +249,7 @@ LOOP: if err != nil { return errors.Trace(err) } - err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos()) - if err != nil { - return errors.Trace(err) - } + // Refer to the previous comment on why we use (checkpointTs-1). c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1) if err != nil { diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go index 43fe44668c0..507c12acaed 100644 --- a/cdc/processor/pipeline/sink_test.go +++ b/cdc/processor/pipeline/sink_test.go @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 { return 0 } -func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { for _, row := range rows { s.received = append(s.received, struct { diff --git a/cdc/sink/black_hole.go b/cdc/sink/black_hole.go index 3eca14a0119..da2f407df37 100644 --- a/cdc/sink/black_hole.go +++ b/cdc/sink/black_hole.go @@ -70,11 +70,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e return nil } -// Initialize is no-op for blackhole -func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (b *blackHoleSink) Close(ctx context.Context) error { return nil } diff --git a/cdc/sink/manager_test.go b/cdc/sink/manager_test.go index 354b06563ae..81d9a6afec8 100644 --- a/cdc/sink/manager_test.go +++ b/cdc/sink/manager_test.go @@ -40,10 +40,6 @@ type checkSink struct { lastResolvedTs uint64 } -func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - panic("unreachable") -} - func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { c.rowsMu.Lock() defer c.rowsMu.Unlock() @@ -331,10 +327,6 @@ type errorSink struct { *check.C } -func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - panic("unreachable") -} - func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { return errors.New("error in emit row changed events") } diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index a2a47f7fa66..e260ce71113 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -262,12 +262,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { return errors.Trace(err) } -// Initialize registers Avro schemas for all tables -func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - // No longer need it for now - return nil -} - func (k *mqSink) Close(ctx context.Context) error { err := k.mqProducer.Close() return errors.Trace(err) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index c0b25f7c9b4..0583eee4b3a 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -202,11 +202,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error return errors.Trace(err) } -// Initialize is no-op for Mysql sink -func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - return nil -} - func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error { return retry.Do(ctx, func() error { err := s.execDDL(ctx, ddl) diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index 3b3d7edc885..d24460f53aa 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -104,11 +104,6 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re return sink, nil } -func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error { - // do nothing - return nil -} - // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go index 12a381d0f9f..e908719aaab 100644 --- a/cdc/sink/sink.go +++ b/cdc/sink/sink.go @@ -33,8 +33,6 @@ const ( // Sink is an abstraction for anything that a changefeed may emit into. type Sink interface { - Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error - // EmitRowChangedEvents sends Row Changed Event to Sink // EmitRowChangedEvents may write rows to downstream directly; EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go new file mode 100644 index 00000000000..a57cc0123b6 --- /dev/null +++ b/cdc/sink/table_sink.go @@ -0,0 +1,116 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sink + +import ( + "context" + "sort" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/redo" +) + +type tableSink struct { + tableID model.TableID + manager *Manager + buffer []*model.RowChangedEvent + // emittedTs means all of events which of commitTs less than or equal to emittedTs is sent to backendSink + emittedTs model.Ts + redoManager redo.LogManager +} + +func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error { + t.buffer = append(t.buffer, rows...) + t.manager.metricsTableSinkTotalRows.Add(float64(len(rows))) + if t.redoManager.Enabled() { + return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...) + } + return nil +} + +func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error { + // the table sink doesn't receive the DDL event + return nil +} + +// FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs +// is required to be no more than global resolvedTs, table barrierTs and table +// redo log watermarkTs. +func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) { + i := sort.Search(len(t.buffer), func(i int) bool { + return t.buffer[i].CommitTs > resolvedTs + }) + if i == 0 { + atomic.StoreUint64(&t.emittedTs, resolvedTs) + ckpt, err := t.flushRedoLogs(ctx, resolvedTs) + if err != nil { + return ckpt, err + } + return t.manager.flushBackendSink(ctx, tableID) + } + resolvedRows := t.buffer[:i] + t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...) + + err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...) + if err != nil { + return t.manager.getCheckpointTs(tableID), errors.Trace(err) + } + atomic.StoreUint64(&t.emittedTs, resolvedTs) + ckpt, err := t.flushRedoLogs(ctx, resolvedTs) + if err != nil { + return ckpt, err + } + return t.manager.flushBackendSink(ctx, tableID) +} + +func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) { + if t.redoManager.Enabled() { + err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs) + if err != nil { + return t.manager.getCheckpointTs(t.tableID), err + } + } + return 0, nil +} + +// getResolvedTs returns resolved ts, which means all events before resolved ts +// have been sent to sink manager, if redo log is enabled, it also means all events +// before resolved ts have been persisted to redo log storage. +func (t *tableSink) getResolvedTs() uint64 { + ts := atomic.LoadUint64(&t.emittedTs) + if t.redoManager.Enabled() { + redoResolvedTs := t.redoManager.GetMinResolvedTs() + if redoResolvedTs < ts { + ts = redoResolvedTs + } + } + return ts +} + +func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error { + // the table sink doesn't receive the checkpoint event + return nil +} + +// Note once the Close is called, no more events can be written to this table sink +func (t *tableSink) Close(ctx context.Context) error { + return t.manager.destroyTableSink(ctx, t.tableID) +} + +// Barrier is not used in table sink +func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error { + return nil +}