Skip to content

Commit

Permalink
Merge branch 'master' into engine-fix-predispatch-failure-handling
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix authored May 26, 2022
2 parents 5751ec2 + 664b773 commit babbde1
Show file tree
Hide file tree
Showing 19 changed files with 488 additions and 49 deletions.
11 changes: 6 additions & 5 deletions cdc/sink/mq/dispatcher/event_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type partitionDispatchRule int

const (
partitionDispatchRuleDefault partitionDispatchRule = iota
partitionDispatchRuleRowID
partitionDispatchRuleTS
partitionDispatchRuleTable
partitionDispatchRuleIndexValue
Expand All @@ -51,17 +50,19 @@ func (r *partitionDispatchRule) fromString(rule string) {
switch strings.ToLower(rule) {
case "default":
*r = partitionDispatchRuleDefault
case "rowid":
*r = partitionDispatchRuleRowID
case "ts":
*r = partitionDispatchRuleTS
case "table":
*r = partitionDispatchRuleTable
case "rowid":
*r = partitionDispatchRuleIndexValue
log.Warn("rowid is deprecated, please use index-value instead.")
case "index-value":
*r = partitionDispatchRuleIndexValue
default:
*r = partitionDispatchRuleDefault
log.Warn("can't support dispatch rule, using default rule", zap.String("rule", rule))
log.Warn("the partition dispatch rule is not default/ts/table/index-value," +
" use the default rule instead.")
}
}

Expand Down Expand Up @@ -230,7 +231,7 @@ func getPartitionDispatcher(
)
rule.fromString(ruleConfig.PartitionRule)
switch rule {
case partitionDispatchRuleRowID, partitionDispatchRuleIndexValue:
case partitionDispatchRuleIndexValue:
if enableOldValue {
log.Warn("This index-value distribution mode " +
"does not guarantee row-level orderliness when " +
Expand Down
90 changes: 71 additions & 19 deletions cdc/sink/mq/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kafka

import (
"context"
"crypto/tls"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -42,6 +43,7 @@ type Config struct {
MaxMessageBytes int
Compression string
ClientID string
EnableTLS bool
Credential *security.Credential
SASL *security.SASL
// control whether to create topic
Expand Down Expand Up @@ -145,21 +147,6 @@ func (c *Config) Apply(sinkURI *url.URL) error {

c.ClientID = params.Get("kafka-client-id")

s = params.Get("ca")
if s != "" {
c.Credential.CAPath = s
}

s = params.Get("cert")
if s != "" {
c.Credential.CertPath = s
}

s = params.Get("key")
if s != "" {
c.Credential.KeyPath = s
}

s = params.Get("auto-create-topic")
if s != "" {
autoCreate, err := strconv.ParseBool(s)
Expand Down Expand Up @@ -201,6 +188,60 @@ func (c *Config) Apply(sinkURI *url.URL) error {
return err
}

err = c.applyTLS(params)
if err != nil {
return err
}

return nil
}

// applyTLS reads the TLS-related query parameters (`ca`, `cert`, `key`,
// `enable-tls`) from the sink URI and fills in c.Credential / c.EnableTLS.
// It returns an error when the parameters form an inconsistent combination.
func (c *Config) applyTLS(params url.Values) error {
	// Copy each credential-file path from the sink URI into the credential
	// struct, leaving the field untouched when the parameter is absent.
	if ca := params.Get("ca"); ca != "" {
		c.Credential.CAPath = ca
	}
	if cert := params.Get("cert"); cert != "" {
		c.Credential.CertPath = cert
	}
	if key := params.Get("key"); key != "" {
		c.Credential.KeyPath = key
	}

	// A partially supplied credential set is invalid: either provide all of
	// ca/cert/key, or provide none of them.
	if c.Credential != nil && !c.Credential.IsEmpty() && !c.Credential.IsTLSEnabled() {
		return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
			errors.New("ca, cert and key files should all be supplied"))
	}

	// Decide whether TLS is enabled:
	//  - `enable-tls` absent, credential files set   -> TLS on (self-signed CA).
	//  - `enable-tls=true`, no credential files      -> TLS on (OS trusted CAs).
	//  - `enable-tls=false`, credential files set    -> error.
	raw := params.Get("enable-tls")
	if raw == "" {
		if c.Credential != nil && c.Credential.IsTLSEnabled() {
			c.EnableTLS = true
		}
		return nil
	}

	enableTLS, err := strconv.ParseBool(raw)
	if err != nil {
		return err
	}
	if c.Credential != nil && c.Credential.IsTLSEnabled() && !enableTLS {
		return cerror.WrapError(cerror.ErrKafkaInvalidConfig,
			errors.New("credential files are supplied, but 'enable-tls' is set to false"))
	}
	c.EnableTLS = enableTLS

	return nil
}

Expand Down Expand Up @@ -368,11 +409,22 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Compression = sarama.CompressionNone
}

if c.Credential != nil && len(c.Credential.CAPath) != 0 {
if c.EnableTLS {
// for SSL encryption with a trust CA certificate, we must populate the
// following two params of config.Net.TLS
config.Net.TLS.Enable = true
config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
config.Net.TLS.Config = &tls.Config{
MinVersion: tls.VersionTLS12,
NextProtos: []string{"h2", "http/1.1"},
}

// for SSL encryption with self-signed CA certificate, we reassign the
// config.Net.TLS.Config using the relevant credential files.
if c.Credential != nil && c.Credential.IsTLSEnabled() {
config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
if err != nil {
return nil, errors.Trace(err)
}
}
}

Expand Down
71 changes: 70 additions & 1 deletion cdc/sink/mq/producer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ func TestNewSaramaConfig(t *testing.T) {
require.Equal(t, cc.expected, cfg.Producer.Compression)
}

config.EnableTLS = true
config.Credential = &security.Credential{
CAPath: "/invalid/ca/path",
CAPath: "/invalid/ca/path",
CertPath: "/invalid/cert/path",
KeyPath: "/invalid/key/path",
}
_, err = NewSaramaConfig(ctx, config)
require.Regexp(t, ".*no such file or directory", errors.Cause(err))
Expand Down Expand Up @@ -495,6 +498,72 @@ func TestApplySASL(t *testing.T) {
}
}

// TestApplyTLS checks every combination of the `enable-tls` flag and the
// credential-file parameters accepted by Config.applyTLS.
func TestApplyTLS(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name       string // sub-test name
		URI        string // sink URI under test
		tlsEnabled bool   // expected cfg.EnableTLS after applyTLS
		expectErr  string // expected error regexp; "" means no error
	}{
		{
			name: "tls config with 'enable-tls' set to true",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=true",
			tlsEnabled: true,
			expectErr:  "",
		},
		{
			name: "tls config with no 'enable-tls', and credential files are supplied",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain" +
				"&ca=/root/ca.file&cert=/root/cert.file&key=/root/key.file",
			tlsEnabled: true,
			expectErr:  "",
		},
		{
			name: "tls config with no 'enable-tls', and credential files are not supplied",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain",
			tlsEnabled: false,
			expectErr:  "",
		},
		{
			name: "tls config with 'enable-tls' set to false, and credential files are supplied",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=false" +
				"&ca=/root/ca&cert=/root/cert&key=/root/key",
			tlsEnabled: false,
			expectErr:  "credential files are supplied, but 'enable-tls' is set to false",
		},
		{
			name: "tls config with 'enable-tls' set to true, and some of " +
				"the credential files are not supplied ",
			URI: "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=0" +
				"&sasl-user=user&sasl-password=password&sasl-mechanism=plain&enable-tls=true" +
				"&ca=/root/ca&cert=/root/cert&",
			tlsEnabled: false,
			expectErr:  "ca, cert and key files should all be supplied",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			t.Parallel()
			cfg := NewConfig()
			sinkURI, err := url.Parse(test.URI)
			require.Nil(t, err)
			if test.expectErr == "" {
				require.Nil(t, cfg.applyTLS(sinkURI.Query()))
			} else {
				require.Regexp(t, test.expectErr, cfg.applyTLS(sinkURI.Query()).Error())
			}
			require.Equal(t, test.tlsEnabled, cfg.EnableTLS)
		})
	}
}

func TestCompleteSaramaSASLConfig(t *testing.T) {
t.Parallel()

Expand Down
9 changes: 7 additions & 2 deletions cdc/sink/mq/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ func validateMinInsyncReplicas(

minInsyncReplicasStr, exists, err := minInsyncReplicasConfigGetter()
if err != nil {
// 'min.insync.replica' is invisible to us in Confluent Cloud Kafka.
if cerror.ErrKafkaBrokerConfigNotFound.Equal(err) {
return nil
}
return err
}
minInsyncReplicas, err := strconv.Atoi(minInsyncReplicasStr)
Expand Down Expand Up @@ -551,8 +555,9 @@ func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (s
}

if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
return "", errors.New(fmt.Sprintf(
"cannot find the `%s` from the broker's configuration", brokerConfigName))
log.Warn("Kafka config item not found", zap.String("configName", brokerConfigName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", brokerConfigName)
}

return configEntries[0].Value, nil
Expand Down
19 changes: 12 additions & 7 deletions cdc/sink/mq/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,23 +328,28 @@ func TestAdjustConfigMinInsyncReplicas(t *testing.T) {
)

// topic not exist, and `min.insync.replicas` not found in broker's configuration
adminClient.DropBrokerConfig()
adminClient.DropBrokerConfig(kafka.MinInsyncReplicasConfigName)
topicName := "no-topic-no-min-insync-replicas"
err = AdjustConfig(adminClient, config, saramaConfig, "no-topic-no-min-insync-replicas")
require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
errors.Cause(err))
require.Nil(t, err)
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{ReplicationFactor: 1}, false)
require.Regexp(t, ".*kafka server: Request parameters do not satisfy the configured policy.",
err.Error())

// Report an error if the replication-factor is less than min.insync.replicas
// when the topic does exist.
saramaConfig, err = NewSaramaConfig(context.Background(), config)
require.Nil(t, err)

// topic exist, but `min.insync.replicas` not found in topic and broker configuration
topicName := "topic-no-config-entry"
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{}, false)
topicName = "topic-no-config-entry"
err = adminClient.CreateTopic(topicName, &sarama.TopicDetail{
ReplicationFactor: 3,
NumPartitions: 3,
}, false)
require.Nil(t, err)
err = AdjustConfig(adminClient, config, saramaConfig, topicName)
require.Regexp(t, ".*cannot find the `min.insync.replicas` from the broker's configuration",
errors.Cause(err))
require.Nil(t, err)

// topic found, and have `min.insync.replicas`, but set to 2, larger than `replication-factor`.
adminClient.SetMinInsyncReplicas("2")
Expand Down
11 changes: 6 additions & 5 deletions dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,11 +680,12 @@ func (c *TaskConfig) adjust() error {
}
case ModeIncrement:
if inst.Meta == nil {
return terror.ErrConfigMetadataNotSet.Generate(inst.SourceID, c.TaskMode)
}
err := inst.Meta.Verify()
if err != nil {
return terror.Annotatef(err, "mysql-instance: %d", i)
log.L().Warn("mysql-instance doesn't set meta for incremental mode, user should specify start_time to start task.", zap.String("sourceID", inst.SourceID))
} else {
err := inst.Meta.Verify()
if err != nil {
return terror.Annotatef(err, "mysql-instance: %d", i)
}
}
}

Expand Down
20 changes: 20 additions & 0 deletions dm/dm/master/openapi_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,26 @@ func (s *OpenAPIControllerSuite) TestTaskController() {
s.NotNil(taskCfg2)
s.EqualValues(task2, task)
s.Equal(taskCfg2.String(), taskCfg.String())

// incremental task without source meta
taskTest := *s.testTask
taskTest.TaskMode = config.ModeIncrement
req = openapi.ConverterTaskRequest{Task: &taskTest}
task3, taskCfg3, err := server.convertTaskConfig(ctx, req)
s.NoError(err)
s.NotNil(task3)
s.NotNil(taskCfg3)
s.EqualValues(&taskTest, task3)

req.Task = nil
taskCfgStr = taskCfg3.String()
req.TaskConfigFile = &taskCfgStr
task4, taskCfg4, err := server.convertTaskConfig(ctx, req)
s.NoError(err)
s.NotNil(task4)
s.NotNil(taskCfg4)
s.EqualValues(task4, task3)
s.Equal(taskCfg4.String(), taskCfg3.String())
}
}

Expand Down
11 changes: 10 additions & 1 deletion dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.C
}, nil
}
resp := &pb.CheckTaskResponse{}
_, stCfgs, err := s.generateSubTask(ctx, req.Task, nil)
_, stCfgs, err := s.generateSubTask(ctx, req.Task, &cliArgs)
if err != nil {
resp.Msg = err.Error()
// nolint:nilerr
Expand Down Expand Up @@ -1605,6 +1605,15 @@ func (s *Server) generateSubTask(
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

if cfg.TaskMode == config.ModeIncrement && (cliArgs == nil || cliArgs.StartTime == "") {
for _, inst := range cfg.MySQLInstances {
// incremental task need to specify meta or start time
if inst.Meta == nil {
return nil, nil, terror.ErrConfigMetadataNotSet.Generate(inst.SourceID, config.ModeIncrement)
}
}
}

err = adjustTargetDB(ctx, cfg.TargetDB)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
Expand Down
14 changes: 14 additions & 0 deletions dm/tests/dmctl_basic/check_list/check_task.sh
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,17 @@ function check_task_empty_config() {
check_task_empty_load_config $1
check_task_empty_sync_config $1
}

# Run `check-task` against a task config whose incremental-mode mysql-instance
# has no source meta, and expect exactly one occurrence of the
# "must set meta for task-mode incremental" error in the dmctl output.
# $1 - path to the task configuration file.
function check_task_wrong_no_source_meta() {
	task_conf=$1
	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"check-task $task_conf" \
		"must set meta for task-mode incremental" 1
}

# Run `check-task` with an explicit --start-time for a task config that has no
# source meta, and expect the pre-check to pass (one "pre-check is passed"
# match in the dmctl output) — start-time substitutes for the missing meta.
# $1 - path to the task configuration file.
function check_task_no_source_meta_but_start_time() {
	task_conf=$1
	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"check-task $task_conf --start-time '2006-01-02 15:04:05'" \
		"pre-check is passed" 1
}
Loading

0 comments on commit babbde1

Please sign in to comment.