diff --git a/cdc/sink/codec/interface.go b/cdc/sink/codec/interface.go
index 7e2ccd31745..b8394cda29c 100644
--- a/cdc/sink/codec/interface.go
+++ b/cdc/sink/codec/interface.go
@@ -14,6 +14,7 @@
 package codec
 
 import (
+	"encoding/binary"
 	"strings"
 	"time"
 
@@ -63,9 +64,16 @@ type MQMessage struct {
 	Protocol Protocol // protocol
 }
 
+// maximumRecordOverhead is used to calculate ProducerMessage's byteSize by the sarama kafka client.
+// reference: https://github.com/Shopify/sarama/blob/66521126c71c522c15a36663ae9cddc2b024c799/async_producer.go#L233
+// for TiCDC, the minimum supported kafka version is `0.11.0.2`, which is treated as `version = 2` by the sarama producer.
+const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
+
 // Length returns the expected size of the Kafka message
+// We don't attach any `Headers` when sending messages, so the calculations related to them are skipped.
+// If the `ProducerMessage` Headers field is ever used, this method should be adjusted accordingly.
 func (m *MQMessage) Length() int {
-	return len(m.Key) + len(m.Value)
+	return len(m.Key) + len(m.Value) + maximumRecordOverhead
 }
 
 // PhysicalTime returns physical time part of Ts in time.Time
diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go
index edacbd7878f..16a36da13a8 100644
--- a/cdc/sink/codec/json.go
+++ b/cdc/sink/codec/json.go
@@ -427,7 +427,7 @@ func (d *JSONEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
 
 		if message.Length() > d.maxKafkaMessageSize {
 			// `len(d.messageBuf) == 1` is implied
-			log.Warn("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
+			log.Debug("Event does not fit into max-message-bytes. Adjust relevant configurations to avoid service interruptions.",
 				zap.Int("event-len", message.Length()), zap.Int("max-message-bytes", d.maxKafkaMessageSize))
 		}
 		d.curBatchSize++
diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go
index 98d56739a8e..95b6bbb561b 100644
--- a/cdc/sink/codec/json_test.go
+++ b/cdc/sink/codec/json_test.go
@@ -265,8 +265,6 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) {
 func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
 	defer testleak.AfterTest(c)()
 	encoder := NewJSONEventBatchEncoder()
-	err := encoder.SetParams(map[string]string{"max-message-bytes": "256"})
-	c.Check(err, check.IsNil)
 
 	testEvent := &model.RowChangedEvent{
 		CommitTs: 1,
@@ -274,13 +272,30 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) {
 		Columns:  []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
 	}
 
-	for i := 0; i < 10000; i++ {
+	// set the producer's `max-message-bytes` to be less than the event size; the event should still be sent.
+	err := encoder.SetParams(map[string]string{"max-message-bytes": "1"})
+	c.Check(err, check.IsNil)
+	for i := 0; i < 100; i++ {
 		r, err := encoder.AppendRowChangedEvent(testEvent)
 		c.Check(r, check.Equals, EncoderNoOperation)
 		c.Check(err, check.IsNil)
 	}
 
+	// one message per batch, and each can be built, which means the producer will try to send it.
 	messages := encoder.Build()
+	c.Assert(len(messages), check.Equals, 100)
+
+	// make sure each batch's `Length` is not greater than `max-message-bytes`
+	err = encoder.SetParams(map[string]string{"max-message-bytes": "256"})
+	c.Check(err, check.IsNil)
+
+	for i := 0; i < 10000; i++ {
+		r, err := encoder.AppendRowChangedEvent(testEvent)
+		c.Check(r, check.Equals, EncoderNoOperation)
+		c.Check(err, check.IsNil)
+	}
+
+	messages = encoder.Build()
 	for _, msg := range messages {
 		c.Assert(msg.Length(), check.LessEqual, 256)
 	}
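The arithmetic behind the new constant is worth spelling out: `binary.MaxVarintLen32` is 5 and `binary.MaxVarintLen64` is 10, so every record is now charged an extra 5*5 + 10 + 1 = 36 bytes against the broker limit. A standalone sketch of the new accounting (the key/value payloads below are made up for illustration; only the constant comes from this patch):

package main

import (
	"encoding/binary"
	"fmt"
)

// Same constant as in cdc/sink/codec/interface.go: sarama's worst-case
// per-record overhead for message format v2 (the record's varint-encoded
// fields plus one attributes byte), i.e. 5*5 + 10 + 1 = 36 bytes.
const maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1

func main() {
	key := []byte(`{"ts":1,"scm":"test","tbl":"t"}`) // illustrative payloads
	value := []byte(`{"u":{"col1":{"t":1,"v":"aa"}}}`)

	fmt.Println("raw payload bytes:", len(key)+len(value))
	fmt.Println("bytes charged by sarama:", len(key)+len(value)+maximumRecordOverhead)
}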
diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go
index ef3239365cd..a2a47f7fa66 100644
--- a/cdc/sink/mq.go
+++ b/cdc/sink/mq.go
@@ -16,7 +16,6 @@ package sink
 import (
 	"context"
 	"net/url"
-	"strconv"
 	"strings"
 	"sync/atomic"
 	"time"
@@ -399,99 +398,14 @@ func (k *mqSink) writeToProducer(ctx context.Context, message *codec.MQMessage,
 }
 
 func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter, replicaConfig *config.ReplicaConfig, opts map[string]string, errCh chan error) (*mqSink, error) {
-	config := kafka.NewKafkaConfig()
-
 	scheme := strings.ToLower(sinkURI.Scheme)
 	if scheme != "kafka" && scheme != "kafka+ssl" {
 		return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("can't create MQ sink with unsupported scheme: %s", scheme)
 	}
 
-	s := sinkURI.Query().Get("partition-num")
-	if s != "" {
-		c, err := strconv.Atoi(s)
-		if err != nil {
-			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
-		}
-		config.PartitionNum = int32(c)
-	}
-
-	s = sinkURI.Query().Get("replication-factor")
-	if s != "" {
-		c, err := strconv.Atoi(s)
-		if err != nil {
-			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
-		}
-		config.ReplicationFactor = int16(c)
-	}
-
-	s = sinkURI.Query().Get("kafka-version")
-	if s != "" {
-		config.Version = s
-	}
-
-	s = sinkURI.Query().Get("max-message-bytes")
-	if s != "" {
-		c, err := strconv.Atoi(s)
-		if err != nil {
-			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
-		}
-		config.MaxMessageBytes = c
-		opts["max-message-bytes"] = s
-	}
-
-	s = sinkURI.Query().Get("max-batch-size")
-	if s != "" {
-		opts["max-batch-size"] = s
-	}
-
-	s = sinkURI.Query().Get("compression")
-	if s != "" {
-		config.Compression = s
-	}
-
-	config.ClientID = sinkURI.Query().Get("kafka-client-id")
-
-	s = sinkURI.Query().Get("protocol")
-	if s != "" {
-		replicaConfig.Sink.Protocol = s
-	}
-
-	s = sinkURI.Query().Get("ca")
-	if s != "" {
-		config.Credential.CAPath = s
-	}
-
-	s = sinkURI.Query().Get("cert")
-	if s != "" {
-		config.Credential.CertPath = s
-	}
-
-	s = sinkURI.Query().Get("key")
-	if s != "" {
-		config.Credential.KeyPath = s
-	}
-
-	s = sinkURI.Query().Get("sasl-user")
-	if s != "" {
-		config.SaslScram.SaslUser = s
-	}
-
-	s = sinkURI.Query().Get("sasl-password")
-	if s != "" {
-		config.SaslScram.SaslPassword = s
-	}
-
-	s = sinkURI.Query().Get("sasl-mechanism")
-	if s != "" {
-		config.SaslScram.SaslMechanism = s
-	}
-
-	s = sinkURI.Query().Get("auto-create-topic")
-	if s != "" {
-		autoCreate, err := strconv.ParseBool(s)
-		if err != nil {
-			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
-		}
-		config.TopicPreProcess = autoCreate
+	config := kafka.NewConfig()
+	if err := config.Initialize(sinkURI, replicaConfig, opts); err != nil {
+		return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 	}
 
 	topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go
index 0e6631fcee7..c7f6edff355 100644
--- a/cdc/sink/producer/kafka/kafka.go
+++ b/cdc/sink/producer/kafka/kafka.go
@@ -16,7 +16,9 @@ package kafka
 import (
 	"context"
 	"fmt"
+	"net/url"
 	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -27,6 +29,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/cdc/sink/codec"
+	"github.com/pingcap/ticdc/pkg/config"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/notify"
 	"github.com/pingcap/ticdc/pkg/security"
@@ -51,9 +54,9 @@ type Config struct {
 	TopicPreProcess bool
 }
 
-// NewKafkaConfig returns a default Kafka configuration
-func NewKafkaConfig() Config {
-	return Config{
+// NewConfig returns a default Kafka configuration
+func NewConfig() *Config {
+	return &Config{
 		Version:           "2.4.0",
 		MaxMessageBytes:   512 * 1024 * 1024, // 512M
 		ReplicationFactor: 1,
@@ -64,6 +67,104 @@ func NewKafkaConfig() {
 	}
 }
 
+// Initialize the kafka configuration
+func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
+	s := sinkURI.Query().Get("partition-num")
+	if s != "" {
+		a, err := strconv.Atoi(s)
+		if err != nil {
+			return err
+		}
+		c.PartitionNum = int32(a)
+	}
+
+	s = sinkURI.Query().Get("replication-factor")
+	if s != "" {
+		a, err := strconv.Atoi(s)
+		if err != nil {
+			return err
+		}
+		c.ReplicationFactor = int16(a)
+	}
+
+	s = sinkURI.Query().Get("kafka-version")
+	if s != "" {
+		c.Version = s
+	}
+
+	s = sinkURI.Query().Get("max-message-bytes")
+	if s != "" {
+		a, err := strconv.Atoi(s)
+		if err != nil {
+			return err
+		}
+		// `MaxMessageBytes` is set to 512 MB by default, but a larger value may still be expected.
+		// TiCDC should try its best to send the message.
+		if a > c.MaxMessageBytes {
+			c.MaxMessageBytes = a
+		}
+		opts["max-message-bytes"] = s
+	}
+
+	s = sinkURI.Query().Get("max-batch-size")
+	if s != "" {
+		opts["max-batch-size"] = s
+	}
+
+	s = sinkURI.Query().Get("compression")
+	if s != "" {
+		c.Compression = s
+	}
+
+	c.ClientID = sinkURI.Query().Get("kafka-client-id")
+
+	s = sinkURI.Query().Get("protocol")
+	if s != "" {
+		replicaConfig.Sink.Protocol = s
+	}
+
+	s = sinkURI.Query().Get("ca")
+	if s != "" {
+		c.Credential.CAPath = s
+	}
+
+	s = sinkURI.Query().Get("cert")
+	if s != "" {
+		c.Credential.CertPath = s
+	}
+
+	s = sinkURI.Query().Get("key")
+	if s != "" {
+		c.Credential.KeyPath = s
+	}
+
+	s = sinkURI.Query().Get("sasl-user")
+	if s != "" {
+		c.SaslScram.SaslUser = s
+	}
+
+	s = sinkURI.Query().Get("sasl-password")
+	if s != "" {
+		c.SaslScram.SaslPassword = s
+	}
+
+	s = sinkURI.Query().Get("sasl-mechanism")
+	if s != "" {
+		c.SaslScram.SaslMechanism = s
+	}
+
+	s = sinkURI.Query().Get("auto-create-topic")
+	if s != "" {
+		autoCreate, err := strconv.ParseBool(s)
+		if err != nil {
+			return err
+		}
+		c.TopicPreProcess = autoCreate
+	}
+
+	return nil
+}
+
 type kafkaSaramaProducer struct {
 	// clientLock is used to protect concurrent access of asyncClient and syncClient.
 	// Since we don't close these two clients (which have an input chan) from the
@@ -283,7 +384,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
 
 // kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't
 // exit, creates it automatically.
-func kafkaTopicPreProcess(topic, address string, config Config, cfg *sarama.Config) (int32, error) {
+func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
 	admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
 	if err != nil {
 		return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
@@ -334,7 +435,7 @@ func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Conf
 
 var newSaramaConfigImpl = newSaramaConfig
 
 // NewKafkaSaramaProducer creates a kafka sarama producer
-func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config Config, errCh chan error) (*kafkaSaramaProducer, error) {
+func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
 	log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
 	cfg, err := newSaramaConfigImpl(ctx, config)
 	if err != nil {
@@ -417,7 +518,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (
 }
 
 // NewSaramaConfig return the default config and set the according version and metrics
-func newSaramaConfig(ctx context.Context, c Config) (*sarama.Config, error) {
+func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
 	config := sarama.NewConfig()
 
 	version, err := sarama.ParseKafkaVersion(c.Version)
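With the URI parsing moved into the producer package, the whole configuration flow can be exercised without constructing a sink. A minimal usage sketch, assuming only the import paths and APIs introduced above (the broker address and parameter values are illustrative):

package main

import (
	"fmt"
	"net/url"

	"github.com/pingcap/ticdc/cdc/sink/producer/kafka"
	"github.com/pingcap/ticdc/pkg/config"
)

func main() {
	// Query parameters recognized by (*Config).Initialize; values are illustrative.
	sinkURI, err := url.Parse(
		"kafka://127.0.0.1:9092/cdc-test?kafka-version=2.6.0&partition-num=3" +
			"&replication-factor=3&max-message-bytes=4194304&compression=gzip")
	if err != nil {
		panic(err)
	}

	cfg := kafka.NewConfig()
	opts := make(map[string]string)
	if err := cfg.Initialize(sinkURI, config.GetDefaultReplicaConfig(), opts); err != nil {
		panic(err)
	}

	// 4194304 is below the 512 MiB default, so MaxMessageBytes keeps the default,
	// while opts still carries the raw URI value for the encoder.
	fmt.Println(cfg.MaxMessageBytes, opts["max-message-bytes"])
}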
diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go
index affbb03b055..feae407ba28 100644
--- a/cdc/sink/producer/kafka/kafka_test.go
+++ b/cdc/sink/producer/kafka/kafka_test.go
@@ -15,6 +15,9 @@ package kafka
 import (
 	"context"
+	"fmt"
+	"net/url"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -23,6 +26,7 @@ import (
 	"github.com/pingcap/check"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/ticdc/cdc/sink/codec"
+	"github.com/pingcap/ticdc/pkg/config"
 	cerror "github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/security"
 	"github.com/pingcap/ticdc/pkg/util"
@@ -62,6 +66,51 @@ func (s *kafkaSuite) TestClientID(c *check.C) {
 	}
 }
 
+func (s *kafkaSuite) TestInitializeConfig(c *check.C) {
+	defer testleak.AfterTest(c)()
+	cfg := NewConfig()
+
+	uriTemplate := "kafka://127.0.0.1:9092/kafka-test?kafka-version=2.6.0&max-batch-size=5" +
+		"&max-message-bytes=%s&partition-num=1&replication-factor=3" +
+		"&kafka-client-id=unit-test&auto-create-topic=false&compression=gzip"
+	maxMessageSize := "4194304"
+	uri := fmt.Sprintf(uriTemplate, maxMessageSize)
+
+	sinkURI, err := url.Parse(uri)
+	c.Assert(err, check.IsNil)
+
+	replicaConfig := config.GetDefaultReplicaConfig()
+
+	opts := make(map[string]string)
+	err = cfg.Initialize(sinkURI, replicaConfig, opts)
+	c.Assert(err, check.IsNil)
+
+	c.Assert(cfg.PartitionNum, check.Equals, int32(1))
+	c.Assert(cfg.ReplicationFactor, check.Equals, int16(3))
+	c.Assert(cfg.Version, check.Equals, "2.6.0")
+	c.Assert(cfg.MaxMessageBytes, check.Equals, 512*1024*1024)
+
+	expectedOpts := map[string]string{
+		"max-message-bytes": maxMessageSize,
+		"max-batch-size":    "5",
+	}
+	for k, v := range opts {
+		c.Assert(v, check.Equals, expectedOpts[k])
+	}
+
+	a := 512*1024*1024 + 1
+	maxMessageSize = strconv.Itoa(a)
+	uri = fmt.Sprintf(uriTemplate, maxMessageSize)
+
+	sinkURI, err = url.Parse(uri)
+	c.Assert(err, check.IsNil)
+
+	err = cfg.Initialize(sinkURI, replicaConfig, opts)
+	c.Assert(err, check.IsNil)
+
+	c.Assert(cfg.MaxMessageBytes, check.Equals, a)
+}
+
 func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx, cancel := context.WithCancel(context.Background())
@@ -86,7 +135,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
 	}
 
 	errCh := make(chan error, 1)
-	config := NewKafkaConfig()
+	config := NewConfig()
 	// Because the sarama mock broker is not compatible with version larger than 1.0.0
 	// We use a smaller version in the following producer tests.
 	// Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
@@ -95,7 +144,7 @@
 	config.TopicPreProcess = false
 
 	newSaramaConfigImplBak := newSaramaConfigImpl
-	newSaramaConfigImpl = func(ctx context.Context, config Config) (*sarama.Config, error) {
+	newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
 		cfg, err := newSaramaConfigImplBak(ctx, config)
 		c.Assert(err, check.IsNil)
 		cfg.Producer.Flush.MaxMessages = 1
@@ -205,7 +254,7 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
 		"DescribeConfigsRequest": sarama.NewMockDescribeConfigsResponse(c),
 	})
 
-	config := NewKafkaConfig()
+	config := NewConfig()
 	config.PartitionNum = int32(0)
 	cfg, err := newSaramaConfigImpl(ctx, config)
 	c.Assert(err, check.IsNil)
@@ -238,7 +287,7 @@ func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) {
 	})
 	defer broker.Close()
 
-	config := NewKafkaConfig()
+	config := NewConfig()
 	config.PartitionNum = int32(0)
 	cfg, err := newSaramaConfigImpl(ctx, config)
 	c.Assert(err, check.IsNil)
@@ -250,7 +299,7 @@ func (s *kafkaSuite) TestTopicPreProcessCreate(c *check.C) {
 func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx := context.Background()
-	config := NewKafkaConfig()
+	config := NewConfig()
 	config.Version = "invalid"
 	_, err := newSaramaConfigImpl(ctx, config)
 	c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")
@@ -286,7 +335,7 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
 	_, err = newSaramaConfigImpl(ctx, config)
 	c.Assert(errors.Cause(err), check.ErrorMatches, ".*no such file or directory")
 
-	saslConfig := NewKafkaConfig()
+	saslConfig := NewConfig()
 	saslConfig.Version = "2.6.0"
 	saslConfig.ClientID = "test-sasl-scram"
 	saslConfig.SaslScram = &security.SaslScram{
@@ -307,7 +356,7 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
 	defer testleak.AfterTest(c)()
 	ctx := context.Background()
 	errCh := make(chan error, 1)
-	config := NewKafkaConfig()
+	config := NewConfig()
 	config.Version = "invalid"
 	_, err := NewKafkaSaramaProducer(ctx, "127.0.0.1:1111", "topic", config, errCh)
 	c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")
@@ -333,7 +382,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
 	leader.Returns(metadataResponse)
 	leader.Returns(metadataResponse)
 
-	config := NewKafkaConfig()
+	config := NewConfig()
 	// Because the sarama mock broker is not compatible with version larger than 1.0.0
 	// We use a smaller version in the following producer tests.
 	// Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
@@ -342,7 +391,7 @@
 	config.TopicPreProcess = false
 
 	newSaramaConfigImplBak := newSaramaConfigImpl
-	newSaramaConfigImpl = func(ctx context.Context, config Config) (*sarama.Config, error) {
+	newSaramaConfigImpl = func(ctx context.Context, config *Config) (*sarama.Config, error) {
 		cfg, err := newSaramaConfigImplBak(ctx, config)
 		c.Assert(err, check.IsNil)
 		cfg.Producer.Flush.MaxMessages = 1
@@ -407,7 +456,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
 	leader.Returns(metadataResponse)
 	leader.Returns(metadataResponse)
 
-	config := NewKafkaConfig()
+	config := NewConfig()
 	// Because the sarama mock broker is not compatible with version larger than 1.0.0
 	// We use a smaller version in the following producer tests.
 	// Ref: https://github.com/Shopify/sarama/blob/89707055369768913defac030c15cf08e9e57925/async_producer_test.go#L1445-L1447
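The `max-message-bytes` handling exercised by `TestInitializeConfig` is deliberately asymmetric: `Initialize` only ever raises `MaxMessageBytes` above the 512 MiB default, never lowers it, while `opts` keeps the raw URI value for the encoder. A small sketch of that rule, reusing the numbers from the test (the helper function is hypothetical, not part of the patch):

package main

import "fmt"

// effectiveMaxMessageBytes restates the rule (*Config).Initialize applies to
// the max-message-bytes URI parameter: take the URI value only if it exceeds
// the default.
func effectiveMaxMessageBytes(defaultBytes, uriBytes int) int {
	if uriBytes > defaultBytes {
		return uriBytes
	}
	return defaultBytes
}

func main() {
	const def = 512 * 1024 * 1024
	fmt.Println(effectiveMaxMessageBytes(def, 4194304)) // 536870912: default kept
	fmt.Println(effectiveMaxMessageBytes(def, def+1))   // 536870913: URI value wins
}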