pkg/config, sink(ticdc): support output raw change event for mq and cloud storage sink #11226

Merged

Changes from all commits
51 changes: 30 additions & 21 deletions cdc/api/v2/model.go
@@ -317,6 +317,7 @@
BasicPassword: c.Sink.PulsarConfig.BasicPassword,
AuthTLSCertificatePath: c.Sink.PulsarConfig.AuthTLSCertificatePath,
AuthTLSPrivateKeyPath: c.Sink.PulsarConfig.AuthTLSPrivateKeyPath,
OutputRawChangeEvent: c.Sink.PulsarConfig.OutputRawChangeEvent,

Codecov warning: added line cdc/api/v2/model.go#L320 not covered by tests.
}
if c.Sink.PulsarConfig.OAuth2 != nil {
pulsarConfig.OAuth2 = &config.OAuth2{
@@ -402,6 +403,7 @@
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
OutputRawChangeEvent: c.Sink.KafkaConfig.OutputRawChangeEvent,

Codecov warning: added line cdc/api/v2/model.go#L406 not covered by tests.
}
}
var mysqlConfig *config.MySQLConfig
@@ -427,13 +429,14 @@
var cloudStorageConfig *config.CloudStorageConfig
if c.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &config.CloudStorageConfig{
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
WorkerCount: c.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: c.Sink.CloudStorageConfig.FlushInterval,
FileSize: c.Sink.CloudStorageConfig.FileSize,
OutputColumnID: c.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: c.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: c.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: c.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: c.Sink.CloudStorageConfig.OutputRawChangeEvent,

Codecov warning: added lines cdc/api/v2/model.go#L432-L439 not covered by tests.
}
}
var debeziumConfig *config.DebeziumConfig
@@ -666,6 +669,7 @@
CodecConfig: codeConfig,
LargeMessageHandle: largeMessageHandle,
GlueSchemaRegistryConfig: glueSchemaRegistryConfig,
OutputRawChangeEvent: cloned.Sink.KafkaConfig.OutputRawChangeEvent,

Codecov warning: added line cdc/api/v2/model.go#L672 not covered by tests.
}
}
var mysqlConfig *MySQLConfig
@@ -708,6 +712,7 @@
BasicPassword: cloned.Sink.PulsarConfig.BasicPassword,
AuthTLSCertificatePath: cloned.Sink.PulsarConfig.AuthTLSCertificatePath,
AuthTLSPrivateKeyPath: cloned.Sink.PulsarConfig.AuthTLSPrivateKeyPath,
OutputRawChangeEvent: cloned.Sink.PulsarConfig.OutputRawChangeEvent,

Codecov warning: added line cdc/api/v2/model.go#L715 not covered by tests.
}
if cloned.Sink.PulsarConfig.OAuth2 != nil {
pulsarConfig.OAuth2 = &PulsarOAuth2{
@@ -722,13 +727,14 @@
var cloudStorageConfig *CloudStorageConfig
if cloned.Sink.CloudStorageConfig != nil {
cloudStorageConfig = &CloudStorageConfig{
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
WorkerCount: cloned.Sink.CloudStorageConfig.WorkerCount,
FlushInterval: cloned.Sink.CloudStorageConfig.FlushInterval,
FileSize: cloned.Sink.CloudStorageConfig.FileSize,
OutputColumnID: cloned.Sink.CloudStorageConfig.OutputColumnID,
FileExpirationDays: cloned.Sink.CloudStorageConfig.FileExpirationDays,
FileCleanupCronSpec: cloned.Sink.CloudStorageConfig.FileCleanupCronSpec,
FlushConcurrency: cloned.Sink.CloudStorageConfig.FlushConcurrency,
OutputRawChangeEvent: cloned.Sink.CloudStorageConfig.OutputRawChangeEvent,

Codecov warning: added lines cdc/api/v2/model.go#L730-L737 not covered by tests.
}
}
var debeziumConfig *DebeziumConfig
@@ -1194,6 +1200,7 @@
AuthTLSCertificatePath *string `json:"auth-tls-certificate-path,omitempty"`
AuthTLSPrivateKeyPath *string `json:"auth-tls-private-key-path,omitempty"`
OAuth2 *PulsarOAuth2 `json:"oauth2,omitempty"`
OutputRawChangeEvent *bool `json:"output-raw-change-event,omitempty"`
}

// PulsarOAuth2 is the configuration for OAuth2
@@ -1243,6 +1250,7 @@
CodecConfig *CodecConfig `json:"codec_config,omitempty"`
LargeMessageHandle *LargeMessageHandleConfig `json:"large_message_handle,omitempty"`
GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `json:"glue_schema_registry_config,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// MySQLConfig represents a MySQL sink configuration
@@ -1266,13 +1274,14 @@

// CloudStorageConfig represents a cloud storage sink configuration
type CloudStorageConfig struct {
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
WorkerCount *int `json:"worker_count,omitempty"`
FlushInterval *string `json:"flush_interval,omitempty"`
FileSize *int `json:"file_size,omitempty"`
OutputColumnID *bool `json:"output_column_id,omitempty"`
FileExpirationDays *int `json:"file_expiration_days,omitempty"`
FileCleanupCronSpec *string `json:"file_cleanup_cron_spec,omitempty"`
FlushConcurrency *int `json:"flush_concurrency,omitempty"`
OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// ChangefeedStatus holds common information of a changefeed in cdc
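For context, the new pointer fields are optional in the v2 API and are omitted from JSON when unset. Below is a minimal sketch of how a client could populate them; the struct stand-ins and the boolPtr helper are illustrative only, not the actual tiflow types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Illustrative stand-ins for the v2 API sink configs; only the field added in
// this PR is reproduced, with the JSON tags shown in the diff above.
type KafkaConfig struct {
	OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

type CloudStorageConfig struct {
	OutputRawChangeEvent *bool `json:"output_raw_change_event,omitempty"`
}

// boolPtr is a hypothetical helper, not part of TiCDC.
func boolPtr(b bool) *bool { return &b }

func main() {
	// Ask the Kafka sink to forward raw (unsplit) update events; leave the
	// cloud storage sink on its default behavior by not setting the field.
	kafka := KafkaConfig{OutputRawChangeEvent: boolPtr(true)}
	storage := CloudStorageConfig{}

	k, _ := json.Marshal(kafka)
	s, _ := json.Marshal(storage)
	fmt.Println(string(k)) // {"output_raw_change_event":true}
	fmt.Println(string(s)) // {}
}
```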
47 changes: 23 additions & 24 deletions cdc/model/sink.go
@@ -272,7 +272,7 @@
}

// TrySplitAndSortUpdateEvent redo log do nothing
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string) error {
func (r *RedoLog) TrySplitAndSortUpdateEvent(_ string, _ bool) error {

Codecov warning: added line cdc/model/sink.go#L275 not covered by tests.
return nil
}

@@ -444,7 +444,7 @@
}

// TrySplitAndSortUpdateEvent do nothing
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string) error {
func (r *RowChangedEvent) TrySplitAndSortUpdateEvent(_ string, _ bool) error {

Codecov warning: added line cdc/model/sink.go#L447 not covered by tests.
return nil
}

@@ -1140,10 +1140,19 @@
}

// TrySplitAndSortUpdateEvent split update events if unique key is updated
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {
if !t.shouldSplitUpdateEvent(scheme) {
func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error {
if sink.IsMySQLCompatibleScheme(scheme) || outputRawChangeEvent {
// For the MySQL sink, all update events will be split into insert and delete at the puller side
// according to whether the changefeed is in safe mode. We don't split update events here (in the sink)
// since there may be OOM issues. For more information, ref https://github.com/tikv/tikv/issues/17062.
//
// For the Kafka and Storage sinks, the outputRawChangeEvent parameter is introduced to control
// the split behavior. TiCDC only outputs the original change event when outputRawChangeEvent is true.
return nil
}

// Try to split update events for the Kafka and Storage sink if outputRawChangeEvent is false.
// Note it is only for backward compatibility, and we should remove this logic in the future.
newRows, err := trySplitAndSortUpdateEvent(t.Rows)
if err != nil {
return errors.Trace(err)
@@ -1152,21 +1161,6 @@
return nil
}

// Whether split a single update event into delete and insert events?
//
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be splitted to insert and delete at puller side.
//
// For the Kafka and Storage sink, always split a single unique key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
// returns true if some updated events is split
func trySplitAndSortUpdateEvent(
@@ -1176,8 +1170,7 @@
split := false
for _, e := range events {
if e == nil {
log.Warn("skip emit nil event",
zap.Any("event", e))
log.Warn("skip emit nil event", zap.Any("event", e))
continue
}

Expand All @@ -1187,8 +1180,7 @@
// begin; insert into t (id) values (1); delete from t where id=1; commit;
// Just ignore these row changed events.
if colLen == 0 && preColLen == 0 {
log.Warn("skip emit empty row event",
zap.Any("event", e))
log.Warn("skip emit empty row event", zap.Any("event", e))
continue
}

@@ -1222,7 +1214,7 @@

// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
// If is modified, we need to use splitUpdateEvent to split the update event into a delete and an insert event.
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
@@ -1265,6 +1257,13 @@
// NOTICE: clean up pre cols for insert event.
insertEvent.PreColumns = nil

log.Debug("split update event", zap.Uint64("startTs", updateEvent.StartTs),
zap.Uint64("commitTs", updateEvent.CommitTs),
zap.String("schema", updateEvent.TableInfo.TableName.Schema),
zap.String("table", updateEvent.TableInfo.GetTableName()),
zap.Any("preCols", updateEvent.PreColumns),
zap.Any("cols", updateEvent.Columns))

return &deleteEvent, &insertEvent, nil
}
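To make the wiring above concrete, here is a simplified sketch of the decision this PR threads through the sinks: a unique-key update is split into delete + insert only when the sink is not MySQL-compatible and outputRawChangeEvent is false. The event type and helper below are illustrative stand-ins; the real trySplitAndSortUpdateEvent also re-sorts events and inspects the actual column and handle-key data.

```go
package main

import "fmt"

// RowEvent is a trimmed-down, illustrative stand-in for model.RowChangedEvent.
type RowEvent struct {
	Op     string // "update", "insert", or "delete"
	OldKey string // unique key value before the change
	NewKey string // unique key value after the change
}

// splitIfNeeded mirrors the decision added to TrySplitAndSortUpdateEvent:
// MySQL-compatible sinks and sinks with outputRawChangeEvent=true keep the raw
// update, while other sinks split a unique-key update into delete + insert.
func splitIfNeeded(e RowEvent, mysqlCompatible, outputRawChangeEvent bool) []RowEvent {
	if mysqlCompatible || outputRawChangeEvent {
		return []RowEvent{e} // forward the raw change event untouched
	}
	if e.Op == "update" && e.OldKey != e.NewKey {
		return []RowEvent{
			{Op: "delete", OldKey: e.OldKey},
			{Op: "insert", NewKey: e.NewKey},
		}
	}
	return []RowEvent{e}
}

func main() {
	update := RowEvent{Op: "update", OldKey: "a", NewKey: "b"}
	fmt.Println(len(splitIfNeeded(update, false, false))) // 2: delete + insert
	fmt.Println(len(splitIfNeeded(update, false, true)))  // 1: raw update kept
	fmt.Println(len(splitIfNeeded(update, true, false)))  // 1: MySQL sink splits at the puller side instead
}
```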

17 changes: 14 additions & 3 deletions cdc/model/sink_test.go
@@ -604,21 +604,32 @@ func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) {
Rows: []*RowChangedEvent{ukUpdatedEvent},
}

err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme)
outputRawChangeEvent := true
notOutputRawChangeEvent := false
err := txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.KafkaScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 2)

txn = &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

txn2 := &SingleTableTxn{
Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, outputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme, notOutputRawChangeEvent)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -52,8 +52,8 @@ func (m *mockSink) WriteEvents(events ...*dmlsink.CallbackableEvent[*model.RowCh
return nil
}

func (m *mockSink) Scheme() string {
return sink.BlackHoleScheme
func (m *mockSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, false
}

func (m *mockSink) GetEvents() []*dmlsink.CallbackableEvent[*model.RowChangedEvent] {
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/blackhole/black_hole_dml_sink.go
@@ -47,9 +47,9 @@ func (s *DMLSink) WriteEvents(rows ...*dmlsink.CallbackableEvent[*model.RowChang
return
}

// Scheme return the scheme of the sink.
func (s *DMLSink) Scheme() string {
return sink.BlackHoleScheme
// SchemeOption returns the scheme and the option.
func (s *DMLSink) SchemeOption() (string, bool) {
return sink.BlackHoleScheme, true
}

// Close do nothing.
26 changes: 14 additions & 12 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -72,8 +72,9 @@
// messages to individual dmlWorkers.
// The dmlWorkers will write the encoded messages to external storage in parallel between different tables.
type DMLSink struct {
changefeedID model.ChangeFeedID
scheme string
changefeedID model.ChangeFeedID
scheme string
outputRawChangeEvent bool
// last sequence number
lastSeqNum uint64
// encodingWorkers defines a group of workers for encoding events.
@@ -144,13 +145,14 @@

wgCtx, wgCancel := context.WithCancel(ctx)
s := &DMLSink{
changefeedID: changefeedID,
scheme: strings.ToLower(sinkURI.Scheme),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
changefeedID: changefeedID,
scheme: strings.ToLower(sinkURI.Scheme),
outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
s.alive.msgCh = chann.NewAutoDrainChann[eventFragment]()

@@ -295,7 +297,7 @@
return s.dead
}

// Scheme returns the sink scheme.
func (s *DMLSink) Scheme() string {
return s.scheme
// SchemeOption returns the scheme and the option.
func (s *DMLSink) SchemeOption() (string, bool) {
return s.scheme, s.outputRawChangeEvent

Codecov warning: added lines cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go#L301-L302 not covered by tests.
}
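The constructor above reads the new flag via GetOutputRawChangeEvent(). A plausible shape for such a nil-safe getter, sketched here as an assumption based on how it is called in this diff rather than the actual tiflow implementation:

```go
package main

import "fmt"

// CloudStorageConfig here is an illustrative stand-in, not the tiflow struct.
type CloudStorageConfig struct {
	OutputRawChangeEvent *bool
}

// GetOutputRawChangeEvent sketches a nil-safe accessor: unset means "false",
// i.e. keep the legacy split behavior.
func (c *CloudStorageConfig) GetOutputRawChangeEvent() bool {
	if c == nil || c.OutputRawChangeEvent == nil {
		return false
	}
	return *c.OutputRawChangeEvent
}

func main() {
	var unset *CloudStorageConfig
	raw := true
	enabled := &CloudStorageConfig{OutputRawChangeEvent: &raw}
	fmt.Println(unset.GetOutputRawChangeEvent())   // false
	fmt.Println(enabled.GetOutputRawChangeEvent()) // true
}
```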
2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/event.go
@@ -23,7 +23,7 @@ type TableEvent interface {
// GetCommitTs returns the commit timestamp of the event.
GetCommitTs() uint64
// TrySplitAndSortUpdateEvent split the update to delete and insert if the unique key is updated
TrySplitAndSortUpdateEvent(scheme string) error
TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

// CallbackFunc is the callback function for callbackable event.
6 changes: 2 additions & 4 deletions cdc/sink/dmlsink/event_sink.go
@@ -18,10 +18,8 @@ type EventSink[E TableEvent] interface {
// WriteEvents writes events to the sink.
// This is an asynchronously and thread-safe method.
WriteEvents(events ...*CallbackableEvent[E]) error

// Scheme returns the sink scheme.
Scheme() string

// SchemeOption returns the sink scheme and whether the sink should output raw change event.
SchemeOption() (scheme string, outputRawChangeEvent bool)
// Close closes the sink. Can be called with `WriteEvents` concurrently.
Close()
// The EventSink meets internal errors and has been dead already.
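With the interface reshaped this way, a caller can fetch the scheme and the raw-output flag in one call and pass both straight to TrySplitAndSortUpdateEvent. A minimal sketch of that calling pattern, using illustrative stand-ins for the real dmlsink types:

```go
package main

import "fmt"

// Illustrative stand-ins for the dmlsink interfaces touched by this PR.
type TableEvent interface {
	TrySplitAndSortUpdateEvent(scheme string, outputRawChangeEvent bool) error
}

type EventSink interface {
	SchemeOption() (scheme string, outputRawChangeEvent bool)
}

// prepareEvent shows the intended call pattern: ask the sink for its scheme and
// raw-output flag once, then let the event decide whether it must be split.
func prepareEvent(s EventSink, e TableEvent) error {
	scheme, outputRaw := s.SchemeOption()
	return e.TrySplitAndSortUpdateEvent(scheme, outputRaw)
}

// Tiny fakes so the sketch compiles and runs.
type fakeSink struct{}

func (fakeSink) SchemeOption() (string, bool) { return "kafka", false }

type fakeEvent struct{ split bool }

func (e *fakeEvent) TrySplitAndSortUpdateEvent(_ string, outputRaw bool) error {
	e.split = !outputRaw // split only when raw output is disabled
	return nil
}

func main() {
	ev := &fakeEvent{}
	_ = prepareEvent(fakeSink{}, ev)
	fmt.Println(ev.split) // true: this event would be split for a Kafka sink
}
```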
4 changes: 2 additions & 2 deletions cdc/sink/dmlsink/mq/kafka_dml_sink.go
@@ -122,8 +122,8 @@ func NewKafkaDMLSink(
metricsCollector := factory.MetricsCollector(tiflowutil.RoleProcessor, adminClient)
dmlProducer := producerCreator(ctx, changefeedID, asyncProducer, metricsCollector, errCh, failpointCh)
encoderGroup := codec.NewEncoderGroup(replicaConfig.Sink, encoderBuilder, changefeedID)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager,
eventRouter, trans, encoderGroup, protocol, scheme, errCh)
s := newDMLSink(ctx, changefeedID, dmlProducer, adminClient, topicManager, eventRouter, trans, encoderGroup,
protocol, scheme, replicaConfig.Sink.KafkaConfig.GetOutputRawChangeEvent(), errCh)
log.Info("DML sink producer created",
zap.String("namespace", changefeedID.Namespace),
zap.String("changefeedID", changefeedID.ID))