Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into support-duration
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer committed Dec 13, 2022
2 parents 1769e8f + 621115b commit 68a3f07
Show file tree
Hide file tree
Showing 141 changed files with 16,052 additions and 11,335 deletions.
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "ae013bf35bd23234d1dea46b079f1e05ba74ac0321423830119d3e787ec73483",
sha256 = "56d8c5a5c91e1af73eca71a6fab2ced959b67c86d12ba37feedb0a2dfea441a6",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
],
)

Expand Down
2 changes: 1 addition & 1 deletion br/cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func run() error {
if err != nil {
return err
}
if err = cfg.TiDB.Security.RegisterMySQL(); err != nil {
if err = cfg.TiDB.Security.BuildTLSConfig(); err != nil {
return err
}

Expand Down
24 changes: 14 additions & 10 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common

import (
"context"
"crypto/tls"
"database/sql"
"encoding/base64"
"encoding/json"
Expand Down Expand Up @@ -47,15 +48,16 @@ const (

// MySQLConnectParam records the parameters needed to connect to a MySQL database.
type MySQLConnectParam struct {
Host string
Port int
User string
Password string
SQLMode string
MaxAllowedPacket uint64
TLS string
Net string
Vars map[string]string
Host string
Port int
User string
Password string
SQLMode string
MaxAllowedPacket uint64
TLSConfig *tls.Config
AllowFallbackToPlaintext bool
Net string
Vars map[string]string
}

func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
Expand All @@ -72,7 +74,9 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config {
cfg.Params["charset"] = "utf8mb4"
cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode)
cfg.MaxAllowedPacket = int(param.MaxAllowedPacket)
cfg.TLSConfig = param.TLS

cfg.TLS = param.TLSConfig
cfg.AllowFallbackToPlaintext = param.AllowFallbackToPlaintext

for k, v := range param.Vars {
cfg.Params[k] = fmt.Sprintf("'%s'", v)
Expand Down
1 change: 0 additions & 1 deletion br/pkg/lightning/config/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ go_library(
"@com_github_carlmjohnson_flagext//:flagext",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
70 changes: 33 additions & 37 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"math"
Expand All @@ -32,7 +33,6 @@ import (
"github.com/BurntSushi/toml"
"github.com/docker/go-units"
gomysql "github.com/go-sql-driver/mysql"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
Expand Down Expand Up @@ -578,20 +578,18 @@ type Security struct {
// RedactInfoLog indicates that whether enabling redact log
RedactInfoLog bool `toml:"redact-info-log" json:"redact-info-log"`

// TLSConfigName is used to set tls config for lightning in DM, so we don't expose this field to user
// DM may running many lightning instances at same time, so we need to set different tls config name for each lightning
TLSConfigName string `toml:"-" json:"-"`
TLSConfig *tls.Config `toml:"-" json:"-"`
AllowFallbackToPlaintext bool `toml:"-" json:"-"`

// When DM/engine uses lightning as a library, it can directly pass in the content
CABytes []byte `toml:"-" json:"-"`
CertBytes []byte `toml:"-" json:"-"`
KeyBytes []byte `toml:"-" json:"-"`
}

// RegisterMySQL registers the TLS config with name "cluster" or security.TLSConfigName
// for use in `sql.Open()`. This method is goroutine-safe.
func (sec *Security) RegisterMySQL() error {
if sec == nil {
// BuildTLSConfig builds the tls config which is used by SQL drier later.
func (sec *Security) BuildTLSConfig() error {
if sec == nil || sec.TLSConfig != nil {
return nil
}

Expand All @@ -604,21 +602,10 @@ func (sec *Security) RegisterMySQL() error {
if err != nil {
return errors.Trace(err)
}
if tlsConfig != nil {
// error happens only when the key coincides with the built-in names.
_ = gomysql.RegisterTLSConfig(sec.TLSConfigName, tlsConfig)
}
sec.TLSConfig = tlsConfig
return nil
}

// DeregisterMySQL deregisters the TLS config with security.TLSConfigName
func (sec *Security) DeregisterMySQL() {
if sec == nil || len(sec.CAPath) == 0 {
return
}
gomysql.DeregisterTLSConfig(sec.TLSConfigName)
}

// A duration which can be deserialized from a TOML string.
// Implemented as https://github.com/BurntSushi/toml#using-the-encodingtextunmarshaler-interface
type Duration struct {
Expand Down Expand Up @@ -1139,18 +1126,27 @@ func (cfg *Config) AdjustCheckPoint() {
switch cfg.Checkpoint.Driver {
case CheckpointDriverMySQL:
param := common.MySQLConnectParam{
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
User: cfg.TiDB.User,
Password: cfg.TiDB.Psw,
SQLMode: mysql.DefaultSQLMode,
MaxAllowedPacket: defaultMaxAllowedPacket,
TLS: cfg.TiDB.TLS,
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
User: cfg.TiDB.User,
Password: cfg.TiDB.Psw,
SQLMode: mysql.DefaultSQLMode,
MaxAllowedPacket: defaultMaxAllowedPacket,
TLSConfig: cfg.TiDB.Security.TLSConfig,
AllowFallbackToPlaintext: cfg.TiDB.Security.AllowFallbackToPlaintext,
}
cfg.Checkpoint.MySQLParam = &param
case CheckpointDriverFile:
cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb"
}
} else {
// try to remove allowAllFiles
mysqlCfg, err := gomysql.ParseDSN(cfg.Checkpoint.DSN)
if err != nil {
return
}
mysqlCfg.AllowAllFiles = false
cfg.Checkpoint.DSN = mysqlCfg.FormatDSN()
}
}

Expand Down Expand Up @@ -1183,22 +1179,22 @@ func (cfg *Config) CheckAndAdjustSecurity() error {
}

switch cfg.TiDB.TLS {
case "":
if len(cfg.TiDB.Security.CAPath) > 0 || len(cfg.TiDB.Security.CABytes) > 0 ||
len(cfg.TiDB.Security.CertPath) > 0 || len(cfg.TiDB.Security.CertBytes) > 0 ||
len(cfg.TiDB.Security.KeyPath) > 0 || len(cfg.TiDB.Security.KeyBytes) > 0 {
if cfg.TiDB.Security.TLSConfigName == "" {
cfg.TiDB.Security.TLSConfigName = uuid.NewString() // adjust this the default value
case "skip-verify", "preferred":
if cfg.TiDB.Security.TLSConfig == nil {
/* #nosec G402 */
cfg.TiDB.Security.TLSConfig = &tls.Config{
MinVersion: tls.VersionTLS10,
InsecureSkipVerify: true,
NextProtos: []string{"h2", "http/1.1"}, // specify `h2` to let Go use HTTP/2.
}
cfg.TiDB.TLS = cfg.TiDB.Security.TLSConfigName
} else {
cfg.TiDB.TLS = "false"
cfg.TiDB.Security.AllowFallbackToPlaintext = true
}
case "cluster":
if len(cfg.Security.CAPath) == 0 {
return common.ErrInvalidConfig.GenWithStack("cannot set `tidb.tls` to 'cluster' without a [security] section")
}
case "false", "skip-verify", "preferred":
case "", "false":
cfg.TiDB.TLS = "false"
return nil
default:
return common.ErrInvalidConfig.GenWithStack("unsupported `tidb.tls` config %s", cfg.TiDB.TLS)
Expand Down
78 changes: 53 additions & 25 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,40 +279,44 @@ func TestAdjustWillBatchImportRatioInvalid(t *testing.T) {
}

func TestAdjustSecuritySection(t *testing.T) {
uuidHolder := "<uuid>"
testCases := []struct {
input string
expectedCA string
expectedTLS string
input string
expectedCA string
hasTLS bool
fallback2NoTLS bool
}{
{
input: ``,
expectedCA: "",
expectedTLS: "false",
input: ``,
expectedCA: "",
hasTLS: false,
fallback2NoTLS: false,
},
{
input: `
[security]
`,
expectedCA: "",
expectedTLS: "false",
expectedCA: "",
hasTLS: false,
fallback2NoTLS: false,
},
{
input: `
[security]
ca-path = "/path/to/ca.pem"
`,
expectedCA: "/path/to/ca.pem",
expectedTLS: uuidHolder,
expectedCA: "/path/to/ca.pem",
hasTLS: false,
fallback2NoTLS: false,
},
{
input: `
[security]
ca-path = "/path/to/ca.pem"
[tidb.security]
`,
expectedCA: "",
expectedTLS: "false",
expectedCA: "",
hasTLS: false,
fallback2NoTLS: false,
},
{
input: `
Expand All @@ -321,17 +325,19 @@ func TestAdjustSecuritySection(t *testing.T) {
[tidb.security]
ca-path = "/path/to/ca2.pem"
`,
expectedCA: "/path/to/ca2.pem",
expectedTLS: uuidHolder,
expectedCA: "/path/to/ca2.pem",
hasTLS: false,
fallback2NoTLS: false,
},
{
input: `
[security]
[tidb.security]
ca-path = "/path/to/ca2.pem"
`,
expectedCA: "/path/to/ca2.pem",
expectedTLS: uuidHolder,
expectedCA: "/path/to/ca2.pem",
hasTLS: false,
fallback2NoTLS: false,
},
{
input: `
Expand All @@ -340,8 +346,20 @@ func TestAdjustSecuritySection(t *testing.T) {
tls = "skip-verify"
[tidb.security]
`,
expectedCA: "",
expectedTLS: "skip-verify",
expectedCA: "",
hasTLS: true,
fallback2NoTLS: true,
},
{
input: `
[security]
[tidb]
tls = "false"
[tidb.security]
`,
expectedCA: "",
hasTLS: false,
fallback2NoTLS: false,
},
}

Expand All @@ -357,19 +375,18 @@ func TestAdjustSecuritySection(t *testing.T) {
err = cfg.Adjust(context.Background())
require.NoError(t, err, comment)
require.Equal(t, tc.expectedCA, cfg.TiDB.Security.CAPath, comment)
if tc.expectedTLS == uuidHolder {
require.NotEmpty(t, cfg.TiDB.TLS, comment)
if tc.hasTLS {
require.NotNil(t, cfg.TiDB.Security.TLSConfig, comment)
} else {
require.Equal(t, tc.expectedTLS, cfg.TiDB.TLS, comment)
require.Nil(t, cfg.TiDB.Security.TLSConfig, comment)
}
require.Equal(t, tc.fallback2NoTLS, cfg.TiDB.Security.AllowFallbackToPlaintext, comment)
}
// test different tls config name
cfg := config.NewConfig()
assignMinimalLegalValue(cfg)
cfg.Security.CAPath = "/path/to/ca.pem"
cfg.Security.TLSConfigName = "tidb-tls"
require.NoError(t, cfg.Adjust(context.Background()))
require.Equal(t, cfg.TiDB.TLS, cfg.TiDB.Security.TLSConfigName)
}

func TestInvalidCSV(t *testing.T) {
Expand Down Expand Up @@ -626,7 +643,7 @@ func TestLoadConfig(t *testing.T) {
err = taskCfg.Adjust(context.Background())
require.NoError(t, err)
equivalentDSN := taskCfg.Checkpoint.MySQLParam.ToDriverConfig().FormatDSN()
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?tls=false&maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
expectedDSN := "guest:12345@tcp(172.16.30.11:4001)/?maxAllowedPacket=67108864&charset=utf8mb4&sql_mode=%27ONLY_FULL_GROUP_BY%2CSTRICT_TRANS_TABLES%2CNO_ZERO_IN_DATE%2CNO_ZERO_DATE%2CERROR_FOR_DIVISION_BY_ZERO%2CNO_AUTO_CREATE_USER%2CNO_ENGINE_SUBSTITUTION%27"
require.Equal(t, expectedDSN, equivalentDSN)

result := taskCfg.String()
Expand Down Expand Up @@ -783,6 +800,17 @@ func TestAdjustDiskQuota(t *testing.T) {
require.Equal(t, int64(0), int64(cfg.TikvImporter.DiskQuota))
}

func TestRemoveAllowAllFiles(t *testing.T) {
cfg := config.NewConfig()
assignMinimalLegalValue(cfg)
ctx := context.Background()

cfg.Checkpoint.Driver = config.CheckpointDriverMySQL
cfg.Checkpoint.DSN = "guest:12345@tcp(172.16.30.11:4001)/?tls=false&allowAllFiles=true&charset=utf8mb4"
require.NoError(t, cfg.Adjust(ctx))
require.Equal(t, "guest:12345@tcp(172.16.30.11:4001)/?tls=false&charset=utf8mb4", cfg.Checkpoint.DSN)
}

func TestDataCharacterSet(t *testing.T) {
testCases := []struct {
input string
Expand Down
11 changes: 2 additions & 9 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,18 +479,11 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
}()
})

if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil {
if err := taskCfg.TiDB.Security.BuildTLSConfig(); err != nil {
return common.ErrInvalidTLSConfig.Wrap(err)
}
defer func() {
// deregister TLS config with name "cluster"
if taskCfg.TiDB.Security == nil {
return
}
taskCfg.TiDB.Security.DeregisterMySQL()
}()

// initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust
// initiation of default glue should be after BuildTLSConfig, which is ready to be called after taskCfg.Adjust
// and also put it here could avoid injecting another two SkipRunTask failpoint to caller
g := o.glue
if g == nil {
Expand Down
Loading

0 comments on commit 68a3f07

Please sign in to comment.