diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index 23d238ba856..9df6cbd378a 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -190,9 +190,10 @@ func (s *dmlSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa // This never be blocked because this is an unbounded channel. s.alive.worker.msgChan.In() <- mqEvent{ key: model.TopicPartitionKey{ - Topic: topic, - Partition: index, - PartitionKey: key, + Topic: topic, + Partition: index, + PartitionKey: key, + TotalPartition: partitionNum, }, rowEvent: &dmlsink.RowChangeCallbackableEvent{ Event: row, diff --git a/pkg/sink/codec/bootstraper.go b/pkg/sink/codec/bootstraper.go index 08cb5797761..f59fb2cdb32 100644 --- a/pkg/sink/codec/bootstraper.go +++ b/pkg/sink/codec/bootstraper.go @@ -75,6 +75,7 @@ func newBootstrapWorker( } func (b *bootstrapWorker) run(ctx context.Context) error { + log.Info("Bootstrap worker is started", zap.Stringer("changefeed", b.changefeedID)) sendTicker := time.NewTicker(bootstrapWorkerTickerInterval) gcTicker := time.NewTicker(bootstrapWorkerGCInterval) defer func() {