diff --git a/Makefile b/Makefile
index a3972bb4e..577ded8c4 100644
--- a/Makefile
+++ b/Makefile
@@ -48,6 +48,9 @@ default: clean lightning lightning-ctl checksuccess
 prepare:
 	$(PREPARE_MOD)
 
+finish-prepare:
+	$(FINISH_MOD)
+
 clean:
 	rm -f $(LIGHTNING_BIN) $(LIGHTNING_CTRL_BIN) $(FAILPOINT_CTL_BIN) $(REVIVE_BIN) $(VFSGENDEV_BIN) go.mod go.sum
 
diff --git a/go.mod1 b/go.mod1
index 827c710b2..ec76eca90 100644
--- a/go.mod1
+++ b/go.mod1
@@ -10,6 +10,7 @@ require (
 	github.com/cockroachdb/pebble v0.0.0-20201023120638-f1224da22976
 	github.com/coreos/go-semver v0.3.0
 	github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 // indirect
+	github.com/docker/go-units v0.4.0
 	github.com/fsouza/fake-gcs-server v1.19.0 // indirect
 	github.com/go-sql-driver/mysql v1.5.0
 	github.com/gogo/protobuf v1.3.1
diff --git a/lightning/config/bytesize.go b/lightning/config/bytesize.go
new file mode 100644
index 000000000..d511c72c1
--- /dev/null
+++ b/lightning/config/bytesize.go
@@ -0,0 +1,44 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+	"encoding/json"
+
+	"github.com/docker/go-units"
+)
+
+// ByteSize is an alias of int64 which accepts human-friendly strings like
+// '10G' when read from TOML.
+type ByteSize int64
+
+// UnmarshalText implements encoding.TextUnmarshaler
+func (size *ByteSize) UnmarshalText(b []byte) error {
+	res, err := units.RAMInBytes(string(b))
+	if err != nil {
+		return err
+	}
+	*size = ByteSize(res)
+	return nil
+}
+
+// UnmarshalJSON implements json.Unmarshaler (for testing)
+func (size *ByteSize) UnmarshalJSON(b []byte) error {
+	var res int64
+	if err := json.Unmarshal(b, &res); err != nil {
+		return err
+	}
+	*size = ByteSize(res)
+	return nil
+}
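Not part of the patch: the new `ByteSize.UnmarshalText` above delegates to `units.RAMInBytes` from docker/go-units. A minimal sketch of what that parser returns for a few of the inputs exercised in `bytesize_test.go` below (all suffixes are treated as 1024-based):

```go
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	// Each of these strings appears as a test case in bytesize_test.go.
	for _, s := range []string{"10k", "10 KB", "2.5MB", "32768"} {
		n, err := units.RAMInBytes(s) // 10240, 10240, 2621440, 32768
		fmt.Println(s, n, err)
	}
	// Unparseable input is rejected; this is the "invalid size: ..." error
	// that the TOML decoding tests match against.
	if _, err := units.RAMInBytes("invalid value"); err != nil {
		fmt.Println(err)
	}
}
```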
"github.com/pingcap/check" + + "github.com/pingcap/tidb-lightning/lightning/config" +) + +type byteSizeTestSuite struct{} + +var _ = Suite(&byteSizeTestSuite{}) + +func (s *byteSizeTestSuite) TestByteSizeTOMLDecode(c *C) { + testCases := []struct { + input string + output config.ByteSize + err string + }{ + { + input: "x = 10000", + output: 10000, + }, + { + input: "x = 107_374_182_400", + output: 107_374_182_400, + }, + { + input: "x = '10k'", + output: 10 * 1024, + }, + { + input: "x = '10PiB'", + output: 10 * 1024 * 1024 * 1024 * 1024 * 1024, + }, + { + input: "x = '10 KB'", + output: 10 * 1024, + }, + { + input: "x = '32768'", + output: 32768, + }, + { + input: "x = -1", + err: "invalid size: '-1'", + }, + { + input: "x = 'invalid value'", + err: "invalid size: 'invalid value'", + }, + { + input: "x = true", + err: "invalid size: 'true'", + }, + { + input: "x = 256.0", + output: 256, + }, + { + input: "x = 256.9", + output: 256, + }, + { + input: "x = 10e+9", + output: 10_000_000_000, + }, + { + input: "x = '2.5MB'", + output: 5 * 512 * 1024, + }, + { + input: "x = 2020-01-01T00:00:00Z", + err: "invalid size: '2020-01-01T00:00:00Z'", + }, + { + input: "x = ['100000']", + err: "toml: cannot load TOML value.*", + }, + { + input: "x = { size = '100000' }", + err: "toml: cannot load TOML value.*", + }, + } + + for _, tc := range testCases { + comment := Commentf("input: `%s`", tc.input) + var output struct{ X config.ByteSize } + err := toml.Unmarshal([]byte(tc.input), &output) + if tc.err != "" { + c.Assert(err, ErrorMatches, tc.err, comment) + } else { + c.Assert(err, IsNil, comment) + c.Assert(output.X, Equals, tc.output, comment) + } + } +} + +func (s *byteSizeTestSuite) TestByteSizeTOMLAndJSONEncode(c *C) { + var input struct { + X config.ByteSize `toml:"x" json:"x"` + } + input.X = 1048576 + + var output strings.Builder + err := toml.NewEncoder(&output).Encode(input) + c.Assert(err, IsNil) + c.Assert(output.String(), Equals, "x = 1048576\n") + + js, err := json.Marshal(input) + c.Assert(err, IsNil) + c.Assert(string(js), Equals, `{"x":1048576}`) +} diff --git a/lightning/config/config.go b/lightning/config/config.go index 29337750a..2ab3c5bbc 100644 --- a/lightning/config/config.go +++ b/lightning/config/config.go @@ -220,8 +220,8 @@ type CSVConfig struct { } type MydumperRuntime struct { - ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"` - BatchSize int64 `toml:"batch-size" json:"batch-size"` + ReadBlockSize ByteSize `toml:"read-block-size" json:"read-block-size"` + BatchSize ByteSize `toml:"batch-size" json:"batch-size"` BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"` SourceDir string `toml:"data-source-dir" json:"data-source-dir"` NoSchema bool `toml:"no-schema" json:"no-schema"` @@ -229,7 +229,7 @@ type MydumperRuntime struct { CSV CSVConfig `toml:"csv" json:"csv"` CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` StrictFormat bool `toml:"strict-format" json:"strict-format"` - MaxRegionSize int64 `toml:"max-region-size" json:"max-region-size"` + MaxRegionSize ByteSize `toml:"max-region-size" json:"max-region-size"` Filter []string `toml:"filter" json:"filter"` FileRouters []*FileRouteRule `toml:"files" json:"files"` DefaultFileRules bool `toml:"default-file-rules" json:"default-file-rules"` @@ -246,14 +246,14 @@ type FileRouteRule struct { } type TikvImporter struct { - Addr string `toml:"addr" json:"addr"` - Backend string `toml:"backend" json:"backend"` - OnDuplicate string `toml:"on-duplicate" json:"on-duplicate"` 
diff --git a/lightning/config/config.go b/lightning/config/config.go
index 29337750a..2ab3c5bbc 100644
--- a/lightning/config/config.go
+++ b/lightning/config/config.go
@@ -220,8 +220,8 @@ type CSVConfig struct {
 }
 
 type MydumperRuntime struct {
-	ReadBlockSize    int64    `toml:"read-block-size" json:"read-block-size"`
-	BatchSize        int64    `toml:"batch-size" json:"batch-size"`
+	ReadBlockSize    ByteSize `toml:"read-block-size" json:"read-block-size"`
+	BatchSize        ByteSize `toml:"batch-size" json:"batch-size"`
 	BatchImportRatio float64  `toml:"batch-import-ratio" json:"batch-import-ratio"`
 	SourceDir        string   `toml:"data-source-dir" json:"data-source-dir"`
 	NoSchema         bool     `toml:"no-schema" json:"no-schema"`
@@ -229,7 +229,7 @@ type MydumperRuntime struct {
 	CSV              CSVConfig `toml:"csv" json:"csv"`
 	CaseSensitive    bool      `toml:"case-sensitive" json:"case-sensitive"`
 	StrictFormat     bool      `toml:"strict-format" json:"strict-format"`
-	MaxRegionSize    int64     `toml:"max-region-size" json:"max-region-size"`
+	MaxRegionSize    ByteSize  `toml:"max-region-size" json:"max-region-size"`
 	Filter           []string  `toml:"filter" json:"filter"`
 	FileRouters      []*FileRouteRule `toml:"files" json:"files"`
 	DefaultFileRules bool      `toml:"default-file-rules" json:"default-file-rules"`
@@ -246,14 +246,14 @@ type FileRouteRule struct {
 }
 
 type TikvImporter struct {
-	Addr             string `toml:"addr" json:"addr"`
-	Backend          string `toml:"backend" json:"backend"`
-	OnDuplicate      string `toml:"on-duplicate" json:"on-duplicate"`
-	MaxKVPairs       int    `toml:"max-kv-pairs" json:"max-kv-pairs"`
-	SendKVPairs      int    `toml:"send-kv-pairs" json:"send-kv-pairs"`
-	RegionSplitSize  int64  `toml:"region-split-size" json:"region-split-size"`
-	SortedKVDir      string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
-	RangeConcurrency int    `toml:"range-concurrency" json:"range-concurrency"`
+	Addr             string   `toml:"addr" json:"addr"`
+	Backend          string   `toml:"backend" json:"backend"`
+	OnDuplicate      string   `toml:"on-duplicate" json:"on-duplicate"`
+	MaxKVPairs       int      `toml:"max-kv-pairs" json:"max-kv-pairs"`
+	SendKVPairs      int      `toml:"send-kv-pairs" json:"send-kv-pairs"`
+	RegionSplitSize  ByteSize `toml:"region-split-size" json:"region-split-size"`
+	SortedKVDir      string   `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
+	RangeConcurrency int      `toml:"range-concurrency" json:"range-concurrency"`
 }
 
 type Checkpoint struct {
@@ -618,7 +618,7 @@ func (cfg *Config) Adjust() error {
 	if cfg.Mydumper.BatchSize <= 0 {
 		// if rows in source files are not sorted by primary key(if primary is number or cluster index enabled),
 		// the key range in each data engine may have overlap, thus a bigger engine size can somewhat alleviate it.
-		cfg.Mydumper.BatchSize = 100 * _G
+		cfg.Mydumper.BatchSize = defaultBatchSize
 	}
 
 	if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 {
diff --git a/lightning/config/const.go b/lightning/config/const.go
index ad7863708..240d37e27 100644
--- a/lightning/config/const.go
+++ b/lightning/config/const.go
@@ -13,18 +13,20 @@
 
 package config
 
-const (
-	_K = int64(1 << 10)
-	_M = _K << 10
-	_G = _M << 10
+import (
+	"github.com/docker/go-units"
+)
 
+const (
 	// mydumper
-	ReadBlockSize int64 = 64 * _K
-	MinRegionSize int64 = 256 * _M
-	MaxRegionSize int64 = 256 * _M
-	SplitRegionSize int64 = 96 * _M
+	ReadBlockSize   ByteSize = 64 * units.KiB
+	MinRegionSize   ByteSize = 256 * units.MiB
+	MaxRegionSize   ByteSize = 256 * units.MiB
+	SplitRegionSize ByteSize = 96 * units.MiB
 
 	BufferSizeScale = 5
 
-	defaultMaxAllowedPacket = 64 * 1024 * 1024
+	defaultMaxAllowedPacket = 64 * units.MiB
+
+	defaultBatchSize ByteSize = 100 * units.GiB
 )
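Not part of the patch: the const.go rewrite above replaces the private _K/_M/_G multipliers with the docker/go-units constants. A tiny sketch (assuming the usual 1024-based values of units.KiB/MiB/GiB) showing that the defaults keep their previous numeric values:

```go
package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	fmt.Println(64*units.KiB == 65536)            // ReadBlockSize, was 64 * _K
	fmt.Println(256*units.MiB == 268_435_456)     // Min/MaxRegionSize, was 256 * _M
	fmt.Println(96*units.MiB == 100_663_296)      // SplitRegionSize, was 96 * _M
	fmt.Println(64*units.MiB == 64*1024*1024)     // defaultMaxAllowedPacket
	fmt.Println(100*units.GiB == 107_374_182_400) // defaultBatchSize, was 100 * _G
}
```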
diff --git a/lightning/mydump/csv_parser_test.go b/lightning/mydump/csv_parser_test.go
index 144a5facb..2e42e9b72 100644
--- a/lightning/mydump/csv_parser_test.go
+++ b/lightning/mydump/csv_parser_test.go
@@ -140,7 +140,7 @@ func (s *testMydumpCSVParserSuite) TestTPCH(c *C) {
 		TrimLastSep: true,
 	}
 
-	parser := mydump.NewCSVParser(&cfg, reader, config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, reader, int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -195,7 +195,7 @@ func (s *testMydumpCSVParserSuite) TestTPCHMultiBytes(c *C) {
 	}
 
 	reader := mydump.NewStringReader(inputStr)
-	parser := mydump.NewCSVParser(&cfg, reader, config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, reader, int64(config.ReadBlockSize), s.ioWorkers, false)
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
 		RowID: 1,
@@ -230,7 +230,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {
 
 	// example 1, trailing new lines
 
-	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx\n"), config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx\n"), int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -258,7 +258,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {
 
 	// example 2, no trailing new lines
 
-	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx"), config.ReadBlockSize, s.ioWorkers, false)
+	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("aaa,bbb,ccc\nzzz,yyy,xxx"), int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -286,7 +286,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {
 
 	// example 5, quoted fields
 
-	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","bbb","ccc"`+"\nzzz,yyy,xxx"), config.ReadBlockSize, s.ioWorkers, false)
+	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","bbb","ccc"`+"\nzzz,yyy,xxx"), int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -316,7 +316,7 @@ func (s *testMydumpCSVParserSuite) TestRFC4180(c *C) {
 
 	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","b
 bb","ccc"
-zzz,yyy,xxx`), config.ReadBlockSize, s.ioWorkers, false)
+zzz,yyy,xxx`), int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -344,7 +344,7 @@ zzz,yyy,xxx`), config.ReadBlockSize, s.ioWorkers, false)
 
 	// example 7, quote escaping
 
-	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","b""bb","ccc"`), config.ReadBlockSize, s.ioWorkers, false)
+	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"aaa","b""bb","ccc"`), int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -371,7 +371,7 @@ func (s *testMydumpCSVParserSuite) TestMySQL(c *C) {
 
 	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`"\"","\\","\?"
"\
-",\N,\\N`), config.ReadBlockSize, s.ioWorkers, false)
+",\N,\\N`), int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -417,10 +417,10 @@ func (s *testMydumpCSVParserSuite) TestSyntaxError(c *C) {
 		"\"\x01",
 	}
 
-	s.runFailingTestCases(c, &cfg, config.ReadBlockSize, inputs)
+	s.runFailingTestCases(c, &cfg, int64(config.ReadBlockSize), inputs)
 
 	cfg.BackslashEscape = false
-	s.runFailingTestCases(c, &cfg, config.ReadBlockSize, []string{`"\`})
+	s.runFailingTestCases(c, &cfg, int64(config.ReadBlockSize), []string{`"\`})
 }
 
 func (s *testMydumpCSVParserSuite) TestTSV(c *C) {
@@ -436,7 +436,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) {
 	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`a	b	c	d	e	f
0				foo	0000-00-00
0				foo	0000-00-00
-0	abc	def	ghi	bar	1999-12-31`), config.ReadBlockSize, s.ioWorkers, true)
+0	abc	def	ghi	bar	1999-12-31`), int64(config.ReadBlockSize), s.ioWorkers, true)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -490,7 +490,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) {
 		Delimiter: `"`,
 	}
 	data := " \r\n\r\n0,,abc\r\n \r\n123,1999-12-31,test\r\n"
-	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), s.ioWorkers, false)
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
 		RowID: 1,
@@ -515,7 +515,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) {
 
 	cfg.Header = true
 	data = " \r\na,b,c\r\n0,,abc\r\n"
-	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), config.ReadBlockSize, s.ioWorkers, true)
+	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), s.ioWorkers, true)
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.Columns(), DeepEquals, []string{"a", "b", "c"})
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -537,17 +537,17 @@ func (s *testMydumpCSVParserSuite) TestEmpty(c *C) {
 		Delimiter: `"`,
 	}
 
-	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), s.ioWorkers, false)
 	c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF)
 
 	// Try again with headers.
 
 	cfg.Header = true
 
-	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), config.ReadBlockSize, s.ioWorkers, true)
+	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), s.ioWorkers, true)
 	c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF)
 
-	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("h\n"), config.ReadBlockSize, s.ioWorkers, true)
+	parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader("h\n"), int64(config.ReadBlockSize), s.ioWorkers, true)
 	c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF)
 }
 
@@ -556,7 +556,7 @@ func (s *testMydumpCSVParserSuite) TestCRLF(c *C) {
 		Separator: ",",
 		Delimiter: `"`,
 	}
-	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("a\rb\r\nc\n\n\n\nd"), config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader("a\rb\r\nc\n\n\n\nd"), int64(config.ReadBlockSize), s.ioWorkers, false)
 
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
@@ -591,7 +591,7 @@ func (s *testMydumpCSVParserSuite) TestQuotedSeparator(c *C) {
 		Delimiter: `"`,
 	}
 
-	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`",",','`), config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`",",','`), int64(config.ReadBlockSize), s.ioWorkers, false)
 	c.Assert(parser.ReadRow(), IsNil)
 	c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
 		RowID: 1,
@@ -624,7 +624,7 @@ func (s *testMydumpCSVParserSuite) TestConsecutiveFields(c *C) {
 		"\"\"\v",
 	}
 
-	s.runFailingTestCases(c, &cfg, config.ReadBlockSize, testCases)
+	s.runFailingTestCases(c, &cfg, int64(config.ReadBlockSize), testCases)
 }
 
 func (s *testMydumpCSVParserSuite) TestSpecialChars(c *C) {
@@ -660,7 +660,7 @@ func (s *testMydumpCSVParserSuite) TestSpecialChars(c *C) {
 		},
 	}
 
-	s.runTestCases(c, &cfg, config.ReadBlockSize, testCases)
+	s.runTestCases(c, &cfg, int64(config.ReadBlockSize), testCases)
 }
 
 func (s *testMydumpCSVParserSuite) TestContinuation(c *C) {
@@ -762,7 +762,7 @@ func (s *testMydumpCSVParserSuite) TestReadError(c *C) {
 		Delimiter: `"`,
 	}
 
-	parser := mydump.NewCSVParser(&cfg, &errorReader{}, config.ReadBlockSize, s.ioWorkers, false)
+	parser := mydump.NewCSVParser(&cfg, &errorReader{}, int64(config.ReadBlockSize), s.ioWorkers, false)
 	c.Assert(parser.ReadRow(), ErrorMatches, "fake read error")
 }
"\ -",\N,\\N`), config.ReadBlockSize, s.ioWorkers, false) +",\N,\\N`), int64(config.ReadBlockSize), s.ioWorkers, false) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ @@ -417,10 +417,10 @@ func (s *testMydumpCSVParserSuite) TestSyntaxError(c *C) { "\"\x01", } - s.runFailingTestCases(c, &cfg, config.ReadBlockSize, inputs) + s.runFailingTestCases(c, &cfg, int64(config.ReadBlockSize), inputs) cfg.BackslashEscape = false - s.runFailingTestCases(c, &cfg, config.ReadBlockSize, []string{`"\`}) + s.runFailingTestCases(c, &cfg, int64(config.ReadBlockSize), []string{`"\`}) } func (s *testMydumpCSVParserSuite) TestTSV(c *C) { @@ -436,7 +436,7 @@ func (s *testMydumpCSVParserSuite) TestTSV(c *C) { parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(`a b c d e f 0 foo 0000-00-00 0 foo 0000-00-00 -0 abc def ghi bar 1999-12-31`), config.ReadBlockSize, s.ioWorkers, true) +0 abc def ghi bar 1999-12-31`), int64(config.ReadBlockSize), s.ioWorkers, true) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ @@ -490,7 +490,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { Delimiter: `"`, } data := " \r\n\r\n0,,abc\r\n \r\n123,1999-12-31,test\r\n" - parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), config.ReadBlockSize, s.ioWorkers, false) + parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), s.ioWorkers, false) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ RowID: 1, @@ -515,7 +515,7 @@ func (s *testMydumpCSVParserSuite) TestCsvWithWhiteSpaceLine(c *C) { cfg.Header = true data = " \r\na,b,c\r\n0,,abc\r\n" - parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), config.ReadBlockSize, s.ioWorkers, true) + parser = mydump.NewCSVParser(&cfg, mydump.NewStringReader(data), int64(config.ReadBlockSize), s.ioWorkers, true) c.Assert(parser.ReadRow(), IsNil) c.Assert(parser.Columns(), DeepEquals, []string{"a", "b", "c"}) c.Assert(parser.LastRow(), DeepEquals, mydump.Row{ @@ -537,17 +537,17 @@ func (s *testMydumpCSVParserSuite) TestEmpty(c *C) { Delimiter: `"`, } - parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), config.ReadBlockSize, s.ioWorkers, false) + parser := mydump.NewCSVParser(&cfg, mydump.NewStringReader(""), int64(config.ReadBlockSize), s.ioWorkers, false) c.Assert(errors.Cause(parser.ReadRow()), Equals, io.EOF) // Try again with headers. 
diff --git a/lightning/mydump/region.go b/lightning/mydump/region.go
index 54736f3e0..d40c213b0 100644
--- a/lightning/mydump/region.go
+++ b/lightning/mydump/region.go
@@ -160,7 +160,7 @@ func MakeTableRegions(
 
 		// If a csv file is overlarge, we need to split it into multiple regions.
 		// Note: We can only split a csv file whose format is strict.
-		if isCsvFile && dataFileSize > cfg.Mydumper.MaxRegionSize && cfg.Mydumper.StrictFormat {
+		if isCsvFile && dataFileSize > int64(cfg.Mydumper.MaxRegionSize) && cfg.Mydumper.StrictFormat {
 			var (
 				regions      []*TableRegion
 				subFileSizes []float64
@@ -197,7 +197,7 @@ func MakeTableRegions(
 	}
 
 	log.L().Debug("in makeTableRegions",
-		zap.Int64("maxRegionSize", cfg.Mydumper.MaxRegionSize),
+		zap.Int64("maxRegionSize", int64(cfg.Mydumper.MaxRegionSize)),
 		zap.Int("len fileRegions", len(filesRegions)))
 
 	AllocateEngineIDs(filesRegions, dataFileSizes, float64(cfg.Mydumper.BatchSize), cfg.Mydumper.BatchImportRatio, float64(cfg.App.TableConcurrency))
@@ -257,7 +257,7 @@ func SplitLargeFile(
 	ioWorker *worker.Pool,
 	store storage.ExternalStorage,
 ) (prevRowIdMax int64, regions []*TableRegion, dataFileSizes []float64, err error) {
-	maxRegionSize := cfg.Mydumper.MaxRegionSize
+	maxRegionSize := int64(cfg.Mydumper.MaxRegionSize)
 	dataFileSizes = make([]float64, 0, dataFile.Size/maxRegionSize+1)
 	startOffset, endOffset := int64(0), maxRegionSize
 	var columns []string
@@ -266,7 +266,7 @@ func SplitLargeFile(
 		if err != nil {
 			return 0, nil, nil, err
 		}
-		parser := NewCSVParser(&cfg.Mydumper.CSV, r, cfg.Mydumper.ReadBlockSize, ioWorker, true)
+		parser := NewCSVParser(&cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, true)
 		if err = parser.ReadColumns(); err != nil {
 			return 0, nil, nil, err
 		}
@@ -282,7 +282,7 @@ func SplitLargeFile(
 		if err != nil {
 			return 0, nil, nil, err
 		}
-		parser := NewCSVParser(&cfg.Mydumper.CSV, r, cfg.Mydumper.ReadBlockSize, ioWorker, false)
+		parser := NewCSVParser(&cfg.Mydumper.CSV, r, int64(cfg.Mydumper.ReadBlockSize), ioWorker, false)
 		if err = parser.SetPos(endOffset, prevRowIdMax); err != nil {
 			return 0, nil, nil, err
 		}
diff --git a/lightning/mydump/region_test.go b/lightning/mydump/region_test.go
index bfb41beaa..4abe0ba1c 100644
--- a/lightning/mydump/region_test.go
+++ b/lightning/mydump/region_test.go
@@ -216,7 +216,7 @@ func (s *testMydumpRegionSuite) TestSplitLargeFile(c *C) {
 	colCnt := int64(3)
 	columns := []string{"a", "b", "c"}
 	for _, tc := range []struct {
-		maxRegionSize int64
+		maxRegionSize config.ByteSize
 		chkCnt        int
 		offsets       [][]int64
 	}{
diff --git a/lightning/restore/checksum_test.go b/lightning/restore/checksum_test.go
index c3b78cfb9..cab23d774 100644
--- a/lightning/restore/checksum_test.go
+++ b/lightning/restore/checksum_test.go
@@ -258,7 +258,7 @@ func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID s
 func (s *checksumSuite) TestGcTTLManagerSingle(c *C) {
 	pdClient := &testPDClient{}
 	manager := newGCTTLManager(pdClient)
-	c.Assert(manager.serviceID != "", IsTrue)
+	c.Assert(manager.serviceID, Not(Equals), "")
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	oldTTL := serviceSafePointTTL
@@ -271,16 +271,17 @@ func (s *checksumSuite) TestGcTTLManagerSingle(c *C) {
 	err := manager.addOneJob(ctx, "test", uint64(time.Now().Unix()))
 	c.Assert(err, IsNil)
 
-	time.Sleep(6 * time.Second)
+	time.Sleep(6*time.Second + 10*time.Millisecond)
 
-	// after 11 seconds, must at least update 5 times
+	// after 6 seconds, must at least update 5 times
 	val := atomic.LoadInt32(&pdClient.count)
-	c.Assert(val >= 5, IsTrue)
+	c.Assert(val, GreaterEqual, int32(5))
 
 	// after remove the job, there are no job remain, gc ttl needn't to be updated
 	manager.removeOneJob("test")
+	time.Sleep(10 * time.Millisecond)
 	val = atomic.LoadInt32(&pdClient.count)
-	time.Sleep(3 * time.Second)
+	time.Sleep(3*time.Second + 10*time.Millisecond)
 	c.Assert(atomic.LoadInt32(&pdClient.count), Equals, val)
 }
 
diff --git a/lightning/restore/restore.go b/lightning/restore/restore.go
index 22926102b..c890434b0 100644
--- a/lightning/restore/restore.go
+++ b/lightning/restore/restore.go
@@ -204,7 +204,7 @@ func NewRestoreControllerWithPauser(
 	case config.BackendTiDB:
 		backend = kv.NewTiDBBackend(tidbMgr.db, cfg.TikvImporter.OnDuplicate)
 	case config.BackendLocal:
-		backend, err = kv.NewLocalBackend(ctx, tls, cfg.TiDB.PdAddr, cfg.TikvImporter.RegionSplitSize,
+		backend, err = kv.NewLocalBackend(ctx, tls, cfg.TiDB.PdAddr, int64(cfg.TikvImporter.RegionSplitSize),
 			cfg.TikvImporter.SortedKVDir, cfg.TikvImporter.RangeConcurrency, cfg.TikvImporter.SendKVPairs,
 			cfg.Checkpoint.Enable)
 		if err != nil {
@@ -451,7 +451,7 @@ func (rc *RestoreController) estimateChunkCountIntoMetrics(ctx context.Context)
 			}
 			if fileMeta.FileMeta.Type == mydump.SourceTypeCSV {
 				cfg := rc.cfg.Mydumper
-				if fileMeta.Size > cfg.MaxRegionSize && cfg.StrictFormat && !cfg.CSV.Header {
+				if fileMeta.Size > int64(cfg.MaxRegionSize) && cfg.StrictFormat && !cfg.CSV.Header {
 					estimatedChunkCount += math.Round(float64(fileMeta.Size) / float64(cfg.MaxRegionSize))
 				} else {
 					estimatedChunkCount += 1
@@ -1366,7 +1366,7 @@ func newChunkRestore(
 	store storage.ExternalStorage,
 	tableInfo *TidbTableInfo,
 ) (*chunkRestore, error) {
-	blockBufSize := cfg.Mydumper.ReadBlockSize
+	blockBufSize := int64(cfg.Mydumper.ReadBlockSize)
 
 	reader, err := store.Open(ctx, chunk.Key.Path)
 	if err != nil {
diff --git a/tidb-lightning.toml b/tidb-lightning.toml
index 2150581bc..12067df66 100644
--- a/tidb-lightning.toml
+++ b/tidb-lightning.toml
@@ -83,7 +83,7 @@ addr = "127.0.0.1:8287"
 #on-duplicate = "replace"
 # Maximum KV size of SST files produced in the 'local' backend. This should be the same as
 # the TiKV region size to avoid further region splitting. The default value is 96 MiB.
-#region-split-size = 100_663_296
+#region-split-size = '96MiB'
 # write key-values pairs to tikv batch size
 #send-kv-pairs = 32768
 # local storage directory used in "local" backend.
@@ -95,10 +95,10 @@ addr = "127.0.0.1:8287"
 
 [mydumper]
 # block size of file reading
-read-block-size = 65536 # Byte (default = 64 KB)
+read-block-size = '64KiB'
 # minimum size (in terms of source data file) of each batch of import.
 # Lightning will split a large table into multiple engine files according to this size.
-#batch-size = 107_374_182_400 # Byte (default = 100 GiB)
+#batch-size = '100GiB'
 
 # Engine file needs to be imported sequentially. Due to table-concurrency, multiple engines will be
 # imported nearly the same time, and this will create a queue and this wastes resources. Therefore,
@@ -131,7 +131,7 @@ case-sensitive = false
 strict-format = false
 # if strict-format is true, large CSV files will be split to multiple chunks, which Lightning
 # will restore in parallel. The size of each chunk is `max-region-size`, where the default is 256 MiB.
-#max-region-size = 268_435_456
+#max-region-size = '256MiB'
 # enable file router to use the default rules. By default, it will be set to true if no `mydumper.files`
 # rule is provided, else false. You can explicitly set it to `true` to enable the default rules, they will
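Not part of the patch: a short sketch, based on the cases in bytesize_test.go, showing that a config using the old raw byte count and one using the new human-readable string from tidb-lightning.toml decode to the same ByteSize value, so existing configs keep working:

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"

	"github.com/pingcap/tidb-lightning/lightning/config"
)

func main() {
	var oldCfg, newCfg struct {
		BatchSize config.ByteSize `toml:"batch-size"`
	}
	// Legacy integer form: a raw byte count, as in the old default.
	if err := toml.Unmarshal([]byte("batch-size = 107_374_182_400"), &oldCfg); err != nil {
		panic(err)
	}
	// New human-readable form, as shown in the updated tidb-lightning.toml.
	if err := toml.Unmarshal([]byte("batch-size = '100GiB'"), &newCfg); err != nil {
		panic(err)
	}
	fmt.Println(oldCfg.BatchSize == newCfg.BatchSize, int64(oldCfg.BatchSize)) // true 107374182400
}
```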