Skip to content

Commit

Permalink
Unify --filesize and --statement-size definition with mydumper's (p…
Browse files Browse the repository at this point in the history
…ingcap#142)

* reduce bound address times to save more time

* tmp

* unify file size

* fix ut

* fix bug that escapeBackSlash not used for rows arguments

* tmp

* move filesize and statementsize argument to writepipe

* fix format

* address comment
  • Loading branch information
lichunzhu authored Sep 8, 2020
1 parent 6393489 commit cf4aa4d
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 223 deletions.
4 changes: 2 additions & 2 deletions dumpling/tests/file_size/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ chars_20="1111_0000_1111_0000_"
# insert 100 records, each occupies 20 bytes
run_sql "insert into t values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('$chars_20')/g");"

# dumping with file size = 200 bytes
run_dumpling -F 200B
# dumping with file size = 311 bytes, actually 10 rows
run_dumpling -F 311B

# the dumping result is expected to be:
# 10 files for insertion(each conatins 10 records / 200 bytes)
Expand Down
9 changes: 4 additions & 5 deletions dumpling/tests/no_table_and_db_name/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,16 @@ chars_20="1111_0000_1111_0000_"
# insert 100 records, each occupies 20 bytes
run_sql "insert into t values $(seq -s, 100 | sed 's/,*$//g' | sed "s/[0-9]*/('$chars_20')/g");"

# dumping with file size = 200 bytes
run_dumpling -F 200B --filetype csv --sql "select * from $TEST_NAME.t"
# dumping with file size = 233 bytes, actually 10 rows
run_dumpling -F 233B --filetype csv --sql "select * from $TEST_NAME.t"

assert [ $( ls -lh $DUMPLING_OUTPUT_DIR | grep -e ".csv$" | wc -l ) -eq 10 ]

