Skip to content

Commit

Permalink
cherry pick batch update optimization (#8069) (#8191) (#8452) (#8727)
Browse files Browse the repository at this point in the history
ref #8084
  • Loading branch information
amyangfei authored Apr 13, 2023
1 parent 3ed2b55 commit 95b5db3
Show file tree
Hide file tree
Showing 11 changed files with 668 additions and 96 deletions.
43 changes: 43 additions & 0 deletions cdc/sink/mysql/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"

dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
tmysql "github.com/pingcap/tidb/parser/mysql"
"go.uber.org/zap"
)

// CheckIsTiDB checks if the db connects to a TiDB.
func CheckIsTiDB(ctx context.Context, db *sql.DB) (bool, error) {
var tidbVer string
row := db.QueryRowContext(ctx, "select tidb_version()")
err := row.Scan(&tidbVer)
if err != nil {
log.Error("check tidb version error", zap.Error(err))
// downstream is not TiDB, do nothing
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok &&
(mysqlErr.Number == tmysql.ErrNoDB ||
mysqlErr.Number == tmysql.ErrSpDoesNotExist) {
return false, nil
}
return false, errors.Trace(err)
}
return true, nil
}
53 changes: 47 additions & 6 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ func NewMySQLSink(
"some types of DDL may fail to be executed",
zap.String("hostname", hostName), zap.String("port", port))
}

isTiDB, err := CheckIsTiDB(ctx, testDB)
if err != nil {
return nil, err
}
params.isTiDB = isTiDB

db, err := GetDBConnImpl(ctx, dsnStr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -800,6 +807,7 @@ func convert2RowChanges(
tableInfo,
nil, nil)
}
res.SetApproximateDataSize(row.ApproximateDataSize)
return res
}

Expand Down Expand Up @@ -876,7 +884,7 @@ func (s *mysqlSink) groupRowsByType(
updateRow = append(
updateRow,
convert2RowChanges(row, tableInfo, sqlmodel.RowChangeUpdate))
if len(updateRow) >= s.params.maxTxnRow {
if len(updateRow) >= s.params.batchUpdateRowCount {
updateRows = append(updateRows, updateRow)
updateRow = make([]*sqlmodel.RowChange, 0, s.params.maxTxnRow)
}
Expand Down Expand Up @@ -915,11 +923,22 @@ func (s *mysqlSink) batchSingleTxnDmls(
// handle update
if len(updateRows) > 0 {
// TODO: use sql.GenUpdateSQL to generate update sql after we optimize the func.
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
if s.params.isTiDB {
for _, rows := range updateRows {
s, v := s.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
}
} else {
// The behavior of batch update statement differs between TiDB and MySQL.
// So we don't use batch update statement when downstream is MySQL.
// Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
}
}
}
}
Expand Down Expand Up @@ -1128,6 +1147,28 @@ func (s *mysqlSink) execDMLs(ctx context.Context, txns []*model.SingleTableTxn,
return nil
}

// genUpdateSQL builds the UPDATE statement(s) and the matching argument lists
// for the given rows.
//
// The approximate data sizes of all rows are summed. While that total stays
// below defaultMaxBatchUpdateRowSize multiplied by the number of rows, a single
// batched statement is produced via sqlmodel.GenUpdateSQLFast. Otherwise every
// row gets its own UPDATE statement generated by RowChange.GenSQL.
//
// The two returned slices always have the same length; element i of the second
// slice holds the arguments for statement i of the first.
func (s *mysqlSink) genUpdateSQL(rows ...*sqlmodel.RowChange) ([]string, [][]interface{}) {
	rowCount := len(rows)
	totalSize := 0
	for i := range rows {
		totalSize += int(rows[i].GetApproximateDataSize())
	}

	if totalSize >= defaultMaxBatchUpdateRowSize*rowCount {
		// Rows are too large on average (or there are none): emit one
		// independent update SQL per row.
		stmts := make([]string, 0, rowCount)
		args := make([][]interface{}, 0, rowCount)
		for _, r := range rows {
			stmt, arg := r.GenSQL(sqlmodel.DMLUpdate)
			stmts = append(stmts, stmt)
			args = append(args, arg)
		}
		return stmts, args
	}

	// Rows are small enough: fold them into one batch update.
	stmt, arg := sqlmodel.GenUpdateSQLFast(rows...)
	return []string{stmt}, [][]interface{}{arg}
}

// if the column value type is []byte and charset is not binary, we get its string
// representation. Because if we use the byte array representation, the go-sql-driver
// will automatically set `_binary` charset for that column, which is not expected.
Expand Down
29 changes: 29 additions & 0 deletions cdc/sink/mysql/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ const (
defaultTxnIsolationRC = "READ-COMMITTED"
defaultCharacterSet = "utf8mb4"
defaultBatchDMLEnable = true
// defaultMaxBatchUpdateRowCount is the default max number of rows in a
// single batch update SQL.
defaultMaxBatchUpdateRowCount = 40
maxMaxBatchUpdateRowCount = 1024
// defaultMaxBatchUpdateRowSize(1KB) is the default threshold for the average
// size of a single row. When the average row size reaches or exceeds
// defaultMaxBatchUpdateRowSize, batch update is disabled; otherwise it is enabled.
defaultMaxBatchUpdateRowSize = 1024
)

var (
Expand All @@ -72,6 +80,7 @@ var defaultParams = &sinkParams{
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
batchDMLEnable: defaultBatchDMLEnable,
batchUpdateRowCount: defaultMaxBatchUpdateRowCount,
}

var validSchemes = map[string]bool{
Expand All @@ -97,6 +106,8 @@ type sinkParams struct {
timezone string
tls string
batchDMLEnable bool
batchUpdateRowCount int
isTiDB bool
}

func (s *sinkParams) Clone() *sinkParams {
Expand Down Expand Up @@ -266,6 +277,24 @@ func parseSinkURIToParams(ctx context.Context,
params.batchDMLEnable = enable
}

s = sinkURI.Query().Get("max-multi-update-row")
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
if c <= 0 {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig,
fmt.Errorf("invalid max-multi-update-row %d, which must be greater than 0", c))
}
if c > maxMaxBatchUpdateRowCount {
log.Warn("max-multi-update-row too large",
zap.Int("original", c), zap.Int("override", maxMaxBatchUpdateRowCount))
c = maxMaxBatchUpdateRowCount
}
params.batchUpdateRowCount = c
}

return params, nil
}

Expand Down
43 changes: 27 additions & 16 deletions cdc/sink/mysql/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestSinkParamsClone(t *testing.T) {
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
batchDMLEnable: defaultBatchDMLEnable,
batchUpdateRowCount: defaultMaxBatchUpdateRowCount,
}, param1)
require.Equal(t, &sinkParams{
changefeedID: model.DefaultChangeFeedID("123"),
Expand All @@ -58,6 +59,7 @@ func TestSinkParamsClone(t *testing.T) {
dialTimeout: defaultDialTimeout,
safeMode: defaultSafeMode,
batchDMLEnable: defaultBatchDMLEnable,
batchUpdateRowCount: defaultMaxBatchUpdateRowCount,
}, param2)
}

