This repository was archived by the owner on Aug 21, 2023. It is now read-only.

Commit cf05a05

tangenta authored and kennytm committed
*: add integration test for dumping in given file size (#26)
* *: add integration test for dumping in given file size
* log: use zap.Error instead of zap.String('err', xx)
* *: fix style problems
1 parent f9d9f7d commit cf05a05

File tree

8 files changed (+295 -85 lines)


cmd/dumpling/main.go

+1 -1

@@ -100,7 +100,7 @@ func main() {
 
 	err = export.Dump(conf)
 	if err != nil {
-		log.Zap().Error("dump failed", zap.String("error", err.Error()))
+		log.Zap().Error("dump failed", zap.Error(err))
 		os.Exit(1)
 	}
 	return
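
The change above swaps a hand-built string field for zap's dedicated error field. A minimal, hedged illustration of the difference (the logger setup below is generic zap usage, not dumpling's own log package; the error value is a placeholder):

package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	err := errors.New("connection refused") // placeholder error for illustration

	// Old form: records only the message text under a hand-picked key.
	logger.Error("dump failed", zap.String("error", err.Error()))

	// New form: zap derives the field (key "error") from the error value itself,
	// which also works better with error types that carry extra context.
	logger.Error("dump failed", zap.Error(err))
}

Both calls log the same message; zap.Error simply keeps the error as a typed field instead of a pre-rendered string.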

tests/file_size/run.sh

+32

@@ -0,0 +1,32 @@
+#!/bin/sh
+
+set -eu
+
+run_sql "drop database if exists file_size"
+run_sql "create database file_size"
+export DUMPLING_TEST_DATABASE=file_size
+run_sql "create table t (a varchar(255))"
+
+chars_20="1111_0000_1111_0000_"
+
+# insert 100 records, each occupies 20 bytes
+i=0; while [ $i -lt 100 ]; do
+  run_sql "insert into t values (\"$chars_20\")"
+  i=$(( i + 1 ))
+done
+
+# dumping with file size = 200 bytes
+run_dumpling -filesize 200
+
+# the dumping result is expected to be:
+# 10 files for insertion (each contains 10 records / 200 bytes)
+file_num=$(find "$DUMPLING_OUTPUT_DIR" -maxdepth 1 -iname "file_size.t.*.sql" | wc -l)
+if [ "$file_num" -ne 10 ]; then
+  echo "obtain file number: $file_num, but expect: 10" && exit 1
+fi
+
+total_lines=$(find "$DUMPLING_OUTPUT_DIR" -maxdepth 1 -iname "file_size.t.*.sql" -print0 \
+  | xargs -0 cat | grep "$chars_20" -c)
+if [ ! "$total_lines" = 100 ]; then
+  echo "obtain record number: $total_lines, but expect: 100" && exit 1
+fi
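
The expected numbers in the script follow from simple arithmetic: each inserted row carries a 20-byte value and dumping is capped at 200 bytes per file, so every output file holds 10 rows and 100 rows produce 10 files. A small sketch of that calculation (illustrative only; the constants mirror the script, not any dumpling API):

package main

import "fmt"

func main() {
	const (
		totalRows   = 100 // rows inserted by the test
		bytesPerRow = 20  // len("1111_0000_1111_0000_")
		fileSizeCap = 200 // run_dumpling -filesize 200
	)
	rowsPerFile := fileSizeCap / bytesPerRow // 10 rows before the size cap is hit
	files := totalRows / rowsPerFile         // 10 files in total
	fmt.Println(rowsPerFile, files)          // prints: 10 10
}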

v4/export/consistency_test.go

+1 -1

@@ -68,7 +68,7 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
 		"db1": {"t1", "t2", "t3"},
 		"db2": {"t4"},
 	}
-	for i := 0; i < 4; i += 1 {
+	for i := 0; i < 4; i++ {
 		mock.ExpectExec("LOCK TABLES").WillReturnResult(resultOk)
 	}
 	mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)

v4/export/ir_impl.go

+6 -2

@@ -48,7 +48,7 @@ func (s *sizedRowIter) Next(row RowReceiver) error {
 }
 
 func (s *sizedRowIter) HasNext() bool {
-	if s.currentSize > s.sizeLimit {
+	if s.currentSize >= s.sizeLimit {
 		return false
 	}
 	return s.rowIter.HasNext()
@@ -117,12 +117,16 @@ func (td *tableData) SpecialComments() StringIter {
 
 type tableDataChunks struct {
 	TableDataIR
+	rows      SQLRowIter
 	sizeLimit uint64
 }
 
 func (t *tableDataChunks) Rows() SQLRowIter {
+	if t.rows == nil {
+		t.rows = t.TableDataIR.Rows()
+	}
 	return &sizedRowIter{
-		rowIter:   t.Rows(),
+		rowIter:   t.rows,
 		sizeLimit: t.sizeLimit,
 	}
}
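
For context on the two changes above: switching the check to >= stops a chunk as soon as the accumulated size reaches the limit exactly, and caching the underlying iterator in tableDataChunks.rows keeps repeated Rows() calls from restarting the scan. A minimal sketch of the accumulation idea, assuming Next adds the receiver's ReportSize() to currentSize (the real sizedRowIter lives in ir_impl.go and may differ in detail):

package export

// Sketch only: SQLRowIter and RowReceiver come from v4/export; the Next body
// below is an assumption based on the TestSizedRowIter test in ir_impl_test.go.
type sizedRowIterSketch struct {
	rowIter     SQLRowIter
	currentSize uint64
	sizeLimit   uint64
}

func (s *sizedRowIterSketch) Next(row RowReceiver) error {
	if err := s.rowIter.Next(row); err != nil {
		return err
	}
	s.currentSize += row.ReportSize() // accumulate bytes seen so far (assumed)
	return nil
}

func (s *sizedRowIterSketch) HasNext() bool {
	if s.currentSize >= s.sizeLimit { // same cut-off as the change above
		return false
	}
	return s.rowIter.HasNext()
}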

v4/export/ir_impl_test.go

+52 -9

@@ -1,6 +1,8 @@
 package export
 
 import (
+	"strings"
+
 	"github.com/DATA-DOG/go-sqlmock"
 	. "github.com/pingcap/check"
 )
@@ -10,15 +12,25 @@ var _ = Suite(&testIRImplSuite{})
 type testIRImplSuite struct{}
 
 type simpleRowReceiver struct {
-	data string
+	data []string
+}
+
+func newSimpleRowReceiver(length int) *simpleRowReceiver {
+	return &simpleRowReceiver{data: make([]string, length)}
 }
 
-func (s *simpleRowReceiver) BindAddress(arg []interface{}) {
-	arg[0] = &s.data
+func (s *simpleRowReceiver) BindAddress(args []interface{}) {
+	for i := range args {
+		args[i] = &s.data[i]
+	}
 }
 
 func (s *simpleRowReceiver) ReportSize() uint64 {
-	panic("not implement")
+	var sum uint64
+	for _, datum := range s.data {
+		sum += uint64(len(datum))
+	}
+	return sum
 }
 
 func (s *testIRImplSuite) TestRowIter(c *C) {
@@ -35,18 +47,49 @@ func (s *testIRImplSuite) TestRowIter(c *C) {
 	c.Assert(err, IsNil)
 
 	iter := newRowIter(rows, 1)
-	for i := 0; i < 100; i += 1 {
+	for i := 0; i < 100; i++ {
 		c.Assert(iter.HasNext(), IsTrue)
 	}
-	res := &simpleRowReceiver{}
+	res := newSimpleRowReceiver(1)
 	c.Assert(iter.Next(res), IsNil)
-	c.Assert(res.data, Equals, "1")
+	c.Assert(res.data, DeepEquals, []string{"1"})
 	c.Assert(iter.HasNext(), IsTrue)
 	c.Assert(iter.HasNext(), IsTrue)
 	c.Assert(iter.Next(res), IsNil)
-	c.Assert(res.data, Equals, "2")
+	c.Assert(res.data, DeepEquals, []string{"2"})
 	c.Assert(iter.HasNext(), IsTrue)
 	c.Assert(iter.Next(res), IsNil)
-	c.Assert(res.data, Equals, "3")
+	c.Assert(res.data, DeepEquals, []string{"3"})
 	c.Assert(iter.HasNext(), IsFalse)
 }
+
+func (s *testIRImplSuite) TestSizedRowIter(c *C) {
+	db, mock, err := sqlmock.New()
+	c.Assert(err, IsNil)
+	defer db.Close()
+
+	twentyBytes := strings.Repeat("x", 20)
+	thirtyBytes := strings.Repeat("x", 30)
+	expectedRows := mock.NewRows([]string{"a", "b"})
+	for i := 0; i < 10; i++ {
+		expectedRows.AddRow(twentyBytes, thirtyBytes)
+	}
+	mock.ExpectQuery("SELECT a, b FROM t").WillReturnRows(expectedRows)
+	rows, err := db.Query("SELECT a, b FROM t")
+	c.Assert(err, IsNil)
+
+	sizedRowIter := sizedRowIter{
+		rowIter:   newRowIter(rows, 2),
+		sizeLimit: 200,
+	}
+	res := newSimpleRowReceiver(2)
+	for i := 0; i < 200/50; i++ {
+		c.Assert(sizedRowIter.HasNext(), IsTrue)
+		err := sizedRowIter.Next(res)
+		c.Assert(err, IsNil)
+	}
+	c.Assert(sizedRowIter.HasNext(), IsFalse)
+	c.Assert(sizedRowIter.HasNext(), IsFalse)
+	rows.Close()
+	c.Assert(sizedRowIter.Next(res), NotNil)
+}
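
The simpleRowReceiver used above shows the BindAddress pattern: the receiver hands database/sql a slice of pointers into its own storage, and Scan fills them in. A hedged sketch of how an iterator can use it (the real newRowIter lives in ir_impl.go; this only illustrates the pattern and assumes row positioning is handled elsewhere):

package export

import "database/sql"

// scanOneRowSketch is an illustration only: it assumes the iterator holds a
// *sql.Rows and that RowReceiver is the interface exercised by the test above.
func scanOneRowSketch(rows *sql.Rows, receiver RowReceiver, colCount int) error {
	args := make([]interface{}, colCount)
	receiver.BindAddress(args) // each slot now points into the receiver's own storage
	return rows.Scan(args...)  // Scan writes the column values through those pointers
}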

v4/export/prepare_test.go

+71

@@ -0,0 +1,71 @@
+package export
+
+import (
+	"fmt"
+
+	"github.com/DATA-DOG/go-sqlmock"
+	. "github.com/pingcap/check"
+)
+
+var _ = Suite(&testPrepareSuite{})
+
+type testPrepareSuite struct{}
+
+func (s *testPrepareSuite) TestPrepareDumpingDatabases(c *C) {
+	db, mock, err := sqlmock.New()
+	c.Assert(err, IsNil)
+	defer db.Close()
+
+	conf := DefaultConfig()
+	conf.Database = "db1,db2,db3"
+	result, err := prepareDumpingDatabases(conf, db)
+	c.Assert(err, IsNil)
+	c.Assert(result, DeepEquals, []string{"db1", "db2", "db3"})
+
+	conf.Database = ""
+	rows := sqlmock.NewRows([]string{"Database"}).
+		AddRow("db1").
+		AddRow("db2")
+	mock.ExpectQuery("SHOW DATABASES").WillReturnRows(rows)
+	result, err = prepareDumpingDatabases(conf, db)
+	c.Assert(err, IsNil)
+	c.Assert(result, DeepEquals, []string{"db1", "db2"})
+
+	mock.ExpectQuery("SHOW DATABASES").WillReturnError(fmt.Errorf("err"))
+	_, err = prepareDumpingDatabases(conf, db)
+	c.Assert(err, NotNil)
+	c.Assert(mock.ExpectationsWereMet(), IsNil)
+}
+
+func (s *testPrepareSuite) TestListAllTables(c *C) {
+	db, mock, err := sqlmock.New()
+	c.Assert(err, IsNil)
+	defer db.Close()
+
+	resultOk := sqlmock.NewResult(0, 0)
+	data := map[databaseName][]tableName{
+		"db1": {"t1", "t2"},
+		"db2": {"t3", "t4", "t5"},
+	}
+
+	var dbNames []databaseName
+	for dbName, tbNames := range data {
+		dbNames = append(dbNames, dbName)
+		mock.ExpectExec("USE .").WillReturnResult(resultOk)
+		rows := sqlmock.NewRows([]string{"Tables_in_xx"})
+		for _, name := range tbNames {
+			rows.AddRow(name)
+		}
+		mock.ExpectQuery("SHOW TABLES").WillReturnRows(rows)
+	}
+
+	tables, err := listAllTables(db, dbNames)
+	c.Assert(err, IsNil)
+
+	for d, t := range tables {
+		expectedTbs, ok := data[d]
+		c.Assert(ok, IsTrue)
+		c.Assert(t, DeepEquals, expectedTbs)
+	}
+	c.Assert(mock.ExpectationsWereMet(), IsNil)
+}
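
The assertions above pin down the behaviour this test expects from prepareDumpingDatabases: an explicit comma-separated database list wins, and an empty config falls back to SHOW DATABASES. A hedged sketch of that logic (the real function lives elsewhere in v4/export and may differ; this only mirrors what the test checks):

package export

import (
	"database/sql"
	"strings"
)

// prepareDumpingDatabasesSketch is illustrative only; it is not the function
// under test, just the behaviour the assertions above imply.
func prepareDumpingDatabasesSketch(conf *Config, db *sql.DB) ([]string, error) {
	if conf.Database != "" {
		return strings.Split(conf.Database, ","), nil
	}
	rows, err := db.Query("SHOW DATABASES")
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var dbs []string
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			return nil, err
		}
		dbs = append(dbs, name)
	}
	return dbs, rows.Err()
}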

v4/export/writer.go

+36 -68

@@ -3,10 +3,11 @@ package export
 import (
 	"context"
 	"fmt"
-	"io"
 	"os"
 	"path"
-	"sync"
+
+	"github.com/pingcap/dumpling/v4/log"
+	"go.uber.org/zap"
 )
 
 type Writer interface {
@@ -25,91 +26,58 @@ func NewSimpleWriter(config *Config) (Writer, error) {
 }
 
 func (f *SimpleWriter) WriteDatabaseMeta(ctx context.Context, db, createSQL string) error {
-	fileName := path.Join(f.cfg.OutputDirPath, fmt.Sprintf("%s-schema-create.sql", db))
-	fsStringWriter := NewFileSystemStringWriter(fileName, false)
-	meta := &metaData{
-		target:  db,
-		metaSQL: createSQL,
-	}
-	return WriteMeta(meta, fsStringWriter)
+	fileName := fmt.Sprintf("%s-schema-create.sql", db)
+	filePath := path.Join(f.cfg.OutputDirPath, fileName)
+	return writeMetaToFile(db, createSQL, filePath)
 }
 
 func (f *SimpleWriter) WriteTableMeta(ctx context.Context, db, table, createSQL string) error {
-	fileName := path.Join(f.cfg.OutputDirPath, fmt.Sprintf("%s.%s-schema.sql", db, table))
-	fsStringWriter := NewFileSystemStringWriter(fileName, false)
-	meta := &metaData{
-		target:  table,
-		metaSQL: createSQL,
-	}
-	return WriteMeta(meta, fsStringWriter)
+	fileName := fmt.Sprintf("%s.%s-schema.sql", db, table)
+	filePath := path.Join(f.cfg.OutputDirPath, fileName)
+	return writeMetaToFile(db, createSQL, filePath)
 }
 
 func (f *SimpleWriter) WriteTableData(ctx context.Context, ir TableDataIR) error {
+	log.Zap().Debug("start dumping table...", zap.String("table", ir.TableName()))
 	if f.cfg.FileSize == UnspecifiedSize {
 		fileName := path.Join(f.cfg.OutputDirPath, fmt.Sprintf("%s.%s.sql", ir.DatabaseName(), ir.TableName()))
-		fsStringWriter := NewFileSystemStringWriter(fileName, true)
-		return WriteInsert(ir, fsStringWriter)
+		fileWriter, tearDown := buildLazyFileWriter(fileName)
+		defer tearDown()
+		return WriteInsert(ir, fileWriter)
 	}
 
-	ir = splitTableDataIntoChunks(ir, f.cfg.FileSize)
-	for chunkCount := 0; ; /* loop */ chunkCount += 1 {
-		fileName := path.Join(f.cfg.OutputDirPath, fmt.Sprintf("%s.%s.%3d.sql", ir.DatabaseName(), ir.TableName(), chunkCount))
-		fsStringWriter := newInterceptStringWriter(NewFileSystemStringWriter(fileName, true))
-		err := WriteInsert(ir, fsStringWriter)
+	chunks := splitTableDataIntoChunks(ir, f.cfg.FileSize)
+	chunkCount := 0
+	for {
+		fileName := fmt.Sprintf("%s.%s.%d.sql", ir.DatabaseName(), ir.TableName(), chunkCount)
+		filePath := path.Join(f.cfg.OutputDirPath, fileName)
+		fileWriter, tearDown := buildLazyFileWriter(filePath)
+		intWriter := &InterceptStringWriter{StringWriter: fileWriter}
+		err := WriteInsert(chunks, intWriter)
+		tearDown()
 		if err != nil {
 			return err
 		}
-		if fsStringWriter.writeNothingYet {
+
+		if !intWriter.SomethingIsWritten {
 			break
 		}
+		chunkCount += 1
 	}
+	log.Zap().Debug("dumping table successfully",
+		zap.String("table", ir.TableName()))
 	return nil
 }
 
-type FileSystemStringWriter struct {
-	path string
-
-	file *os.File
-	once sync.Once
-	err  error
-}
-
-func (w *FileSystemStringWriter) initFileHandle() {
-	w.file, w.err = os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY, 0755)
-}
-
-func (w *FileSystemStringWriter) WriteString(str string) (int, error) {
-	if w.err != nil {
-		return 0, w.err
+func writeMetaToFile(target, metaSQL, path string) error {
+	fileWriter, tearDown, err := buildFileWriter(path)
+	if err != nil {
+		return err
 	}
-	w.once.Do(w.initFileHandle)
-	return w.file.WriteString(str)
-}
+	defer tearDown()
 
-func NewFileSystemStringWriter(path string, lazyHandleCreation bool) *FileSystemStringWriter {
-	w := &FileSystemStringWriter{path: path}
-	if !lazyHandleCreation {
-		w.once.Do(w.initFileHandle)
-	}
-	return w
-}
-
-type interceptStringWriter struct {
-	sw              io.StringWriter
-	writeNothingYet bool
-}
-
-func (i *interceptStringWriter) WriteString(str string) (int, error) {
-	writtenBytes, err := i.sw.WriteString(str)
-	if writtenBytes != 0 {
-		i.writeNothingYet = false
-	}
-	return writtenBytes, err
-}
-
-func newInterceptStringWriter(sw io.StringWriter) *interceptStringWriter {
-	return &interceptStringWriter{
-		sw:              sw,
-		writeNothingYet: true,
-	}
+	return WriteMeta(&metaData{
+		target:  target,
+		metaSQL: metaSQL,
+	}, fileWriter)
 }
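
The rewritten WriteTableData leans on two helpers that are referenced but not shown in this commit: buildLazyFileWriter, which defers creating the file until something is actually written (so the final empty chunk never produces a file), and InterceptStringWriter, whose SomethingIsWritten flag tells the loop when the chunk iterator is exhausted. Their real definitions live elsewhere in the package; the sketch below only reconstructs the idea from the deleted FileSystemStringWriter/interceptStringWriter code above, so names and signatures here are assumptions:

package export

import (
	"io"
	"os"
)

// lazyFileWriterSketch opens its file on the first WriteString call,
// mirroring the old lazyHandleCreation behaviour.
type lazyFileWriterSketch struct {
	path string
	file *os.File
}

func (w *lazyFileWriterSketch) WriteString(str string) (int, error) {
	if w.file == nil {
		f, err := os.OpenFile(w.path, os.O_CREATE|os.O_WRONLY, 0755)
		if err != nil {
			return 0, err
		}
		w.file = f
	}
	return w.file.WriteString(str)
}

// interceptWriterSketch records whether any bytes went through, which is
// how the chunk loop above decides to stop.
type interceptWriterSketch struct {
	StringWriter       io.StringWriter
	SomethingIsWritten bool
}

func (i *interceptWriterSketch) WriteString(str string) (int, error) {
	n, err := i.StringWriter.WriteString(str)
	if n > 0 {
		i.SomethingIsWritten = true
	}
	return n, err
}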
