Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3682
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Dec 7, 2021
1 parent f18635a commit 568bbb6
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 65 deletions.
5 changes: 0 additions & 5 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
// Other functions are still synchronization
type AsyncSink interface {
Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
// EmitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
Expand Down Expand Up @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
return asyncSink, nil
}

func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return s.sink.Initialize(ctx, tableInfo)
}

func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
defer s.wg.Done()
// TODO make the tick duration configurable
Expand Down
25 changes: 4 additions & 21 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,10 @@ type asyncSinkSuite struct{}

type mockSink struct {
sink.Sink
initTableInfo []*model.SimpleTableInfo
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
m.initTableInfo = tableInfo
return nil
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand Down Expand Up @@ -87,17 +81,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context,
return ctx, sink, mockSink
}

func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo)
}

func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos())
if err != nil {
return errors.Trace(err)
}

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e
return nil
}

// Initialize is no-op for blackhole
func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ type checkSink struct {
lastResolvedTs uint64
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -331,10 +327,6 @@ type errorSink struct {
*check.C
}

func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return errors.Trace(err)
}

// Initialize registers Avro schemas for all tables
func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// No longer need it for now
return nil
}

func (k *mqSink) Close(ctx context.Context) error {
err := k.mqProducer.Close()
return errors.Trace(err)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return errors.Trace(err)
}

// Initialize is no-op for Mysql sink
func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,6 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re
return sink, nil
}

func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// do nothing
return nil
}

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down
2 changes: 0 additions & 2 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const (

// Sink is an abstraction for anything that a changefeed may emit into.
type Sink interface {
Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error
Expand Down
116 changes: 116 additions & 0 deletions cdc/sink/table_sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sink

import (
"context"
"sort"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/redo"
)

type tableSink struct {
tableID model.TableID
manager *Manager
buffer []*model.RowChangedEvent
// emittedTs means all of events which of commitTs less than or equal to emittedTs is sent to backendSink
emittedTs model.Ts
redoManager redo.LogManager
}

func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
t.buffer = append(t.buffer, rows...)
t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
if t.redoManager.Enabled() {
return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...)
}
return nil
}

func (t *tableSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
// the table sink doesn't receive the DDL event
return nil
}

// FlushRowChangedEvents flushes sorted rows to sink manager, note the resolvedTs
// is required to be no more than global resolvedTs, table barrierTs and table
// redo log watermarkTs.
func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
i := sort.Search(len(t.buffer), func(i int) bool {
return t.buffer[i].CommitTs > resolvedTs
})
if i == 0 {
atomic.StoreUint64(&t.emittedTs, resolvedTs)
ckpt, err := t.flushRedoLogs(ctx, resolvedTs)
if err != nil {
return ckpt, err
}
return t.manager.flushBackendSink(ctx, tableID)
}
resolvedRows := t.buffer[:i]
t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...)

err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...)
if err != nil {
return t.manager.getCheckpointTs(tableID), errors.Trace(err)
}
atomic.StoreUint64(&t.emittedTs, resolvedTs)
ckpt, err := t.flushRedoLogs(ctx, resolvedTs)
if err != nil {
return ckpt, err
}
return t.manager.flushBackendSink(ctx, tableID)
}

func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint64, error) {
if t.redoManager.Enabled() {
err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs)
if err != nil {
return t.manager.getCheckpointTs(t.tableID), err
}
}
return 0, nil
}

// getResolvedTs returns resolved ts, which means all events before resolved ts
// have been sent to sink manager, if redo log is enabled, it also means all events
// before resolved ts have been persisted to redo log storage.
func (t *tableSink) getResolvedTs() uint64 {
ts := atomic.LoadUint64(&t.emittedTs)
if t.redoManager.Enabled() {
redoResolvedTs := t.redoManager.GetMinResolvedTs()
if redoResolvedTs < ts {
ts = redoResolvedTs
}
}
return ts
}

func (t *tableSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
// the table sink doesn't receive the checkpoint event
return nil
}

// Note once the Close is called, no more events can be written to this table sink
func (t *tableSink) Close(ctx context.Context) error {
return t.manager.destroyTableSink(ctx, t.tableID)
}

// Barrier is not used in table sink
func (t *tableSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

0 comments on commit 568bbb6

Please sign in to comment.