# 10 files with header.
assert [ $( cat $DUMPLING_OUTPUT_DIR/*.csv | wc -l ) -eq $(( 100 + 10 )) ]


# dumping with file size = 200 bytes
run_dumpling -F 200B --filetype sql --sql "select * from $TEST_NAME.t"
# dumping with file size = 311 bytes, actually 10 rows
run_dumpling -F 311B --filetype sql --sql "select * from $TEST_NAME.t"

assert [ $( ls -lh $DUMPLING_OUTPUT_DIR | grep -e ".sql$" | wc -l ) -eq 10 ]

Expand Down
3 changes: 0 additions & 3 deletions dumpling/v4/export/ir.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type SQLRowIter interface {
Next()
Error() error
HasNext() bool
HasNextSQLRowIter() bool
NextSQLRowIter() SQLRowIter
// release SQLRowIter
Close() error
}
Expand All @@ -46,7 +44,6 @@ type Stringer interface {

type RowReceiver interface {
BindAddress([]interface{})
ReportSize() uint64
}

func decodeFromRows(rows *sql.Rows, args []interface{}, row RowReceiver) error {
Expand Down
105 changes: 5 additions & 100 deletions dumpling/v4/export/ir_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,74 +50,6 @@ func (iter *rowIter) HasNext() bool {
return iter.hasNext
}

func (iter *rowIter) HasNextSQLRowIter() bool {
return iter.hasNext
}

func (iter *rowIter) NextSQLRowIter() SQLRowIter {
return iter
}

type fileRowIter struct {
rowIter SQLRowIter
fileSizeLimit uint64
statementSizeLimit uint64

currentStatementSize uint64
currentFileSize uint64
}

func (c *fileRowIter) Close() error {
return c.rowIter.Close()
}

func (c *fileRowIter) Decode(row RowReceiver) error {
err := c.rowIter.Decode(row)
if err != nil {
return err
}
size := row.ReportSize()
c.currentFileSize += size
c.currentStatementSize += size
return nil
}

func (c *fileRowIter) Error() error {
return c.rowIter.Error()
}

func (c *fileRowIter) Next() {
c.rowIter.Next()
}

func (c *fileRowIter) HasNext() bool {
if c.fileSizeLimit != UnspecifiedSize && c.currentFileSize >= c.fileSizeLimit {
return false
}

if c.statementSizeLimit != UnspecifiedSize && c.currentStatementSize >= c.statementSizeLimit {
return false
}
return c.rowIter.HasNext()
}

func (c *fileRowIter) HasNextSQLRowIter() bool {
if c.fileSizeLimit != UnspecifiedSize && c.currentFileSize >= c.fileSizeLimit {
return false
}
return c.rowIter.HasNext()
}

func (c *fileRowIter) NextSQLRowIter() SQLRowIter {
return &fileRowIter{
rowIter: c.rowIter,
fileSizeLimit: c.fileSizeLimit,
statementSizeLimit: c.statementSizeLimit,
currentFileSize: c.currentFileSize,
currentStatementSize: 0,
}
}

type stringIter struct {
idx int
ss []string
Expand Down Expand Up @@ -153,6 +85,7 @@ type tableData struct {
selectedField string
specCmts []string
escapeBackslash bool
SQLRowIter
}

func (td *tableData) Start(ctx context.Context, conn *sql.Conn) error {
Expand Down Expand Up @@ -197,7 +130,10 @@ func (td *tableData) ColumnCount() uint {
}

func (td *tableData) Rows() SQLRowIter {
return newRowIter(td.rows, len(td.colTypes))
if td.SQLRowIter == nil {
td.SQLRowIter = newRowIter(td.rows, len(td.colTypes))
}
return td.SQLRowIter
}

func (td *tableData) SelectedField() string {
Expand All @@ -215,37 +151,6 @@ func (td *tableData) EscapeBackSlash() bool {
return td.escapeBackslash
}

type tableDataChunks struct {
TableDataIR
rows SQLRowIter
chunkSizeLimit uint64
statementSizeLimit uint64
}

func (t *tableDataChunks) Rows() SQLRowIter {
if t.rows == nil {
t.rows = t.TableDataIR.Rows()
}

return &fileRowIter{
rowIter: t.rows,
statementSizeLimit: t.statementSizeLimit,
fileSizeLimit: t.chunkSizeLimit,
}
}

func (t *tableDataChunks) EscapeBackSlash() bool {
return t.TableDataIR.EscapeBackSlash()
}

func buildChunksIter(td TableDataIR, chunkSize uint64, statementSize uint64) *tableDataChunks {
return &tableDataChunks{
TableDataIR: td,
chunkSizeLimit: chunkSize,
statementSizeLimit: statementSize,
}
}

func splitTableDataIntoChunks(
ctx context.Context,
tableDataIRCh chan TableDataIR,
Expand Down
37 changes: 16 additions & 21 deletions dumpling/v4/export/ir_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@ func (s *simpleRowReceiver) BindAddress(args []interface{}) {
}
}

func (s *simpleRowReceiver) ReportSize() uint64 {
var sum uint64
for _, datum := range s.data {
sum += uint64(len(datum))
}
return sum
}

func (s *testIRImplSuite) TestRowIter(c *C) {
db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -93,30 +85,33 @@ func (s *testIRImplSuite) TestChunkRowIter(c *C) {
}
)

sqlRowIter := SQLRowIter(&fileRowIter{
rowIter: newRowIter(rows, 2),
fileSizeLimit: testFileSize,
statementSizeLimit: testStatementSize,
})
sqlRowIter := newRowIter(rows, 2)

res := newSimpleRowReceiver(2)
wp := newWriterPipe(nil, testFileSize, testStatementSize)

var resSize [][]uint64
for sqlRowIter.HasNextSQLRowIter() {
sqlRowIter = sqlRowIter.NextSQLRowIter()
fileRowIter, ok := sqlRowIter.(*fileRowIter)
c.Assert(ok, IsTrue)

for sqlRowIter.HasNext() {
wp.currentStatementSize = 0
for sqlRowIter.HasNext() {
c.Assert(sqlRowIter.Decode(res), IsNil)
sz := uint64(len(res.data[0]) + len(res.data[1]))
wp.AddFileSize(sz)
sqlRowIter.Next()
resSize = append(resSize, []uint64{fileRowIter.currentFileSize, fileRowIter.currentStatementSize})
resSize = append(resSize, []uint64{wp.currentFileSize, wp.currentStatementSize})
if wp.ShouldSwitchStatement() {
break
}
}
if wp.ShouldSwitchFile() {
break
}
}

c.Assert(resSize, DeepEquals, expectedSize)
c.Assert(sqlRowIter.HasNextSQLRowIter(), IsFalse)
c.Assert(sqlRowIter.HasNext(), IsFalse)
c.Assert(sqlRowIter.HasNext(), IsTrue)
c.Assert(wp.ShouldSwitchFile(), IsTrue)
c.Assert(wp.ShouldSwitchStatement(), IsTrue)
rows.Close()
c.Assert(sqlRowIter.Decode(res), NotNil)
sqlRowIter.Next()
Expand Down
42 changes: 18 additions & 24 deletions dumpling/v4/export/sql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,47 +116,50 @@ func SQLTypeNumberMaker() RowReceiverStringer {
}

func MakeRowReceiver(colTypes []string) RowReceiverStringer {
rowReceiverArr := make(RowReceiverArr, len(colTypes))
rowReceiverArr := make([]RowReceiverStringer, len(colTypes))
for i, colTp := range colTypes {
recMaker, ok := colTypeRowReceiverMap[colTp]
if !ok {
recMaker = SQLTypeStringMaker
}
rowReceiverArr[i] = recMaker()
}
return rowReceiverArr
return RowReceiverArr{
bound: false,
receivers: rowReceiverArr,
}
}

type RowReceiverArr []RowReceiverStringer
type RowReceiverArr struct {
bound bool
receivers []RowReceiverStringer
}

func (r RowReceiverArr) BindAddress(args []interface{}) {
for i := range args {
r[i].BindAddress(args[i : i+1])
if r.bound {
return
}
}
func (r RowReceiverArr) ReportSize() uint64 {
var sum uint64
for _, receiver := range r {
sum += receiver.ReportSize()
r.bound = true
for i := range args {
r.receivers[i].BindAddress(args[i : i+1])
}
return sum
}

func (r RowReceiverArr) WriteToBuffer(bf *bytes.Buffer, escapeBackslash bool) {
bf.WriteByte('(')
for i, receiver := range r {
for i, receiver := range r.receivers {
receiver.WriteToBuffer(bf, escapeBackslash)
if i != len(r)-1 {
if i != len(r.receivers)-1 {
bf.WriteByte(',')
}
}
bf.WriteByte(')')
}

func (r RowReceiverArr) WriteToBufferInCsv(bf *bytes.Buffer, escapeBackslash bool, opt *csvOption) {
for i, receiver := range r {
for i, receiver := range r.receivers {
receiver.WriteToBufferInCsv(bf, escapeBackslash, opt)
if i != len(r)-1 {
if i != len(r.receivers)-1 {
bf.Write(opt.separator)
}
}
Expand Down Expand Up @@ -189,12 +192,6 @@ type SQLTypeString struct {
func (s *SQLTypeString) BindAddress(arg []interface{}) {
arg[0] = &s.RawBytes
}
func (s *SQLTypeString) ReportSize() uint64 {
if s.RawBytes != nil {
return uint64(len(s.RawBytes))
}
return uint64(len(nullValue))
}

func (s *SQLTypeString) WriteToBuffer(bf *bytes.Buffer, escapeBackslash bool) {
if s.RawBytes != nil {
Expand Down Expand Up @@ -223,9 +220,6 @@ type SQLTypeBytes struct {
func (s *SQLTypeBytes) BindAddress(arg []interface{}) {
arg[0] = &s.RawBytes
}
func (s *SQLTypeBytes) ReportSize() uint64 {
return uint64(len(s.RawBytes))
}

func (s *SQLTypeBytes) WriteToBuffer(bf *bytes.Buffer, _ bool) {
if s.RawBytes != nil {
Expand Down
40 changes: 22 additions & 18 deletions dumpling/v4/export/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type mockTableIR struct {
colNames []string
escapeBackSlash bool
rowErr error
SQLRowIter
}

func (m *mockTableIR) Start(ctx context.Context, conn *sql.Conn) error {
Expand Down Expand Up @@ -128,25 +129,27 @@ func (m *mockTableIR) SpecialComments() StringIter {
}

func (m *mockTableIR) Rows() SQLRowIter {
mockRows := sqlmock.NewRows(m.colTypes)
for _, datum := range m.data {
mockRows.AddRow(datum...)
if m.SQLRowIter == nil {
mockRows := sqlmock.NewRows(m.colTypes)
for _, datum := range m.data {
mockRows.AddRow(datum...)
}
db, mock, err := sqlmock.New()
if err != nil {
panic(fmt.Sprintf("sqlmock.New return error: %v", err))
}
defer db.Close()
mock.ExpectQuery("select 1").WillReturnRows(mockRows)
if m.rowErr != nil {
mockRows.RowError(len(m.data)-1, m.rowErr)
}
rows, err := db.Query("select 1")
if err != nil {
panic(fmt.Sprintf("sqlmock.New return error: %v", err))
}
m.SQLRowIter = newRowIter(rows, len(m.colTypes))
}
db, mock, err := sqlmock.New()
if err != nil {
panic(fmt.Sprintf("sqlmock.New return error: %v", err))
}
defer db.Close()
mock.ExpectQuery("select 1").WillReturnRows(mockRows)
if m.rowErr != nil {
mockRows.RowError(len(m.data)-1, m.rowErr)
}
rows, err := db.Query("select 1")
if err != nil {
panic(fmt.Sprintf("sqlmock.New return error: %v", err))
}

return newRowIter(rows, len(m.colTypes))
return m.SQLRowIter
}

func (m *mockTableIR) EscapeBackSlash() bool {
Expand All @@ -161,5 +164,6 @@ func newMockTableIR(databaseName, tableName string, data [][]driver.Value, speci
specCmt: specialComments,
selectedField: "*",
colTypes: colTypes,
SQLRowIter: nil,
}
}
Loading

0 comments on commit cf4aa4d

Please sign in to comment.