Expand Down Expand Up @@ -211,9 +213,11 @@ func TestParseSinkURIToParams(t *testing.T) {
expected.changefeedID = model.DefaultChangeFeedID("cf-id")
expected.captureAddr = "127.0.0.1:8300"
expected.tidbTxnMode = "pessimistic"
expected.batchUpdateRowCount = 80
uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" +
"&batch-replace-enable=true&batch-replace-size=50&safe-mode=false" +
"&tidb-txn-mode=pessimistic"
"&tidb-txn-mode=pessimistic" +
"&max-multi-update-row=80"
opts := map[string]string{
metrics.OptCaptureAddr: expected.captureAddr,
}
Expand Down Expand Up @@ -256,22 +260,29 @@ func TestParseSinkURIOverride(t *testing.T) {
cases := []struct {
uri string
checker func(*sinkParams)
}{{
uri: "mysql://127.0.0.1:3306/?worker-count=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.workerCount, maxWorkerCount)
}{
{
uri: "mysql://127.0.0.1:3306/?worker-count=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.workerCount, maxWorkerCount)
},
}, {
uri: "mysql://127.0.0.1:3306/?max-txn-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.maxTxnRow, maxMaxTxnRow)
},
}, {
uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode",
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}, {
uri: "mysql://127.0.0.1:3306/?max-multi-update-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.batchUpdateRowCount, maxMaxBatchUpdateRowCount)
},
},
}, {
uri: "mysql://127.0.0.1:3306/?max-txn-row=2147483648", // int32 max
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.maxTxnRow, maxMaxTxnRow)
},
}, {
uri: "mysql://127.0.0.1:3306/?tidb-txn-mode=badmode",
checker: func(sp *sinkParams) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}}
}
ctx := context.TODO()
var uri *url.URL
var err error
Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,10 @@ func mockTestDBWithSQLMode(adjustSQLMode bool, sqlMode interface{}) (*sql.DB, er
"where character_set_name = 'gbk';").WillReturnRows(
sqlmock.NewRows([]string{"character_set_name"}).AddRow("gbk"),
)
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})

mock.ExpectClose()
return db, nil
Expand Down
5 changes: 5 additions & 0 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
dmysql "github.com/go-sql-driver/mysql"
"github.com/phayes/freeport"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo/common"
Expand Down Expand Up @@ -149,6 +150,10 @@ func TestApplyDMLs(t *testing.T) {
"where character_set_name = 'gbk';").WillReturnRows(
sqlmock.NewRows([]string{"character_set_name"}).AddRow("gbk"),
)
mock.ExpectQuery("select tidb_version()").WillReturnError(&dmysql.MySQLError{
Number: 1305,
Message: "FUNCTION test.tidb_version does not exist",
})
mock.ExpectClose()
return db, nil
}
Expand Down
Loading

0 comments on commit 95b5db3

Please sign in to comment.