From 605760d1b2025d1e1a8b7d0c668c74863d7d1271 Mon Sep 17 00:00:00 2001 From: kennytm Date: Wed, 20 Nov 2019 16:25:53 +0800 Subject: [PATCH] config: adding `[tidb] max-allowed-packet` config (#248) this allows sending rows > 4 MiB when using TiDB backend --- lightning/common/util.go | 8 ++++---- lightning/common/util_test.go | 4 ++-- lightning/config/config.go | 14 ++++++++------ lightning/config/config_test.go | 2 +- lightning/config/const.go | 2 ++ lightning/restore/tidb.go | 2 +- tidb-lightning.toml | 4 ++++ 7 files changed, 22 insertions(+), 14 deletions(-) diff --git a/lightning/common/util.go b/lightning/common/util.go index 5624f81b6..6b69a8414 100644 --- a/lightning/common/util.go +++ b/lightning/common/util.go @@ -46,12 +46,12 @@ const ( defaultMaxRetry = 3 ) -func ToDSN(host string, port int, user string, psw string, sqlMode string) string { - return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&sql_mode='%s'", user, psw, host, port, sqlMode) +func ToDSN(host string, port int, user string, psw string, sqlMode string, maxAllowedPacket uint64) string { + return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8&sql_mode='%s'&maxAllowedPacket=%d", user, psw, host, port, sqlMode, maxAllowedPacket) } -func ConnectDB(host string, port int, user string, psw string, sqlMode string) (*sql.DB, error) { - dbDSN := ToDSN(host, port, user, psw, sqlMode) +func ConnectDB(host string, port int, user string, psw string, sqlMode string, maxAllowedPacket uint64) (*sql.DB, error) { + dbDSN := ToDSN(host, port, user, psw, sqlMode, maxAllowedPacket) db, err := sql.Open("mysql", dbDSN) if err != nil { return nil, errors.Trace(err) diff --git a/lightning/common/util_test.go b/lightning/common/util_test.go index 33f8dfeec..f848f4534 100644 --- a/lightning/common/util_test.go +++ b/lightning/common/util_test.go @@ -121,8 +121,8 @@ func (s *utilSuite) TestIsRetryableError(c *C) { } func (s *utilSuite) TestToDSN(c *C) { - dsn := common.ToDSN("127.0.0.1", 4000, "root", "123456", "strict") - c.Assert(dsn, Equals, "root:123456@tcp(127.0.0.1:4000)/?charset=utf8&sql_mode='strict'") + dsn := common.ToDSN("127.0.0.1", 4000, "root", "123456", "strict", 1234) + c.Assert(dsn, Equals, "root:123456@tcp(127.0.0.1:4000)/?charset=utf8&sql_mode='strict'&maxAllowedPacket=1234") } func (s *utilSuite) TestIsContextCanceledError(c *C) { diff --git a/lightning/config/config.go b/lightning/config/config.go index 47eb2cdfd..181367609 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -67,7 +67,8 @@ type DBStore struct { PdAddr string `toml:"pd-addr" json:"pd-addr"` StrSQLMode string `toml:"sql-mode" json:"sql-mode"` - SQLMode mysql.SQLMode `toml:"-" json:"-"` + SQLMode mysql.SQLMode `toml:"-" json:"-"` + MaxAllowedPacket uint64 `toml:"max-allowed-packet" json:"max-allowed-packet"` DistSQLScanConcurrency int `toml:"distsql-scan-concurrency" json:"distsql-scan-concurrency"` BuildStatsConcurrency int `toml:"build-stats-concurrency" json:"build-stats-concurrency"` @@ -136,9 +137,9 @@ type MydumperRuntime struct { } type TikvImporter struct { - Addr string `toml:"addr" json:"addr"` - Backend string `toml:"backend" json:"backend"` - OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"` + Addr string `toml:"addr" json:"addr"` + Backend string `toml:"backend" json:"backend"` + OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"` } type Checkpoint struct { @@ -184,6 +185,7 @@ func NewConfig() *Config { User: "root", StatusPort: 10080, StrSQLMode: mysql.DefaultSQLMode, + MaxAllowedPacket: defaultMaxAllowedPacket, BuildStatsConcurrency: 20, DistSQLScanConcurrency: 100, IndexSerialScanConcurrency: 20, @@ -201,7 +203,7 @@ func NewConfig() *Config { }, }, TikvImporter: TikvImporter{ - Backend: BackendImporter, + Backend: BackendImporter, OnDuplicate: ReplaceOnDup, }, PostRestore: PostRestore{ @@ -419,7 +421,7 @@ func (cfg *Config) Adjust() error { if len(cfg.Checkpoint.DSN) == 0 { switch cfg.Checkpoint.Driver { case CheckpointDriverMySQL: - cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw, mysql.DefaultSQLMode) + cfg.Checkpoint.DSN = common.ToDSN(cfg.TiDB.Host, cfg.TiDB.Port, cfg.TiDB.User, cfg.TiDB.Psw, mysql.DefaultSQLMode, defaultMaxAllowedPacket) case CheckpointDriverFile: cfg.Checkpoint.DSN = "/tmp/" + cfg.Checkpoint.Schema + ".pb" } diff --git a/lightning/config/config_test.go b/lightning/config/config_test.go index f3bb44f18..3c69357e2 100644 --- a/lightning/config/config_test.go +++ b/lightning/config/config_test.go @@ -391,7 +391,7 @@ func (s *configTestSuite) TestLoadConfig(c *C) { taskCfg.Checkpoint.Driver = config.CheckpointDriverMySQL err = taskCfg.Adjust() c.Assert(err, IsNil) - c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8&sql_mode='"+mysql.DefaultSQLMode+"'") + c.Assert(taskCfg.Checkpoint.DSN, Equals, "guest:@tcp(172.16.30.11:4001)/?charset=utf8&sql_mode='"+mysql.DefaultSQLMode+"'&maxAllowedPacket=67108864") result := taskCfg.String() c.Assert(result, Matches, `.*"pd-addr":"172.16.30.11:2379,172.16.30.12:2379".*`) diff --git a/lightning/config/const.go b/lightning/config/const.go index eb55d6d14..5f7541458 100644 --- a/lightning/config/const.go +++ b/lightning/config/const.go @@ -23,4 +23,6 @@ const ( MinRegionSize int64 = 256 * _M BufferSizeScale = 5 + + defaultMaxAllowedPacket = 64 * 1024 * 1024 ) diff --git a/lightning/restore/tidb.go b/lightning/restore/tidb.go index bf7311bab..627fd99e5 100644 --- a/lightning/restore/tidb.go +++ b/lightning/restore/tidb.go @@ -44,7 +44,7 @@ type TiDBManager struct { } func NewTiDBManager(dsn config.DBStore) (*TiDBManager, error) { - db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw, dsn.StrSQLMode) + db, err := common.ConnectDB(dsn.Host, dsn.Port, dsn.User, dsn.Psw, dsn.StrSQLMode, dsn.MaxAllowedPacket) if err != nil { return nil, errors.Trace(err) } diff --git a/tidb-lightning.toml b/tidb-lightning.toml index 45f81ef2a..665312fe7 100644 --- a/tidb-lightning.toml +++ b/tidb-lightning.toml @@ -133,6 +133,10 @@ pd-addr = "127.0.0.1:2379" # lightning uses some code of tidb(used as library), and the flag controls it's log level. log-level = "error" +# sets maximum packet size allowed for SQL connections. +# set this to 0 to automatically fetch the `max_allowed_packet` variable from server on every connection. +# max-allowed-packet = 67_108_864 + # set tidb session variables to speed up checksum/analyze table. # see https://pingcap.com/docs/sql/statistics/#control-analyze-concurrency for the meaning of each setting build-stats-concurrency = 20