Skip to content

Commit

Permalink
diff: add unit test && make importer a pkg (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Dec 3, 2018
1 parent 41116b5 commit 442a71d
Show file tree
Hide file tree
Showing 13 changed files with 379 additions and 77 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor')
VENDOR_TIDB := vendor/github.com/pingcap/tidb


build: prepare check test importer checker dump_region binlogctl sync_diff_inspector ddl_checker finish
build: prepare check importer checker dump_region binlogctl sync_diff_inspector ddl_checker finish

prepare:
cp go.mod1 go.mod
Expand Down
40 changes: 5 additions & 35 deletions importer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/importer"
"github.com/pingcap/tidb-tools/pkg/utils"
)

// NewConfig creates a new config.
func NewConfig() *Config {
cfg := &Config{}
cfg.Config = &importer.Config{}
cfg.FlagSet = flag.NewFlagSet("importer", flag.ContinueOnError)
fs := cfg.FlagSet

Expand All @@ -40,7 +42,7 @@ func NewConfig() *Config {
fs.StringVar(&cfg.DBCfg.Host, "h", "127.0.0.1", "set the database host ip")
fs.StringVar(&cfg.DBCfg.User, "u", "root", "set the database user")
fs.StringVar(&cfg.DBCfg.Password, "p", "", "set the database password")
fs.StringVar(&cfg.DBCfg.Name, "D", "test", "set the database name")
fs.StringVar(&cfg.DBCfg.Schema, "D", "test", "set the database name")
fs.IntVar(&cfg.DBCfg.Port, "P", 3306, "set the database host port")

fs.StringVar(&cfg.LogLevel, "L", "info", "log level: debug, info, warn, error, fatal")
Expand All @@ -49,43 +51,11 @@ func NewConfig() *Config {
return cfg
}

// DBConfig is the DB configuration.
type DBConfig struct {
Host string `toml:"host" json:"host"`

User string `toml:"user" json:"user"`

Password string `toml:"password" json:"password"`

Name string `toml:"name" json:"name"`

Port int `toml:"port" json:"port"`
}

func (c *DBConfig) String() string {
if c == nil {
return "<nil>"
}
return fmt.Sprintf("DBConfig(%+v)", *c)
}

// Config is the configuration.
type Config struct {
*flag.FlagSet `json:"-"`

TableSQL string `toml:"table-sql" json:"table-sql"`

IndexSQL string `toml:"index-sql" json:"index-sql"`

LogLevel string `toml:"log-level" json:"log-level"`
*importer.Config

WorkerCount int `toml:"worker-count" json:"worker-count"`

JobCount int `toml:"job-count" json:"job-count"`

Batch int `toml:"batch" json:"batch"`

DBCfg DBConfig `toml:"db" json:"db"`
*flag.FlagSet `json:"-"`

printVersion bool
configFile string
Expand Down
36 changes: 10 additions & 26 deletions importer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"

"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/importer"
log "github.com/sirupsen/logrus"
)

Expand All @@ -33,32 +34,15 @@ func main() {
os.Exit(2)
}

table := newTable()
err = parseTableSQL(table, cfg.TableSQL)
if err != nil {
log.Fatal(err)
importerCfg := &importer.Config{
TableSQL: cfg.TableSQL,
IndexSQL: cfg.IndexSQL,
LogLevel: cfg.LogLevel,
WorkerCount: cfg.WorkerCount,
JobCount: cfg.JobCount,
Batch: cfg.Batch,
DBCfg: cfg.DBCfg,
}

err = parseIndexSQL(table, cfg.IndexSQL)
if err != nil {
log.Fatal(err)
}

dbs, err := createDBs(cfg.DBCfg, cfg.WorkerCount)
if err != nil {
log.Fatal(err)
}
defer closeDBs(dbs)

err = execSQL(dbs[0], cfg.TableSQL)
if err != nil {
log.Fatal(err)
}

err = execSQL(dbs[0], cfg.IndexSQL)
if err != nil {
log.Fatal(err)
}

doProcess(table, dbs, cfg.JobCount, cfg.WorkerCount, cfg.Batch)
importer.DoProcess(importerCfg)
}
26 changes: 26 additions & 0 deletions pkg/dbutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
"strings"

Expand Down Expand Up @@ -59,6 +60,31 @@ func (c *DBConfig) String() string {
return fmt.Sprintf("DBConfig(%+v)", *c)
}

// GetDBConfigFromEnv returns DBConfig from environment
func GetDBConfigFromEnv(schema string) DBConfig {
host := os.Getenv("MYSQL_HOST")
if host == "" {
host = "127.0.0.1"
}
port, _ := strconv.Atoi(os.Getenv("MYSQL_PORT"))
if port == 0 {
port = 3306
}
user := os.Getenv("MYSQL_USER")
if user == "" {
user = "root"
}
pswd := os.Getenv("MYSQL_PSWD")

return DBConfig{
Host: host,
Port: port,
User: user,
Password: pswd,
Schema: schema,
}
}

// OpenDB opens a mysql connection FD
func OpenDB(cfg DBConfig) (*sql.DB, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", cfg.User, cfg.Password, cfg.Host, cfg.Port)
Expand Down
26 changes: 24 additions & 2 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type TableDiff struct {

// Equal tests whether two database have same data and schema.
func (t *TableDiff) Equal(ctx context.Context, writeFixSQL func(string) error) (bool, bool, error) {
t.adjustConfig()

t.sqlCh = make(chan string)
t.wg.Add(1)
go func() {
Expand Down Expand Up @@ -142,6 +144,23 @@ func (t *TableDiff) CheckTableStruct(ctx context.Context) (bool, error) {
return true, nil
}

func (t *TableDiff) adjustConfig() {
if t.ChunkSize <= 0 {
t.ChunkSize = 100
}

if len(t.Range) == 0 {
t.Range = "true"
}
if t.Sample <= 0 {
t.Sample = 100
}

if t.CheckThreadCount <= 0 {
t.CheckThreadCount = 4
}
}

// CheckTableData checks table's data
func (t *TableDiff) CheckTableData(ctx context.Context) (bool, error) {
return t.EqualTableData(ctx)
Expand All @@ -157,6 +176,9 @@ func (t *TableDiff) EqualTableData(ctx context.Context) (bool, error) {
checkNums := len(allJobs) * t.Sample / 100
checkNumArr := getRandomN(len(allJobs), checkNums)
log.Infof("total has %d check jobs, check %d of them", len(allJobs), len(checkNumArr))
if checkNums == 0 {
return true, nil
}

checkResultCh := make(chan bool, t.CheckThreadCount)
defer close(checkResultCh)
Expand Down Expand Up @@ -419,7 +441,7 @@ func generateDML(tp string, data map[string][]byte, null map[string]bool, keys [
}

if needQuotes(col.FieldType) {
values = append(values, fmt.Sprintf("\"%s\"", string(data[col.Name.O])))
values = append(values, fmt.Sprintf("'%s'", string(data[col.Name.O])))
} else {
values = append(values, string(data[col.Name.O]))
}
Expand All @@ -435,7 +457,7 @@ func generateDML(tp string, data map[string][]byte, null map[string]bool, keys [
}

if needQuotes(col.FieldType) {
kvs = append(kvs, fmt.Sprintf("`%s` = \"%s\"", col.Name.O, string(data[col.Name.O])))
kvs = append(kvs, fmt.Sprintf("`%s` = '%s'", col.Name.O, string(data[col.Name.O])))
} else {
kvs = append(kvs, fmt.Sprintf("`%s` = %s", col.Name.O, string(data[col.Name.O])))
}
Expand Down
Loading

0 comments on commit 442a71d

Please sign in to comment.