Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

*: support MySQL backend #221

Merged
merged 5 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
*: support MySQL backend
  • Loading branch information
kennytm committed Jul 31, 2019
commit 8341f95baabd88549764fe8292267b4bf3cced66
8 changes: 4 additions & 4 deletions lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ const (
defaultMaxRetry = 3
)

func ToDSN(host string, port int, user string, psw string) string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8", user, psw, host, port)
func ToDSN(host string, port int, user string, psw string, sqlMode string) string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&sql_mode='%s'", user, psw, host, port, sqlMode)
}

func ConnectDB(host string, port int, user string, psw string) (*sql.DB, error) {
dbDSN := ToDSN(host, port, user, psw)
func ConnectDB(host string, port int, user string, psw string, sqlMode string) (*sql.DB, error) {
dbDSN := ToDSN(host, port, user, psw, sqlMode)
db, err := sql.Open("mysql", dbDSN)
if err != nil {
return nil, errors.Trace(err)
Expand Down
16 changes: 14 additions & 2 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ type MydumperRuntime struct {
}

type TikvImporter struct {
Addr string `toml:"addr" json:"addr"`
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
}

type Checkpoint struct {
Expand Down Expand Up @@ -181,6 +182,9 @@ func NewConfig() *Config {
Delimiter: `"`,
},
},
TikvImporter: TikvImporter{
Backend: "importer",
kennytm marked this conversation as resolved.
Show resolved Hide resolved
},
PostRestore: PostRestore{
Checksum: true,
},
Expand All @@ -200,6 +204,7 @@ func (cfg *Config) LoadFromGlobal(global *GlobalConfig) error {
cfg.TiDB.PdAddr = global.TiDB.PdAddr
cfg.Mydumper.SourceDir = global.Mydumper.SourceDir
cfg.TikvImporter.Addr = global.TikvImporter.Addr
cfg.TikvImporter.Backend = global.TikvImporter.Backend

return nil
}
Expand Down Expand Up @@ -290,6 +295,13 @@ func (cfg *Config) Adjust() error {
}
}

cfg.TikvImporter.Backend = strings.ToLower(cfg.TikvImporter.Backend)
switch cfg.TikvImporter.Backend {
case "mysql", "importer":
default:
return errors.New("invalid config: unsupported `tikv-importer.backend`")
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}

var err error
cfg.TiDB.SQLMode, err = mysql.GetSQLMode(cfg.TiDB.StrSQLMode)
if err != nil {
Expand Down Expand Up @@ -358,7 +370,7 @@ func (cfg *Config) Adjust() error {
if len(cfg.Checkpoint.DSN) == 0 {
switch cfg.Checkpoint.Driver {
case "mysql":
cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw)
cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw, mysql.DefaultSQLMode)
case "file":
cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb"
}
Expand Down
13 changes: 11 additions & 2 deletions lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"

"github.com/pingcap/tidb-lightning/lightning/config"
)

Expand Down Expand Up @@ -109,6 +111,13 @@ func (s *configTestSuite) TestAdjustConnectRefused(c *C) {
c.Assert(err, ErrorMatches, "cannot fetch settings from TiDB.*")
}

func (s *configTestSuite) TestAdjustInvalidBackend(c *C) {
cfg := config.NewConfig()
cfg.TikvImporter.Backend = "no_such_backend"
err := cfg.Adjust()
c.Assert(err, ErrorMatches, "invalid config: unsupported `tikv-importer\\.backend`")
}

func (s *configTestSuite) TestDecodeError(c *C) {
ts, host, port := startMockServer(c, http.StatusOK, "invalid-string")
defer ts.Close()
Expand Down Expand Up @@ -381,15 +390,15 @@ func (s *configTestSuite) TestLoadConfig(c *C) {
taskCfg.Checkpoint.Driver = "mysql"
err = taskCfg.Adjust()
c.Assert(err, IsNil)
c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8")
c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8&sql_mode='"+mysql.DefaultSQLMode+"'")

result := taskCfg.String()
c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`)
}

func (s *configTestSuite) TestLoadFromInvalidConfig(c *C) {
taskCfg := config.NewConfig()
err := taskCfg.LoadFromGlobal(&config.GlobalConfig {
err := taskCfg.LoadFromGlobal(&config.GlobalConfig{
ConfigFileContent: []byte("invalid toml"),
})
c.Assert(err, ErrorMatches, "Near line 1.*")
Expand Down
10 changes: 9 additions & 1 deletion lightning/config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type GlobalMydumper struct {
}

type GlobalImporter struct {
Addr string `toml:"addr" json:"addr"`
Addr string `toml:"addr" json:"addr"`
Backend string `toml:"backend" json:"backend"`
}

type GlobalConfig struct {
Expand All @@ -72,6 +73,9 @@ func NewGlobalConfig() *GlobalConfig {
StatusPort: 10080,
LogLevel: "error",
},
TikvImporter: GlobalImporter{
Backend: "importer",
},
}
}

Expand Down Expand Up @@ -111,6 +115,7 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
pdAddr := fs.String("pd-urls", "", "PD endpoint address")
dataSrcPath := fs.String("d", "", "Directory of the dump to import")
importerAddr := fs.String("importer", "", "address (host:port) to connect to tikv-importer")
backend := fs.String("backend", "", `delivery backend ("importer" or "mysql")`)

statusAddr := fs.String("status-addr", "", "the Lightning server address")
serverMode := fs.Bool("server-mode", false, "start Lightning in server mode, wait for multiple tasks instead of starting immediately")
Expand Down Expand Up @@ -171,6 +176,9 @@ func LoadGlobalConfig(args []string, extraFlags func(*flag.FlagSet)) (*GlobalCon
if *statusAddr != "" {
cfg.App.StatusAddr = *statusAddr
}
if *backend != "" {
cfg.TikvImporter.Backend = *backend
}
if cfg.App.StatusAddr == "" && cfg.App.PProfPort != 0 {
cfg.App.StatusAddr = fmt.Sprintf(":%d", cfg.App.PProfPort)
}
Expand Down
9 changes: 9 additions & 0 deletions lightning/kv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ type AbstractBackend interface {
// value will be used in `Rows.SplitIntoChunks`.
MaxChunkSize() int

// ShouldPostProcess returns whether KV-specific post-processing should be
// performed for this backend. Post-processing includes checksum, adjusting
// auto-increment ID, and analyze.
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder

Expand Down Expand Up @@ -162,6 +167,10 @@ func (be Backend) NewEncoder(tbl table.Table, sqlMode mysql.SQLMode) Encoder {
return be.abstract.NewEncoder(tbl, sqlMode)
}

func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
tag := makeTag(tableName, engineID)
Expand Down
4 changes: 4 additions & 0 deletions lightning/kv/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (*importer) MaxChunkSize() int {
return 31 << 10
}

func (*importer) ShouldPostProcess() bool {
return true
}

// isIgnorableOpenCloseEngineError checks if the error from
// OpenEngine/CloseEngine can be safely ignored.
func isIgnorableOpenCloseEngineError(err error) bool {
Expand Down
Loading