Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix bug when parsing the slow logs if the logNum is small #20959

Merged
merged 9 commits into from
Nov 23, 2020
2 changes: 1 addition & 1 deletion executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (e *slowQueryRetriever) getBatchLog(reader *bufio.Reader, offset *offset, n
line = string(hack.String(lineByte))
log = append(log, line)
if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
if strings.HasPrefix(line, "use") {
if strings.HasPrefix(line, "use") || strings.HasPrefix(line, variable.SlowLogRowPrefixStr) {
continue
}
break
Expand Down
51 changes: 37 additions & 14 deletions executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/pingcap/tidb/util/mock"
)

func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) {
func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bufio.Reader, logNum int) ([][]types.Datum, error) {
retriever.parsedSlowLogCh = make(chan parsedSlowLog, 100)
ctx := context.Background()
retriever.parseSlowLog(ctx, sctx, reader, 64)
retriever.parseSlowLog(ctx, sctx, reader, logNum)
slowLog := <-retriever.parsedSlowLogCh
rows, err := slowLog.rows, slowLog.err
if err == io.EOF {
Expand All @@ -45,11 +45,11 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu
return rows, err
}

func parseSlowLog(sctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) {
func parseSlowLog(sctx sessionctx.Context, reader *bufio.Reader, logNum int) ([][]types.Datum, error) {
retriever := &slowQueryRetriever{}
// Ignore the error is ok for test.
terror.Log(retriever.initialize(sctx))
rows, err := parseLog(retriever, sctx, reader)
rows, err := parseLog(retriever, sctx, reader, logNum)
return rows, err
}

Expand Down Expand Up @@ -83,7 +83,7 @@ select * from t;`
c.Assert(err, IsNil)
sctx := mock.NewContext()
sctx.GetSessionVars().TimeZone = loc
_, err = parseSlowLog(sctx, reader)
_, err = parseSlowLog(sctx, reader, 64)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "panic test")
}
Expand Down Expand Up @@ -119,7 +119,7 @@ select * from t;`
c.Assert(err, IsNil)
ctx := mock.NewContext()
ctx.GetSessionVars().TimeZone = loc
rows, err := parseSlowLog(ctx, reader)
rows, err := parseSlowLog(ctx, reader, 64)
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 1)
recordString := ""
Expand All @@ -140,6 +140,29 @@ select * from t;`
`update t set i = 1;,select * from t;`
c.Assert(expectRecordString, Equals, recordString)

// Issue 20928
reader = bufio.NewReader(bytes.NewBufferString(slowLogStr))
rows, err = parseSlowLog(ctx, reader, 1)
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, 1)
recordString = ""
for i, value := range rows[0] {
str, err := value.ToString()
c.Assert(err, IsNil)
if i > 0 {
recordString += ","
}
recordString += str
}
expectRecordString = `2019-04-28 15:24:04.309074,` +
`405888132465033227,root,localhost,0,57,0.12,0.216905,` +
`0,0,0,0,0,0,0,0,0,0,0,0,,0,0,0,0,0,0,0.38,0.021,0,0,0,1,637,0,10,10,10,10,100,,,1,42a1c8aae6f133e934d4bf0147491709a8812ea05ff8819ec522780fe657b772,t1:1,t2:2,` +
`0.1,0.2,0.03,127.0.0.1:20160,0.05,0.6,0.8,0.0.0.0:20160,70724,65536,0,0,0,0,` +
`Cop_backoff_regionMiss_total_times: 200 Cop_backoff_regionMiss_total_time: 0.2 Cop_backoff_regionMiss_max_time: 0.2 Cop_backoff_regionMiss_max_addr: 127.0.0.1 Cop_backoff_regionMiss_avg_time: 0.2 Cop_backoff_regionMiss_p90_time: 0.2 Cop_backoff_rpcPD_total_times: 200 Cop_backoff_rpcPD_total_time: 0.2 Cop_backoff_rpcPD_max_time: 0.2 Cop_backoff_rpcPD_max_addr: 127.0.0.1 Cop_backoff_rpcPD_avg_time: 0.2 Cop_backoff_rpcPD_p90_time: 0.2 Cop_backoff_rpcTiKV_total_times: 200 Cop_backoff_rpcTiKV_total_time: 0.2 Cop_backoff_rpcTiKV_max_time: 0.2 Cop_backoff_rpcTiKV_max_addr: 127.0.0.1 Cop_backoff_rpcTiKV_avg_time: 0.2 Cop_backoff_rpcTiKV_p90_time: 0.2,` +
`0,0,1,1,,60e9378c746d9a2be1c791047e008967cf252eb6de9167ad3aa6098fa2d523f4,` +
`update t set i = 1;,select * from t;`
c.Assert(expectRecordString, Equals, recordString)

// fix sql contain '# ' bug
slowLog := bytes.NewBufferString(
`# Time: 2019-04-28T15:24:04.309074+08:00
Expand All @@ -155,7 +178,7 @@ select a# from t;
select * from t;
`)
reader = bufio.NewReader(slowLog)
_, err = parseSlowLog(ctx, reader)
_, err = parseSlowLog(ctx, reader, 64)
c.Assert(err, IsNil)

// test for time format compatibility.
Expand All @@ -166,7 +189,7 @@ select * from t;
select * from t;
`)
reader = bufio.NewReader(slowLog)
rows, err = parseSlowLog(ctx, reader)
rows, err = parseSlowLog(ctx, reader, 64)
c.Assert(err, IsNil)
c.Assert(len(rows) == 2, IsTrue)
t0Str, err := rows[0][0].ToString()
Expand All @@ -183,7 +206,7 @@ select * from t;
select * from t;
`)
reader = bufio.NewReader(slowLog)
_, err = parseSlowLog(ctx, reader)
_, err = parseSlowLog(ctx, reader, 64)
c.Assert(err, IsNil)
warnings := ctx.GetSessionVars().StmtCtx.GetWarnings()
c.Assert(warnings, HasLen, 1)
Expand All @@ -207,13 +230,13 @@ select * from t;
sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1))
slowLog.WriteString(sql)
reader := bufio.NewReader(slowLog)
_, err = parseSlowLog(ctx, reader)
_, err = parseSlowLog(ctx, reader, 64)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "single line length exceeds limit: 65536")

variable.MaxOfMaxAllowedPacket = originValue
reader = bufio.NewReader(slowLog)
_, err = parseSlowLog(ctx, reader)
_, err = parseSlowLog(ctx, reader, 64)
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -264,7 +287,7 @@ select * from t;`)
c.Assert(err, IsNil)
ctx := mock.NewContext()
ctx.GetSessionVars().TimeZone = loc
_, err = parseSlowLog(ctx, scanner)
_, err = parseSlowLog(ctx, scanner, 64)
c.Assert(err, IsNil)

// Test parser error.
Expand All @@ -274,7 +297,7 @@ select * from t;`)
`)

scanner = bufio.NewReader(slowLog)
_, err = parseSlowLog(ctx, scanner)
_, err = parseSlowLog(ctx, scanner, 64)
c.Assert(err, IsNil)
warnings := ctx.GetSessionVars().StmtCtx.GetWarnings()
c.Assert(warnings, HasLen, 1)
Expand Down Expand Up @@ -433,7 +456,7 @@ select 7;`
c.Assert(retriever.files, HasLen, len(cas.files), comment)
if len(retriever.files) > 0 {
reader := bufio.NewReader(retriever.files[0].file)
rows, err := parseLog(retriever, sctx, reader)
rows, err := parseLog(retriever, sctx, reader, 64)
c.Assert(err, IsNil)
c.Assert(len(rows), Equals, len(cas.querys), comment)
for i, row := range rows {
Expand Down