Skip to content

Commit

Permalink
Merge branch 'master' into support_dump_file_for_trace_plan
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Nov 25, 2021
2 parents 3a6308c + 78c653e commit bede720
Show file tree
Hide file tree
Showing 11 changed files with 690 additions and 33 deletions.
5 changes: 0 additions & 5 deletions executor/prepared_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,6 @@ func TestPlanCacheOperators(t *testing.T) {

// execute this statement and check whether it uses a cached plan
results := tk.MustQuery("execute stmt " + usingStmt).Sort().Rows()
useCache := "0"
if execCase.UseCache {
useCache = "1"
}
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows(useCache))

// check whether the result is correct
tmp := strings.Split(prepCase.PrepStmt, "?")
Expand Down
12 changes: 12 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,9 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
if len(ranges.Ranges) == 0 || len(ranges.AccessConds) != len(x.AccessConditions) {
return errors.New("failed to rebuild range: the length of the range has changed")
}
for i := range x.IndexValues {
x.IndexValues[i] = ranges.Ranges[0].LowVal[i]
}
Expand All @@ -625,6 +628,9 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
if len(ranges) == 0 {
return errors.New("failed to rebuild range: the length of the range has changed")
}
x.Handle = kv.IntHandle(ranges[0].LowVal[0].GetInt64())
}
}
Expand Down Expand Up @@ -658,6 +664,9 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
if len(ranges.Ranges) != len(x.IndexValues) || len(ranges.AccessConds) != len(x.AccessConditions) {
return errors.New("failed to rebuild range: the length of the range has changed")
}
for i := range x.IndexValues {
for j := range ranges.Ranges[i].LowVal {
x.IndexValues[i][j] = ranges.Ranges[i].LowVal[j]
Expand All @@ -675,6 +684,9 @@ func (e *Execute) rebuildRange(p Plan) error {
if err != nil {
return err
}
if len(ranges) != len(x.Handles) {
return errors.New("failed to rebuild range: the length of the range has changed")
}
for i := range ranges {
x.Handles[i] = kv.IntHandle(ranges[i].LowVal[0].GetInt64())
}
Expand Down
6 changes: 5 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,11 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
continue
}
// if we already know the range of the scan is empty, just return a TableDual
if len(path.Ranges) == 0 && !ds.ctx.GetSessionVars().StmtCtx.UseCache {
if len(path.Ranges) == 0 {
// We should uncache the tableDual plan.
if expression.MaybeOverOptimized4PlanCache(ds.ctx, path.AccessConds) {
ds.ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
}
dual := PhysicalTableDual{}.Init(ds.ctx, ds.stats, ds.blockOffset)
dual.SetSchema(ds.schema)
cntPlan += 1
Expand Down
116 changes: 115 additions & 1 deletion planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1850,8 +1850,11 @@ func (s *testPrepareSerialSuite) TestIssue28246(c *C) {
tk.MustExec("set @a=9223372036854775807, @b=1")
tk.MustExec(`prepare stmt from 'select min(col1) from PK_AUTO_RANDOM9111 where col1 > ?;';`)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("<nil>"))
// The plan contains the tableDual, so it will not be cached.
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("<nil>"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("execute stmt using @b").Check(testkit.Rows("9223372036854775807"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("<nil>"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}
Expand Down Expand Up @@ -1891,6 +1894,117 @@ func (s *testPrepareSerialSuite) TestIssue29805(c *C) {
tk.MustQuery("select/*+ hash_agg() */ count(distinct col1) from PK_TCOLLATION10197 where col1 > '龺';").Check(testkit.Rows("0"))
}

func (s *testPrepareSerialSuite) TestIssue29993(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")

// test PointGet + cluster index
tk.MustExec("set tidb_enable_clustered_index=on;")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE `t` (`COL1` enum('a', 'b') NOT NULL PRIMARY KEY, col2 int) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;")
tk.MustExec("insert into t values('a', 1), ('b', 2);")
tk.MustExec("set @a='a', @b='b', @z='z';")
tk.MustExec(`prepare stmt from 'select col1 from t where col1 = ? and col2 in (1, 2);';`)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("a"))
tk.MustQuery("execute stmt using @b").Check(testkit.Rows("b"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())
// The length of range have been changed, so the plan can not be cached.
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())

// test batchPointGet + cluster index
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE `t` (`COL1` enum('a', 'b') NOT NULL, col2 int, PRIMARY KEY(col1, col2)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;")
tk.MustExec("insert into t values('a', 1), ('b', 2);")
tk.MustExec("set @a='a', @b='b', @z='z';")
tk.MustExec(`prepare stmt from 'select col1 from t where (col1, col2) in ((?, 1));';`)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("a"))
tk.MustQuery("execute stmt using @b").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())

// test PointGet + non cluster index
tk.MustExec("set tidb_enable_clustered_index=off;")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE `t` (`COL1` enum('a', 'b') NOT NULL PRIMARY KEY, col2 int) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;")
tk.MustExec("insert into t values('a', 1), ('b', 2);")
tk.MustExec("set @a='a', @b='b', @z='z';")
tk.MustExec(`prepare stmt from 'select col1 from t where col1 = ? and col2 in (1, 2);';`)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("a"))
tk.MustQuery("execute stmt using @b").Check(testkit.Rows("b"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())
// The length of range have been changed, so the plan can not be cached.
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())

// test batchPointGet + non cluster index
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE `t` (`COL1` enum('a', 'b') NOT NULL, col2 int, PRIMARY KEY(col1, col2)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;")
tk.MustExec("insert into t values('a', 1), ('b', 2);")
tk.MustExec("set @a='a', @b='b', @z='z';")
tk.MustExec(`prepare stmt from 'select col1 from t where (col1, col2) in ((?, 1));';`)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows("a"))
tk.MustQuery("execute stmt using @b").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
tk.MustQuery("execute stmt using @z").Check(testkit.Rows())
}

func (s *testPrepareSerialSuite) TestIssue30100(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(col1 enum('aa', 'bb'), col2 int, index(col1, col2));")
tk.MustExec("insert into t values('aa', 333);")
tk.MustExec(`prepare stmt from 'SELECT * FROM t t1 JOIN t t2 ON t1.col1 = t2.col1 WHERE t1.col1 <=> NULL';`)
tk.MustQuery("execute stmt").Check(testkit.Rows())
tk.MustQuery("execute stmt").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec(`prepare stmt from 'SELECT * FROM t t1 JOIN t t2 ON t1.col1 = t2.col1 WHERE t1.col1 <=> NULL and t2.col2 > ?';`)
tk.MustExec("set @a=0;")
tk.MustQuery("execute stmt using @a").Check(testkit.Rows())
tk.MustQuery("execute stmt using @a").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}

func (s *testPlanSerialSuite) TestPartitionTable(c *C) {
if israce.RaceEnabled {
c.Skip("exhaustive types test, skip race test")
Expand Down
6 changes: 4 additions & 2 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,10 @@ func (cc *clientConn) Close() error {

func closeConn(cc *clientConn, connections int) error {
metrics.ConnGauge.Set(float64(connections))
err := cc.bufReadConn.Close()
terror.Log(err)
if cc.bufReadConn != nil {
err := cc.bufReadConn.Close()
terror.Log(err)
}
if cc.ctx != nil {
return cc.ctx.Close()
}
Expand Down
106 changes: 106 additions & 0 deletions server/mock_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"bufio"
"bytes"
"context"
"testing"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/stretchr/testify/require"
)

// MockConn is a mock connection.
type MockConn interface {
// HandleQuery executes a statement
HandleQuery(ctx context.Context, sql string) error
// Context gets the TiDBContext
Context() *TiDBContext
// Dispatch executes command according to the command type
Dispatch(ctx context.Context, data []byte) error
// Close releases resources
Close()
}

type mockConn struct {
*clientConn
t *testing.T
}

// HandleQuery implements MockConn.HandleQuery
func (mc *mockConn) HandleQuery(ctx context.Context, sql string) error {
return mc.handleQuery(ctx, sql)
}

// Context implements MockConn.Context
func (mc *mockConn) Context() *TiDBContext {
return mc.ctx
}

// Dispatch implements MockConn.Dispatch
func (mc *mockConn) Dispatch(ctx context.Context, data []byte) error {
return mc.dispatch(ctx, data)
}

// Close implements MockConn.Close
func (mc *mockConn) Close() {
require.NoError(mc.t, mc.clientConn.Close())
}

// CreateMockServer creates a mock server.
func CreateMockServer(t *testing.T, store kv.Storage) *Server {
tidbdrv := NewTiDBDriver(store)
cfg := config.NewConfig()
cfg.Socket = ""
cfg.Port, cfg.Status.StatusPort = 0, 0
cfg.Status.ReportStatus = false
cfg.Security.AutoTLS = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
return server
}

// CreateMockConn creates a mock connection together with a session.
func CreateMockConn(t *testing.T, store kv.Storage, server *Server) MockConn {
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
tc := &TiDBContext{
Session: se,
stmts: make(map[int]*TiDBStatement),
}

cc := &clientConn{
server: server,
ctx: tc,
salt: []byte{},
collation: mysql.DefaultCollationID,
alloc: arena.NewAllocator(1024),
chunkAlloc: chunk.NewAllocator(),
pkt: &packetIO{
bufWriter: bufio.NewWriter(bytes.NewBuffer(nil)),
},
}
return &mockConn{
clientConn: cc,
t: t,
}
}
48 changes: 48 additions & 0 deletions server/mock_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"testing"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
)

func TestMockConn(t *testing.T) {
t.Parallel()

store, clean := testkit.CreateMockStore(t)
defer clean()
server := CreateMockServer(t, store)
defer server.Close()
conn := CreateMockConn(t, store, server)
defer conn.Close()

require.NoError(t, conn.HandleQuery(context.Background(), "select 1"))
require.Equal(t, "select 1", conn.Context().GetSessionVars().StmtCtx.OriginalSQL)

require.Error(t, conn.HandleQuery(context.Background(), "select"))

inBytes := append([]byte{mysql.ComQuery}, []byte("select 1")...)
require.NoError(t, conn.Dispatch(context.Background(), inBytes))
require.Equal(t, "select 1", conn.Context().GetSessionVars().StmtCtx.OriginalSQL)

inBytes = append([]byte{mysql.ComStmtPrepare}, []byte("select 1")...)
require.NoError(t, conn.Dispatch(context.Background(), inBytes))
require.Equal(t, "select 1", conn.Context().GetSessionVars().StmtCtx.OriginalSQL)
}
Loading

0 comments on commit bede720

Please sign in to comment.