Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

*: support MySQL backend #221

Merged
merged 5 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ const (
defaultMaxRetry = 3
)

func ToDSN(host string, port int, user string, psw string) string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8", user, psw, host, port)
func ToDSN(host string, port int, user string, psw string, sqlMode string) string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&sql_mode='%s'", user, psw, host, port, sqlMode)
}

func ConnectDB(host string, port int, user string, psw string) (*sql.DB, error) {
dbDSN := ToDSN(host, port, user, psw)
func ConnectDB(host string, port int, user string, psw string, sqlMode string) (*sql.DB, error) {
dbDSN := ToDSN(host, port, user, psw, sqlMode)
db, err := sql.Open("mysql", dbDSN)
if err != nil {
return nil, errors.Trace(err)
Expand Down
16 changes: 14 additions & 2 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ type MydumperRuntime struct {
}

type TikvImporter struct {
Addr string `toml:"addr" json:"addr"`
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
}

type Checkpoint struct {
Expand Down Expand Up @@ -181,6 +182,9 @@ func NewConfig() *Config {
Delimiter: `"`,
},
},
TikvImporter: TikvImporter{
Backend: "importer",
kennytm marked this conversation as resolved.
Show resolved Hide resolved
},
PostRestore: PostRestore{
Checksum: true,
},
Expand All @@ -200,6 +204,7 @@ func (cfg *Config) LoadFromGlobal(global *GlobalConfig) error {
cfg.TiDB.PdAddr = global.TiDB.PdAddr
cfg.Mydumper.SourceDir = global.Mydumper.SourceDir
cfg.TikvImporter.Addr = global.TikvImporter.Addr
cfg.TikvImporter.Backend = global.TikvImporter.Backend

return nil
}
Expand Down Expand Up @@ -290,6 +295,13 @@ func (cfg *Config) Adjust() error {
}
}

cfg.TikvImporter.Backend = strings.ToLower(cfg.TikvImporter.Backend)
switch cfg.TikvImporter.Backend {
case "mysql", "importer":
default:
return errors.New("invalid config: unsupported `tikv-importer.backend`")
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}

var err error
cfg.TiDB.SQLMode, err = mysql.GetSQLMode(cfg.TiDB.StrSQLMode)
if err != nil {
Expand Down Expand Up @@ -358,7 +370,7 @@ func (cfg *Config) Adjust() error {
if len(cfg.Checkpoint.DSN) == 0 {
switch cfg.Checkpoint.Driver {
case "mysql":
cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw)
cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw, mysql.DefaultSQLMode)
case "file":
cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb"
}
Expand Down
13 changes: 11 additions & 2 deletions lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"

"github.com/pingcap/tidb-lightning/lightning/config"
)

Expand Down Expand Up @@ -109,6 +111,13 @@ func (s *configTestSuite) TestAdjustConnectRefused(c *C) {
c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*")
}

func (s *configTestSuite) TestAdjustInvalidBackend(c *C) {
cfg := config.NewConfig()
cfg.TikvImporter.Backend = "no_such_backend"
err := cfg.Adjust()
c.Assert(err, ErrorMatches, "invalid config: unsupported `tikv-importer\\.backend`")
}

func (s *configTestSuite) TestDecodeError(c *C) {
ts, host, port := startMockServer(c, http.StatusOK, "invalid-string")
defer ts.Close()
Expand Down Expand Up @@ -381,15 +390,15 @@ func (s *configTestSuite) TestLoadConfig(c *C) {
taskCfg.Checkpoint.Driver = "mysql"
err = taskCfg.Adjust()
c.Assert(err, IsNil)
c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8")
c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8&sql_mode='"+mysql.DefaultSQLMode+"'")

result := taskCfg.String()
c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`)
}

func (s *configTestSuite) TestLoadFromInvalidConfig(c *C) {
taskCfg := config.NewConfig()
err := taskCfg.LoadFromGlobal(&config.GlobalConfig {
err := taskCfg.LoadFromGlobal(&config.GlobalConfig{
ConfigFileContent: []byte("invalid toml"),
})
c.Assert(err, ErrorMatches, "Near line 1.*")
Expand Down
10 changes: 9 additions & 1 deletion lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type GlobalMydumper struct {
}

type GlobalImporter struct {
Addr string `toml:"addr" json:"addr"`
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
}

type GlobalConfig struct {
Expand All @@ -72,6 +73,9 @@ func NewGlobalConfig() *GlobalConfig {
StatusPort: 10080,
LogLevel: "error",
},
TikvImporter: GlobalImporter{
Backend: "importer",
},
}
}

Expand Down Expand Up @@ -111,6 +115,7 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
pdAddr := fs.String("pd-urls", "", "PD endpoint address")
dataSrcPath := fs.String("d", "", "Directory of the dump to import")
importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer")
backend := fs.String("backend", "", `delivery backend ("importer" or "mysql")`)

statusAddr := fs.String("status-addr", "", "the Lightning server address")
serverMode := fs.Bool("server-mode", false, "start Lightning in server mode, wait for multiple tasks instead of starting immediately")
Expand Down Expand Up @@ -171,6 +176,9 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
if *statusAddr != "" {
cfg.App.StatusAddr = *statusAddr
}
if *backend != "" {
cfg.TikvImporter.Backend = *backend
}
if cfg.App.StatusAddr == "" && cfg.App.PProfPort != 0 {
cfg.App.StatusAddr = fmt.Sprintf(":%d", cfg.App.PProfPort)
}
Expand Down
9 changes: 9 additions & 0 deletions lightning/kv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ type AbstractBackend interface {
// value will be used in `Rows.SplitIntoChunks`.
MaxChunkSize() int

// ShouldPostProcess returns whether KV-specific post-processing should be
// performed for this backend. Post-processing includes checksum, adjusting
// auto-increment ID, and analyze.
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder

Expand Down Expand Up @@ -162,6 +167,10 @@ func (be Backend) NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder {
return be.abstract.NewEncoder(tbl, sqlMode)
}

func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
tag := makeTag(tableName, engineID)
Expand Down
4 changes: 4 additions & 0 deletions lightning/kv/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (*importer) MaxChunkSize() int {
return 31 << 10
}

func (*importer) ShouldPostProcess() bool {
return true
}

// isIgnorableOpenCloseEngineError checks if the error from
// OpenEngine/CloseEngine can be safely ignored.
func isIgnorableOpenCloseEngineError(err error) bool {
Expand Down
Loading