Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink/mq(ticdc): Add support for Confluent Cloud Kafka #5553

Merged
merged 22 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cdc/sink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type partitionDispatchRule int

const (
partitionDispatchRuleDefault partitionDispatchRule = iota
partitionDispatchRuleRowID
partitionDispatchRuleTS
partitionDispatchRuleTable
partitionDispatchRuleIndexValue
Expand All @@ -51,17 +50,19 @@ func (r *partitionDispatchRule) fromString(rule string) {
switch strings.ToLower(rule) {
case "default":
*r = partitionDispatchRuleDefault
case "rowid":
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
*r = partitionDispatchRuleRowID
case "ts":
*r = partitionDispatchRuleTS
case "table":
*r = partitionDispatchRuleTable
case "rowid":
*r = partitionDispatchRuleIndexValue
log.Warn("rowid is deprecated, please use index-value instead.")
case "index-value":
*r = partitionDispatchRuleIndexValue
default:
*r = partitionDispatchRuleDefault
log.Warn("can't support dispatch rule, using default rule", zap.String("rule", rule))
log.Warn("the partition dispatch rule is not default/ts/table/index-value," +
" use the default rule instead.")
}
}

Expand Down Expand Up @@ -230,7 +231,7 @@ func getPartitionDispatcher(
)
rule.fromString(ruleConfig.PartitionRule)
switch rule {
case partitionDispatchRuleRowID, partitionDispatchRuleIndexValue:
case partitionDispatchRuleIndexValue:
if enableOldValue {
log.Warn("This index-value distribution mode " +
"does not guarantee row-level orderliness when " +
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq/dispatcher/event_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestEventRouter(t *testing.T) {
},
{
Matcher: []string{"test.*"},
PartitionRule: "rowid",
PartitionRule: "index-value",
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
TopicRule: "hello_{schema}",
},
{
Expand Down
84 changes: 65 additions & 19 deletions cdc/sink/mq/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kafka

import (
"context"
"crypto/tls"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -42,6 +43,7 @@ type Config struct {
MaxMessageBytes int
Compression string
ClientID string
EnableTLS bool
Credential *security.Credential
SASL *security.SASL
// control whether to create topic
Expand Down Expand Up @@ -145,21 +147,6 @@ func (c *Config) Apply(sinkURI *url.URL) error {

c.ClientID = params.Get("kafka-client-id")

s = params.Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = params.Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = params.Get("key")
if s != "" {
c.Credential.KeyPath = s
}

s = params.Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
Expand Down Expand Up @@ -201,6 +188,54 @@ func (c *Config) Apply(sinkURI *url.URL) error {
return err
}

err = c.applyTLS(params)
if err != nil {
return err
}

return nil
}

func (c *Config) applyTLS(params url.Values) error {
s := params.Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = params.Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = params.Get("key")
if s != "" {
c.Credential.KeyPath = s
}

// if enable-tls is not set, but credential files are set,
// then tls should be enabled, and the self-signed CA certificate is used.
// if enable-tls is set to true, and credential files are not set,
// then tls should be enabled, and the trusted CA certificate on OS is used.
// if enable-tls is set to false, and credential files are set,
// then an error is returned.
s = params.Get("enable-tls")
if s != "" {
enableTLS, err := strconv.ParseBool(s)
if err != nil {
return err
}

if c.Credential != nil && c.Credential.IsTLSEnabled() && !enableTLS {
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
errors.New("credential files are supplied, but 'enable-tls' is set to false"))
}
c.EnableTLS = enableTLS
} else {
if c.Credential != nil && c.Credential.IsTLSEnabled() {
c.EnableTLS = true
}
}

return nil
}

Expand Down Expand Up @@ -368,11 +403,22 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Compression = sarama.CompressionNone
}

if c.Credential != nil && len(c.Credential.CAPath) != 0 {
if c.EnableTLS {
// for SSL encryption with a trust CA certificate, we must populate the
// following two params of config.Net.TLS
config.Net.TLS.Enable = true
config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
config.Net.TLS.Config = &tls.Config{
MinVersion: tls.VersionTLS12,
NextProtos: []string{"h2", "http/1.1"},
}

// for SSL encryption with self-signed CA certificate, we reassign the
// config.Net.TLS.Config using the relevant credential files.
if c.Credential != nil && c.Credential.IsTLSEnabled() {
config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
}
}

Expand Down
58 changes: 58 additions & 0 deletions cdc/sink/mq/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestNewSaramaConfig(t *testing.T) {
require.Equal(t, cc.expected, cfg.Producer.Compression)
}

config.EnableTLS = true
config.Credential = &security.Credential{
CAPath: "/invalid/ca/path",
}
Expand Down Expand Up @@ -495,6 +496,63 @@ func TestApplySASL(t *testing.T) {
}
}

// TestApplyTLS verifies that Config.applyTLS derives EnableTLS correctly from
// the sink URI: an explicit `enable-tls` flag wins, credential files alone
// imply TLS, and supplying credential files while `enable-tls=false` is an error.
func TestApplyTLS(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name       string
		URI        string
		tlsEnabled bool
		expectErr  string
	}{
		{
			name: "tls config with 'enable-tls' set to true",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=true",
			tlsEnabled: true,
			expectErr:  "",
		},
		{
			name: "tls config with no 'enable-tls', and credential files are supplied",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain" +
				"&ca=/root/ca.file&cert=/root/cert.file&key=/root/key.file",
			tlsEnabled: true,
			expectErr:  "",
		},
		{
			name: "tls config with no 'enable-tls', and credential files are not supplied",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain",
			tlsEnabled: false,
			expectErr:  "",
		},
		{
			name: "tls config with 'enable-tls' set to false, and credential files are supplied",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=false" +
				"&ca=/root/ca&cert=/root/cert&key=/root/key",
			tlsEnabled: false,
			expectErr:  "credential files are supplied, but 'enable-tls' is set to false",
		},
	}

	for _, test := range tests {
		// Capture the range variable: the parallel subtest closure is
		// suspended at t.Parallel() and resumes only after the loop has
		// finished, so without this copy every subtest would see the last
		// element of `tests`.
		test := test
		t.Run(test.name, func(t *testing.T) {
			t.Parallel()
			cfg := NewConfig()
			sinkURI, err := url.Parse(test.URI)
			require.Nil(t, err)
			if test.expectErr == "" {
				require.Nil(t, cfg.applyTLS(sinkURI.Query()))
			} else {
				require.Regexp(t, test.expectErr, cfg.applyTLS(sinkURI.Query()).Error())
			}
			require.Equal(t, test.tlsEnabled, cfg.EnableTLS)
		})
	}
}

func TestCompleteSaramaSASLConfig(t *testing.T) {
t.Parallel()

Expand Down
11 changes: 9 additions & 2 deletions cdc/sink/mq/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -507,6 +508,11 @@ func validateMinInsyncReplicas(

minInsyncReplicasStr, exists, err := minInsyncReplicasConfigGetter()
if err != nil {
// 'min.insync.replica' is invisible to us in Confluent Cloud Kafka.
if strings.Contains(err.Error(),
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved
string(cerror.ErrKafkaBrokerConfigNotFound.RFCCode())) {
return nil
}
return err
}
minInsyncReplicas, err := strconv.Atoi(minInsyncReplicasStr)
Expand Down Expand Up @@ -551,8 +557,9 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s
}

if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
return "", errors.New(fmt.Sprintf(
"cannot find the `%s` from the broker's configuration", brokerConfigName))
log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
}

return configEntries[0].Value, nil
Expand Down
21 changes: 14 additions & 7 deletions cdc/sink/mq/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kafka

import (
"context"
"fmt"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -328,23 +329,29 @@ func TestAdjustConfigMinInsyncReplicas(t *testing.T) {
)

// topic not exist, and `min.insync.replicas` not found in broker's configuration
adminClient.DropBrokerConfig()
adminClient.DropBrokerConfig(kafka.MinInsyncReplicasConfigName)
topicName := "no-topic-no-min-insync-replicas"
err = AdjustConfig(adminClient, config, saramaConfig, "no-topic-no-min-insync-replicas")
require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
errors.Cause(err))
require.Nil(t, err)
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{ReplicationFactor: 1}, false)
require.Regexp(t, ".*kafka server: Request parameters do not satisfy the configured policy.",
err.Error())

// Report an error if the replication-factor is less than min.insync.replicas
// when the topic does exist.
saramaConfig, err = NewSaramaConfig(context.Background(), config)
require.Nil(t, err)
fmt.Println(config.PartitionNum)
zhaoxinyu marked this conversation as resolved.
Show resolved Hide resolved

// topic exist, but `min.insync.replicas` not found in topic and broker configuration
topicName := "topic-no-config-entry"
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{}, false)
topicName = "topic-no-config-entry"
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{
ReplicationFactor: 3,
NumPartitions: 3,
}, false)
require.Nil(t, err)
err = AdjustConfig(adminClient, config, saramaConfig, topicName)
require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
errors.Cause(err))
require.Nil(t, err)

