Skip to content

Commit

Permalink
sessionctx/binloginfo: fix uncomment pre_split_regions ddl-querys in …
Browse files Browse the repository at this point in the history
…binlog (#11762)
  • Loading branch information
crazycs520 authored and winkyao committed Aug 19, 2019
1 parent 7e0f754 commit a1411ba
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 8 deletions.
55 changes: 47 additions & 8 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package binloginfo

import (
"math"
"regexp"
"strings"
"sync"
Expand All @@ -28,7 +29,7 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
binlog "github.com/pingcap/tipb/go-binlog"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand All @@ -41,7 +42,8 @@ func init() {
// shared by all sessions.
var pumpsClient *pumpcli.PumpsClient
var pumpsClientLock sync.RWMutex
var shardPat = regexp.MustCompile(`SHARD_ROW_ID_BITS\s*=\s*\d+`)
var shardPat = regexp.MustCompile(`SHARD_ROW_ID_BITS\s*=\s*\d+\s*`)
var preSplitPat = regexp.MustCompile(`PRE_SPLIT_REGIONS\s*=\s*\d+\s*`)

// BinlogInfo contains binlog data and binlog client.
type BinlogInfo struct {
Expand Down Expand Up @@ -136,7 +138,7 @@ func SetDDLBinlog(client *pumpcli.PumpsClient, txn kv.Transaction, jobID int64,
return
}

ddlQuery = addSpecialComment(ddlQuery)
ddlQuery = AddSpecialComment(ddlQuery)
info := &BinlogInfo{
Data: &binlog.Binlog{
Tp: binlog.BinlogType_Prewrite,
Expand All @@ -150,15 +152,52 @@ func SetDDLBinlog(client *pumpcli.PumpsClient, txn kv.Transaction, jobID int64,

const specialPrefix = `/*!90000 `

func addSpecialComment(ddlQuery string) string {
// AddSpecialComment uses to add comment for table option in DDL query.
// Export for testing.
func AddSpecialComment(ddlQuery string) string {
if strings.Contains(ddlQuery, specialPrefix) {
return ddlQuery
}
loc := shardPat.FindStringIndex(strings.ToUpper(ddlQuery))
if loc == nil {
return ddlQuery
return addSpecialCommentByRegexps(ddlQuery, shardPat, preSplitPat)
}

// addSpecialCommentByRegexps uses to add special comment for the worlds in the ddlQuery with match the regexps.
func addSpecialCommentByRegexps(ddlQuery string, regs ...*regexp.Regexp) string {
upperQuery := strings.ToUpper(ddlQuery)
var specialComments []string
minIdx := math.MaxInt64
for i := 0; i < len(regs); {
reg := regs[i]
loc := reg.FindStringIndex(upperQuery)
if len(loc) < 2 {
i++
continue
}
specialComments = append(specialComments, ddlQuery[loc[0]:loc[1]])
if loc[0] < minIdx {
minIdx = loc[0]
}
ddlQuery = ddlQuery[:loc[0]] + ddlQuery[loc[1]:]
upperQuery = upperQuery[:loc[0]] + upperQuery[loc[1]:]
}
if minIdx != math.MaxInt64 {
query := ddlQuery[:minIdx] + specialPrefix
for _, comment := range specialComments {
if query[len(query)-1] != ' ' {
query += " "
}
query += comment
}
if query[len(query)-1] != ' ' {
query += " "
}
query += "*/"
if len(ddlQuery[minIdx:]) > 0 {
return query + " " + ddlQuery[minIdx:]
}
return query
}
return ddlQuery[:loc[0]] + specialPrefix + ddlQuery[loc[0]:loc[1]] + ` */` + ddlQuery[loc[1]:]
return ddlQuery
}

// MockPumpsClient creates a PumpsClient, used for test.
Expand Down
37 changes: 37 additions & 0 deletions sessionctx/binloginfo/binloginfo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,40 @@ func (s *testBinlogSuite) TestDeleteSchema(c *C) {
tk.MustExec("delete from b1 where job_id in (select job_id from b2 where batch_class = 'TEST') or split_job_id in (select job_id from b2 where batch_class = 'TEST');")
tk.MustExec("delete b1 from b2 right join b1 on b1.job_id = b2.job_id and batch_class = 'TEST';")
}

func (s *testBinlogSuite) TestAddSpecialComment(c *C) {
testCase := []struct {
input string
result string
}{
{
"create table t1 (id int ) shard_row_id_bits=2;",
"create table t1 (id int ) /*!90000 shard_row_id_bits=2 */ ;",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"create table t1 (id int ) /*!90000 shard_row_id_bits=2 pre_split_regions=2 */ ;",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"create table t1 (id int ) /*!90000 shard_row_id_bits=2 pre_split_regions=2 */ ;",
},

{
"create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;",
"create table t1 (id int ) /*!90000 shard_row_id_bits=2 pre_split_regions=2 */ engine=innodb ;",
},
{
"create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;",
"create table t1 (id int ) /*!90000 shard_row_id_bits=2 pre_split_regions=2 */ ;",
},
{
"create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;",
"create table t6 (id int ) /*!90000 shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2 */ ;",
},
}
for _, ca := range testCase {
re := binloginfo.AddSpecialComment(ca.input)
c.Assert(re, Equals, ca.result)
}
}

0 comments on commit a1411ba

Please sign in to comment.