pkg/config, sink(ticdc): support output raw change event for mq and cloud storage sink (#11226) (#11290)

close #11211
ti-chi-bot authored Jun 11, 2024
1 parent 3426e46 commit c092599
Showing 40 changed files with 1,304 additions and 100 deletions.
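The change wires a new output_raw_change_event switch through both the Kafka (MQ) and cloud storage sink configurations. The Go snippet below is a minimal sketch of the JSON payload a client of the v2 changefeed API might send to enable it; the local mirror structs and the kafka_config / cloud_storage_config wrapper keys are assumptions for the example, and only the output_raw_change_event field itself comes from this diff.

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the fields added in cdc/api/v2/model.go; only
// output_raw_change_event is spelled out, everything else is omitted.
type kafkaConfig struct {
	OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

type cloudStorageConfig struct {
	OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// sinkConfig is an illustrative wrapper; the kafka_config and
// cloud_storage_config key names are assumptions for this sketch.
type sinkConfig struct {
	KafkaConfig        *kafkaConfig        `json:"kafka_config,omitempty"`
	CloudStorageConfig *cloudStorageConfig `json:"cloud_storage_config,omitempty"`
}

func main() {
	raw := true
	cfg := sinkConfig{
		// Ask the Kafka (MQ) sink to forward original update events
		// instead of splitting them into delete + insert pairs.
		KafkaConfig: &kafkaConfig{OutputRawChangeEvent: &raw},
		// The cloud storage sink gets the same switch.
		CloudStorageConfig: &cloudStorageConfig{OutputRawChangeEvent: &raw},
	}
	out, _ := json.MarshalIndent(cfg, "", "  ")
	fmt.Println(string(out))
}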
50 changes: 28 additions & 22 deletions cdc/api/v2/model.go
@@ -354,18 +354,20 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
SASLOAuthGrantType: c.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: c.Sink.KafkaConfig.SASLOAuthAudience,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent,
}
}

if c.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
}
@@ -502,18 +504,20 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
SASLOAuthGrantType: cloned.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience,
LargeMessageHandle: largeMessageHandle,
OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent,
}
}

if cloned.Sink.CloudStorageConfig != nil {
res.Sink.CloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,
}
}
}
@@ -679,18 +683,20 @@ type KafkaConfig struct {
SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"`
SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"`

LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// CSVConfig denotes the csv config
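Note that both conversion paths above copy OutputRawChangeEvent as a *bool, so an option the user never set stays nil instead of collapsing to false. A small self-contained sketch of that pattern, using throwaway local types rather than the real config structs:

package main

import "fmt"

type apiCfg struct{ OutputRawChangeEvent *bool }
type internalCfg struct{ OutputRawChangeEvent *bool }

// toInternal mirrors the copy style in toInternalReplicaConfigWithOriginConfig:
// the pointer is forwarded as-is, so "not set" and "explicitly false" remain distinct.
func toInternal(a *apiCfg) *internalCfg {
	if a == nil {
		return nil
	}
	return &internalCfg{OutputRawChangeEvent: a.OutputRawChangeEvent}
}

func main() {
	f := false
	fmt.Println(toInternal(&apiCfg{}).OutputRawChangeEvent)                          // <nil>: option never set
	fmt.Println(*toInternal(&apiCfg{OutputRawChangeEvent: &f}).OutputRawChangeEvent) // false: explicitly disabled
}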
44 changes: 20 additions & 24 deletions cdc/model/sink.go
@@ -279,7 +279,7 @@ func (r *RedoLog) GetCommitTs() Ts {
}

// TrySplitAndSortUpdateEvent redo log do nothing
func (r *RedoLog) TrySplitAndSortUpdateEvent(sinkScheme string) error {
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

@@ -377,7 +377,7 @@ func (r *RowChangedEvent) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent do nothing
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(sinkScheme string) error {
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error {
return nil
}

@@ -794,11 +794,19 @@ func (t *SingleTableTxn) GetCommitTs() uint64 {
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
if !t.shouldSplitUpdateEvent(sinkScheme) {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
// For MySQL Sink, all update events will be split into insert and delete at the puller side
// according to whether the changefeed is in safemode. We don't split update event here(in sink)
// since there may be OOM issues. For more information, ref https://github.com/tikv/tikv/issues/17062.
//
// For the Kafka and Storage sink, the outputRawChangeEvent parameter is introduced to control
// split behavior. TiCDC only output original change event if outputRawChangeEvent is true.
return nil
}

// Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false.
// Note it is only for backward compatibility, and we should remove this logic in the future.
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
@@ -807,21 +815,6 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(sinkScheme string) error {
return nil
}
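Restated outside the diff, the new guard boils down to a single predicate: split only when the sink is not MySQL-compatible and raw output was not requested. A hedged sketch follows, with isMySQLCompatible standing in for sink.IsMySQLCompatibleScheme; the scheme list used here is an illustrative assumption.

package main

import "fmt"

// shouldSplit restates the guard at the top of TrySplitAndSortUpdateEvent.
func shouldSplit(scheme string, outputRawChangeEvent bool) bool {
	if isMySQLCompatible(scheme) || outputRawChangeEvent {
		// MySQL-compatible sinks rely on the puller to split updates;
		// raw-event mode forwards updates untouched.
		return false
	}
	return true
}

// isMySQLCompatible approximates sink.IsMySQLCompatibleScheme with a literal list.
func isMySQLCompatible(scheme string) bool {
	switch scheme {
	case "mysql", "mysql+ssl", "tidb", "tidb+ssl":
		return true
	}
	return false
}

func main() {
	fmt.Println(shouldSplit("kafka", false)) // true: legacy behaviour, split unique-key updates
	fmt.Println(shouldSplit("kafka", true))  // false: output_raw_change_event enabled
	fmt.Println(shouldSplit("mysql", false)) // false: MySQL sink never splits here
}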

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some updated events is split
func trySplitAndSortUpdateEvent(
@@ -831,8 +824,7 @@ func trySplitAndSortUpdateEvent(
split := false
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event",
zap.Any("event", e))
log.Warn("skip emit nil event", zap.Any("event", e))
continue
}

@@ -842,8 +834,7 @@
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Any("event", e))
log.Warn("skip emit empty row event", zap.Any("event", e))
continue
}

@@ -869,7 +860,7 @@

// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
@@ -912,6 +903,11 @@ func SplitUpdateEvent(
// NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil

log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs),
zap.Uint64("commitTs", updateEvent.CommitTs),
zap.Any("preCols", updateEvent.PreColumns),
zap.Any("cols", updateEvent.Columns))

return &deleteEvent, &insertEvent, nil
}

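For readers unfamiliar with the split itself: SplitUpdateEvent turns one unique-key-changing update into a delete that carries only the before image and an insert that carries only the after image. A toy sketch of that shape; the real function works on *RowChangedEvent and also rewrites flags and metadata, which is omitted here.

package main

import "fmt"

// row is a toy stand-in for model.RowChangedEvent: PreColumns is the before
// image, Columns the after image.
type row struct {
	PreColumns map[string]any
	Columns    map[string]any
}

// splitUpdate mimics the shape of the split: the delete keeps only the before
// image, the insert keeps only the after image (PreColumns cleared).
func splitUpdate(update row) (deleteEvent, insertEvent row) {
	deleteEvent = row{PreColumns: update.PreColumns}
	insertEvent = row{Columns: update.Columns}
	return
}

func main() {
	upd := row{
		PreColumns: map[string]any{"id": 1, "uk": "a"},
		Columns:    map[string]any{"id": 1, "uk": "b"}, // unique key changed
	}
	del, ins := splitUpdate(upd)
	fmt.Println("delete:", del.PreColumns) // delete: map[id:1 uk:a]
	fmt.Println("insert:", ins.Columns)    // insert: map[id:1 uk:b]
}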
26 changes: 23 additions & 3 deletions cdc/model/sink_test.go
@@ -534,6 +534,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {

events := []*RowChangedEvent{
{
TableInfo: &TableInfo{},
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
@@ -573,6 +574,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {

events = []*RowChangedEvent{
{
TableInfo: &TableInfo{},
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
@@ -613,6 +615,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {

events = []*RowChangedEvent{
{
TableInfo: &TableInfo{},
CommitTs: 1,
Columns: columns,
PreColumns: preColumns,
@@ -624,6 +627,12 @@
}

var ukUpdatedEvent = &RowChangedEvent{
TableInfo: &TableInfo{
TableName: TableName{
Schema: "test",
Table: "t1",
},
},
PreColumns: []*Column{
{
Name: "col1",
@@ -656,21 +665,32 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
outputRawChangeEvent := true
notOutputRawChangeEvent := false
err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
4 changes: 4 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -51,6 +51,10 @@ func (m *mockSink) WriteEvents(events ...*eventsink.CallbackableEvent[*model.Row
return nil
}

func (m *mockSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, false
}

func (m *mockSink) GetEvents() []*eventsink.CallbackableEvent[*model.RowChangedEvent] {
m.mu.Lock()
defer m.mu.Unlock()
6 changes: 3 additions & 3 deletions cdc/sinkv2/eventsink/blackhole/black_hole_dml_sink.go
@@ -47,9 +47,9 @@ func (s *Sink) WriteEvents(rows ...*eventsink.CallbackableEvent[*model.RowChange
return
}

// Scheme returns the sink scheme.
func (s *Sink) Scheme() string {
return sink.BlackHoleScheme
// SchemeOption returns the scheme and the option.
func (s *Sink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, true
}

// Close do nothing.
26 changes: 14 additions & 12 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
@@ -65,8 +65,9 @@ type eventFragment struct {
// dmlSink is the cloud storage sink.
// It will send the events to cloud storage systems.
type dmlSink struct {
changefeedID model.ChangeFeedID
scheme string
changefeedID model.ChangeFeedID
scheme string
outputRawChangeEvent bool
// last sequence number
lastSeqNum uint64
// encodingWorkers defines a group of workers for encoding events.
@@ -133,13 +134,14 @@ func NewCloudStorageSink(

wgCtx, wgCancel := context.WithCancel(ctx)
s := &dmlSink{
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
scheme: strings.ToLower(sinkURI.Scheme),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx),
scheme: strings.ToLower(sinkURI.Scheme),
outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = chann.NewDrainableChann[eventFragment]()

@@ -244,9 +246,9 @@ func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.Single
return nil
}

// Scheme returns the sink scheme.
func (s *dmlSink) Scheme() string {
return s.scheme
// SchemeOption returns the scheme and the option.
func (s *dmlSink) SchemeOption() (string, bool) {
return s.scheme, s.outputRawChangeEvent
}

// Close closes the cloud storage sink.
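The constructor above reads the option through GetOutputRawChangeEvent() even when the cloud-storage section may be absent from the config, which suggests a nil-safe getter that defaults to false. That getter is not shown in this excerpt, so the following is an assumed sketch of its likely shape, not the actual pkg/config implementation.

package main

import "fmt"

// CloudStorageConfig here is a stand-in; the assumption is that the real
// getter in pkg/config is nil-safe and defaults to false so that existing
// changefeeds keep the old split behaviour.
type CloudStorageConfig struct {
	OutputRawChangeEvent *bool
}

func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool {
	if c == nil || c.OutputRawChangeEvent == nil {
		return false // default: keep splitting unique-key updates
	}
	return *c.OutputRawChangeEvent
}

func main() {
	var missing *CloudStorageConfig
	fmt.Println(missing.GetOutputRawChangeEvent()) // false: section absent from the config

	on := true
	fmt.Println((&CloudStorageConfig{OutputRawChangeEvent: &on}).GetOutputRawChangeEvent()) // true
}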
5 changes: 2 additions & 3 deletions cdc/sinkv2/eventsink/event.go
@@ -22,9 +22,8 @@ import (
type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated.
// Note that sinkScheme is used to control the split behavior.
TrySplitAndSortUpdateEvent(sinkScheme string) error
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated
TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

// CallbackFunc is the callback function for callbackable event.
4 changes: 2 additions & 2 deletions cdc/sinkv2/eventsink/event_sink.go
@@ -18,8 +18,8 @@ type EventSink[E TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(events ...*CallbackableEvent[E]) error
// Scheme returns the sink scheme.
Scheme() string
// SchemeOption returns the sink scheme and whether the sink should output raw change event.
SchemeOption() (scheme string, outputRawChangeEvent bool)
// Close closes the sink. Can be called with `WriteEvents` concurrently.
Close()
// The EventSink meets internal errors and has been dead already.
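With Scheme() replaced by SchemeOption(), a caller can fetch the scheme and the raw-event flag in one call and hand both to TrySplitAndSortUpdateEvent. Below is a self-contained sketch of that call pattern using stand-in interfaces; the real wiring lives in the sink manager and is not shown in this excerpt.

package main

import "fmt"

// Stand-ins for eventsink.TableEvent and eventsink.EventSink; an illustrative
// sketch of the call pattern, not the real sink-manager code.
type tableEvent interface {
	GetCommitTs() uint64
	TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

type eventSink interface {
	SchemeOption() (scheme string, outputRawChangeEvent bool)
}

// prepareEvent asks the sink for its scheme and raw-event option once, then
// lets the event decide whether to split itself.
func prepareEvent(s eventSink, e tableEvent) error {
	scheme, raw := s.SchemeOption()
	return e.TrySplitAndSortUpdateEvent(scheme, raw)
}

type fakeSink struct{}

func (fakeSink) SchemeOption() (string, bool) { return "kafka", false }

type fakeTxn struct{ rows int }

func (t *fakeTxn) GetCommitTs() uint64 { return 0 }

func (t *fakeTxn) TrySplitAndSortUpdateEvent(_ string, raw bool) error {
	if !raw {
		t.rows *= 2 // pretend every row was a unique-key update that got split
	}
	return nil
}

func main() {
	txn := &fakeTxn{rows: 1}
	if err := prepareEvent(fakeSink{}, txn); err != nil {
		panic(err)
	}
	fmt.Println(txn.rows) // 2: split, because the kafka sink did not request raw events
}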
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/mq/kafka_dml_sink.go
@@ -118,7 +118,7 @@ func NewKafkaDMLSink(
}

s, err := newSink(ctx, sinkURI, p, topicManager, eventRouter, encoderConfig,
replicaConfig.Sink.EncoderConcurrency, errCh)
replicaConfig.Sink.EncoderConcurrency, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh)
if err != nil {
return nil, errors.Trace(err)
}