// topic found, and have `min.insync.replicas`, but set to 2, larger than `replication-factor`.
adminClient.SetMinInsyncReplicas("2")
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ error = '''
kafka async send message failed
'''

["CDC:ErrKafkaBrokerConfigNotFound"]
error = '''
kafka broker config item not found
'''

["CDC:ErrKafkaCreateTopic"]
error = '''
kafka create topic failed
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ var (
"invalid topic expression",
errors.RFCCodeText("CDC:ErrKafkaTopicExprInvalid"),
)
ErrKafkaBrokerConfigNotFound = errors.Normalize(
"kafka broker config item not found",
errors.RFCCodeText("CDC:ErrKafkaBrokerConfigNotFound"),
)
ErrPulsarNewProducer = errors.Normalize(
"new pulsar producer",
errors.RFCCodeText("CDC:ErrPulsarNewProducer"),
Expand Down
29 changes: 27 additions & 2 deletions pkg/kafka/cluster_admin_client_mock_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
DefaultMockPartitionNum = 3
// defaultMockControllerID specifies the default mock controller ID.
defaultMockControllerID = 1
// topic replication factor must be 3 for Confluent Cloud Kafka.
defaultReplicationFactor = 3
)

const (
Expand Down Expand Up @@ -110,6 +112,20 @@ func (c *ClusterAdminClientMockImpl) DescribeConfig(resource sarama.ConfigResour

// CreateTopic records the topic in the mock cluster's topic map.
//
// To emulate Confluent Cloud behavior: when the broker-level
// `min.insync.replicas` config is hidden (not present in brokerConfigs),
// topic creation is rejected with ErrPolicyViolation unless the requested
// replication factor equals the required default (3).
func (c *ClusterAdminClientMockImpl) CreateTopic(topic string, detail *sarama.TopicDetail, _ bool) error {
	hasMinInsyncReplicas := false
	for _, entry := range c.brokerConfigs {
		if entry.Name == MinInsyncReplicasConfigName {
			hasMinInsyncReplicas = true
			break
		}
	}

	// For Confluent Cloud, min.insync.replica is invisible and the
	// replication factor must be 3; otherwise ErrPolicyViolation is returned.
	if !hasMinInsyncReplicas && detail.ReplicationFactor != defaultReplicationFactor {
		return sarama.ErrPolicyViolation
	}

	c.topics[topic] = *detail
	return nil
}
Expand Down Expand Up @@ -148,6 +164,15 @@ func (c *ClusterAdminClientMockImpl) GetTopicMaxMessageBytes() int {
}

// DropBrokerConfig remove all broker level configuration for test purpose.
func (c *ClusterAdminClientMockImpl) DropBrokerConfig() {
c.brokerConfigs = c.brokerConfigs[:0]
// DropBrokerConfig removes the broker-level configuration entry with the
// given name, for test purposes. It is a no-op when no entry matches.
//
// NOTE: the previous implementation used `targetIdx := 0` as a "not found"
// sentinel, which conflated "not found" with "found at index 0" — an entry
// sitting at index 0 could never be dropped. Removing on first match with an
// early return fixes that.
func (c *ClusterAdminClientMockImpl) DropBrokerConfig(configName string) {
	for i, config := range c.brokerConfigs {
		if config.Name == configName {
			c.brokerConfigs = append(c.brokerConfigs[:i], c.brokerConfigs[i+1:]...)
			return
		}
	}
}
Loading