Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
*: cherry-pick recent PRs for dumpling v5.0.1 (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Apr 16, 2021
1 parent 463e4cc commit 4cb1157
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 49 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.5
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM=
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
Expand All @@ -789,9 +790,12 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM=
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed h1:KMgQoLJGCq1IoZpLZE3AIffh9veYWoVlsvA4ib55TMM=
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4=
github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047 h1:boyJ8EgQN/aC3grvx8QUoJrptt7RvneezSJSCbW25a4=
github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047/go.mod h1:+W4RCzesQDI11HvIkaDjS8yM36SpAnGNQ7jmTLn5BnU=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down
51 changes: 48 additions & 3 deletions tests/basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ cnt=$(grep -w "(.*)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.000000000.s
echo "records count is ${cnt}"
[ "$cnt" = 2 ]

# make sure that dumpling log contains version infomation
## make sure that dumpling log contains version infomation
cnt=$(grep -w "Welcome to dumpling.*Release Version.*Git Commit Hash.*Go Version" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
echo "version info count is ${cnt}"
[ "$cnt" = 1 ]
Expand Down Expand Up @@ -50,6 +50,7 @@ echo "expected 1 return error when specifying --filetype sql and --sql, actual $
[ "$actual" = 1 ]

export DUMPLING_TEST_PORT=4000

# Test for --sql option.
run_sql "drop database if exists \`$DB_NAME\`;"
run_sql "create database \`$DB_NAME\`;"
Expand Down Expand Up @@ -81,12 +82,56 @@ set +e
run_dumpling --sql "test" > ${DUMPLING_OUTPUT_DIR}/dumpling.log 2> ${DUMPLING_OUTPUT_DIR}/dumpling.err
set -e

# check stderr, should not contain panic info
## check stderr, should not contain panic info
actual=$(grep -w "panic" ${DUMPLING_OUTPUT_DIR}/dumpling.err|wc -l)
echo "expected panic 0, actual ${actual}"
[ "$actual" = 0 ]

# check stdout, should contain mysql error log
## check stdout, should contain mysql error log
actual=$(grep -w "Error 1064: You have an error in your SQL syntax" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
echo "expect contain Error 1064, actual ${actual}"
[ "$actual" -ge 1 ]

# TODO: Enable this after we use tidb cluster instead of mock tidb in interagtion test
## Test for snapshot configuration
#run_sql "drop database if exists \`$DB_NAME\`;"
#run_sql "create database \`$DB_NAME\`;"
#run_sql "create table \`$DB_NAME\`.\`$TABLE_NAME\` (a int);"
#run_sql "insert into \`$DB_NAME\`.\`$TABLE_NAME\` values (1);"
#
#snapshot=$(run_sql "show master status" | grep "Position" | sed 's/.*Position: \([0-9]*\).*/\1/g')
#echo "snapshot #1 is ${snapshot}"
#run_sql "insert into \`$DB_NAME\`.\`$TABLE_NAME\` values (2);"
#run_dumpling -f "$DB_NAME.$TABLE_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log --snapshot $snapshot
#cnt=$(grep -w "(.*)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.000000000.sql|wc -l)
#echo "records count is ${cnt}"
#[ "$cnt" = 1 ]
#
#snapshot=$(run_sql "select now()" | grep "now()" | sed 's/.*now(): \(.*\)/\1/g')
#echo "snapshot #2 is ${snapshot}"
#run_sql "insert into \`$DB_NAME\`.\`$TABLE_NAME\` values (3);"
#run_dumpling -f "$DB_NAME.$TABLE_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log --snapshot $snapshot
#cnt=$(grep -w "(.*)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.000000000.sql|wc -l)
#echo "records count is ${cnt}"
#[ "$cnt" = 2 ]
#
## Test for params configuration
#snapshot=$(run_sql "select now()" | grep "now()" | sed 's/.*now(): \(.*\)/\1/g')
#echo "snapshot #3 is ${snapshot}"
#run_sql "insert into \`$DB_NAME\`.\`$TABLE_NAME\` values (4);"
#run_dumpling -f "$DB_NAME.$TABLE_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log --params "net_read_timeout=86400,interactive_timeout=28800,wait_timeout=2147483,net_write_timeout=86400,tidb_snapshot='$snapshot'"
#cnt=$(grep -w "(.*)" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.000000000.sql|wc -l)
#echo "records count is ${cnt}"
#[ "$cnt" = 3 ]
#
#run_sql "insert into \`$DB_NAME\`.\`$TABLE_NAME\` values (3);"

# Test for params configuration
run_sql "drop database if exists \`$DB_NAME\`;"
run_sql "create database \`$DB_NAME\`;"
run_sql "create table \`$DB_NAME\`.\`$TABLE_NAME\` (a timestamp);"
run_sql "set time_zone='+08:00'; insert into \`$DB_NAME\`.\`$TABLE_NAME\` values ('2020-11-01 00:00:00');"
run_dumpling -f "$DB_NAME.$TABLE_NAME" -L ${DUMPLING_OUTPUT_DIR}/dumpling.log --params "net_read_timeout=86400,interactive_timeout=28800,wait_timeout=2147483,net_write_timeout=86400,time_zone=+00:00"
cnt=$(grep -w "2020-10-31 16:00:00" ${DUMPLING_OUTPUT_DIR}/${DB_NAME}.${TABLE_NAME}.000000000.sql|wc -l)
echo "records count is ${cnt}"
[ "$cnt" = 1 ]
21 changes: 18 additions & 3 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pclog "github.com/pingcap/log"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -212,6 +213,12 @@ func (d *Dumper) Dump() (dumpErr error) {
}
})

// get estimate total count
err = d.getEstimateTotalRowsCount(tctx, metaConn)
if err != nil {
tctx.L().Error("fail to get estimate total count", zap.Error(err))
}

if conf.SQL == "" {
if err = d.dumpDatabases(writerCtx, metaConn, taskChan); err != nil && !errors.ErrorEqual(err, context.Canceled) {
return err
Expand Down Expand Up @@ -610,6 +617,9 @@ func selectTiDBTableSample(conn *sql.Conn, dbName, tableName string) (pkFields [
if hasImplicitRowID {
pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"}
}
if len(pkFields) == 0 {
return pkFields, pkVals, nil
}
query := buildTiDBTableSampleQuery(pkFields, dbName, tableName)
rows, err := conn.QueryContext(context.Background(), query)
if err != nil {
Expand Down Expand Up @@ -757,19 +767,24 @@ func runSteps(d *Dumper, steps ...func(*Dumper) error) error {

func initLogger(d *Dumper) error {
conf := d.conf
var logger log.Logger
var (
logger log.Logger
err error
props *pclog.ZapProperties
)
// conf.Logger != nil means dumpling is used as a library
if conf.Logger != nil {
logger = log.NewAppLogger(conf.Logger)
} else {
var err error
logger, err = log.InitAppLogger(&log.Config{
logger, props, err = log.InitAppLogger(&log.Config{
Level: conf.LogLevel,
File: conf.LogFile,
Format: conf.LogFormat,
})
if err != nil {
return errors.Trace(err)
}
pclog.ReplaceGlobals(logger.Logger, props)
cli.LogLongVersion(logger)
}
d.tctx = d.tctx.WithLogger(logger)
Expand Down
2 changes: 1 addition & 1 deletion v4/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *testSQLSuite) TestDumpBlock(c *C) {
WillReturnRows(sqlmock.NewRows([]string{"Database", "Create Database"}).
AddRow("test", "CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 */"))

tctx, cancel := tcontext.Background().WithCancel()
tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel()
defer cancel()
conn, err := db.Conn(tctx)
c.Assert(err, IsNil)
Expand Down
10 changes: 10 additions & 0 deletions v4/export/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
finishedSizeCounter *prometheus.CounterVec
finishedRowsCounter *prometheus.CounterVec
finishedTablesCounter *prometheus.CounterVec
estimateTotalRowsCounter *prometheus.CounterVec
writeTimeHistogram *prometheus.HistogramVec
receiveWriteChunkTimeHistogram *prometheus.HistogramVec
errorCount *prometheus.CounterVec
Expand All @@ -33,6 +34,13 @@ func InitMetricsVector(labels prometheus.Labels) {
Name: "finished_size",
Help: "counter for dumpling finished file size",
}, labelNames)
estimateTotalRowsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "estimate_total_rows",
Help: "estimate total rows for dumpling tables",
}, labelNames)
finishedRowsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Expand Down Expand Up @@ -86,6 +94,7 @@ func RegisterMetrics(registry *prometheus.Registry) {
}
registry.MustRegister(finishedSizeCounter)
registry.MustRegister(finishedRowsCounter)
registry.MustRegister(estimateTotalRowsCounter)
registry.MustRegister(finishedTablesCounter)
registry.MustRegister(writeTimeHistogram)
registry.MustRegister(receiveWriteChunkTimeHistogram)
Expand All @@ -100,6 +109,7 @@ func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) {
}
finishedSizeCounter.Delete(labels)
finishedRowsCounter.Delete(labels)
estimateTotalRowsCounter.Delete(labels)
finishedTablesCounter.Delete(labels)
writeTimeHistogram.Delete(labels)
receiveWriteChunkTimeHistogram.Delete(labels)
Expand Down
23 changes: 20 additions & 3 deletions v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,20 @@ func isUnknownSystemVariableErr(err error) bool {
func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, dsn string, params map[string]interface{}) (*sql.DB, error) {
support := make(map[string]interface{})
for k, v := range params {
var pv interface{}
if str, ok := v.(string); ok {
if pvi, err := strconv.ParseInt(str, 10, 64); err == nil {
pv = pvi
} else if pvf, err := strconv.ParseFloat(str, 64); err == nil {
pv = pvf
} else {
pv = str
}
} else {
pv = v
}
s := fmt.Sprintf("SET SESSION %s = ?", k)
_, err := db.ExecContext(tctx, s, v)
_, err := db.ExecContext(tctx, s, pv)
if err != nil {
if isUnknownSystemVariableErr(err) {
tctx.L().Info("session variable is not supported by db", zap.String("variable", k), zap.Reflect("value", v))
Expand All @@ -524,7 +536,7 @@ func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, dsn string, pa
return nil, errors.Trace(err)
}

support[k] = v
support[k] = pv
}

for k, v := range support {
Expand Down Expand Up @@ -837,7 +849,12 @@ func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (
}

func estimateCount(tctx *tcontext.Context, dbName, tableName string, db *sql.Conn, field string, conf *Config) uint64 {
query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", escapeString(field), escapeString(dbName), escapeString(tableName))
var query string
if strings.TrimSpace(field) == "*" || strings.TrimSpace(field) == "" {
query = fmt.Sprintf("EXPLAIN SELECT * FROM `%s`.`%s`", escapeString(dbName), escapeString(tableName))
} else {
query = fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", escapeString(field), escapeString(dbName), escapeString(tableName))
}

if conf.Where != "" {
query += " WHERE "
Expand Down
Loading

0 comments on commit 4cb1157

Please sign in to comment.