-
Notifications
You must be signed in to change notification settings - Fork 5.9k
/
Copy pathdistsql_test.go
635 lines (561 loc) · 28.7 KB
/
distsql_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor_test
import (
"bytes"
"context"
"fmt"
"math/rand"
"regexp"
"runtime/pprof"
"strconv"
"strings"
"testing"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/paging"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
)
// checkGoroutineExists
// nolint:unused
func checkGoroutineExists(keyword string) bool {
buf := new(bytes.Buffer)
profile := pprof.Lookup("goroutine")
err := profile.WriteTo(buf, 1)
if err != nil {
panic(err)
}
str := buf.String()
return strings.Contains(str, keyword)
}
func TestCopClientSend(t *testing.T) {
t.Skip("not stable")
var cluster testutils.Cluster
store, dom := testkit.CreateMockStoreAndDomain(t, mockstore.WithClusterInspector(func(c testutils.Cluster) {
mockstore.BootstrapWithSingleStore(c)
cluster = c
}))
if _, ok := store.GetClient().(*copr.CopClient); !ok {
// Make sure the store is tikv store.
return
}
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table copclient (id int primary key)")
// Insert 1000 rows.
var values []string
for i := 0; i < 1000; i++ {
values = append(values, fmt.Sprintf("(%d)", i))
}
tk.MustExec("insert copclient values " + strings.Join(values, ","))
// Get table ID for split.
is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("copclient"))
require.NoError(t, err)
tblID := tbl.Meta().ID
// Split the table.
tableStart := tablecodec.GenTableRecordPrefix(tblID)
cluster.SplitKeys(tableStart, tableStart.PrefixNext(), 100)
ctx := context.Background()
// Send coprocessor request when the table split.
rs, err := tk.Exec("select sum(id) from copclient")
require.NoError(t, err)
req := rs.NewChunk(nil)
err = rs.Next(ctx, req)
require.NoError(t, err)
require.Equal(t, "499500", req.GetRow(0).GetMyDecimal(0).String())
require.NoError(t, rs.Close())
// Split one region.
key := tablecodec.EncodeRowKeyWithHandle(tblID, kv.IntHandle(500))
region, _, _ := cluster.GetRegionByKey(key)
peerID := cluster.AllocID()
cluster.Split(region.GetId(), cluster.AllocID(), key, []uint64{peerID}, peerID)
// Check again.
rs, err = tk.Exec("select sum(id) from copclient")
require.NoError(t, err)
req = rs.NewChunk(nil)
err = rs.Next(ctx, req)
require.NoError(t, err)
require.Equal(t, "499500", req.GetRow(0).GetMyDecimal(0).String())
require.NoError(t, rs.Close())
// Check there is no goroutine leak.
rs, err = tk.Exec("select * from copclient order by id")
require.NoError(t, err)
req = rs.NewChunk(nil)
err = rs.Next(ctx, req)
require.NoError(t, err)
require.NoError(t, rs.Close())
keyword := "(*copIterator).work"
require.False(t, checkGoroutineExists(keyword))
}
func TestGetLackHandles(t *testing.T) {
expectedHandles := []kv.Handle{kv.IntHandle(1), kv.IntHandle(2), kv.IntHandle(3), kv.IntHandle(4),
kv.IntHandle(5), kv.IntHandle(6), kv.IntHandle(7), kv.IntHandle(8), kv.IntHandle(9), kv.IntHandle(10)}
handlesMap := kv.NewHandleMap()
for _, h := range expectedHandles {
handlesMap.Set(h, true)
}
// expected handles 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
// obtained handles 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
diffHandles := executor.GetLackHandles(expectedHandles, handlesMap)
require.Len(t, diffHandles, 0)
require.Equal(t, 0, handlesMap.Len())
// expected handles 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
// obtained handles 2, 3, 4, 6, 7, 8, 9
retHandles := []kv.Handle{kv.IntHandle(2), kv.IntHandle(3), kv.IntHandle(4), kv.IntHandle(6),
kv.IntHandle(7), kv.IntHandle(8), kv.IntHandle(9)}
handlesMap = kv.NewHandleMap()
handlesMap.Set(kv.IntHandle(1), true)
handlesMap.Set(kv.IntHandle(5), true)
handlesMap.Set(kv.IntHandle(10), true)
diffHandles = executor.GetLackHandles(expectedHandles, handlesMap)
require.Equal(t, diffHandles, retHandles) // deep equal
}
func TestBigIntPK(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a bigint unsigned primary key, b int, c int, index idx(a, b))")
tk.MustExec("insert into t values(1, 1, 1), (9223372036854775807, 2, 2)")
tk.MustQuery("select * from t use index(idx) order by a").Check(testkit.Rows("1 1 1", "9223372036854775807 2 2"))
}
func TestCorColToRanges(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set sql_mode='STRICT_TRANS_TABLES'") // disable only-full-group-by
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, c int, index idx(b))")
tk.MustExec("insert into t values(1, 1, 1), (2, 2 ,2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")
tk.MustExec("analyze table t")
// Test single read on table.
tk.MustQuery("select t.c in (select count(*) from t s ignore index(idx), t t1 where s.a = t.a and s.a = t1.a) from t order by 1 desc").Check(testkit.Rows("1", "0", "0", "0", "0", "0", "0", "0", "0"))
// Test single read on index.
tk.MustQuery("select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.a = t1.a) from t order by 1 desc").Check(testkit.Rows("1", "0", "0", "0", "0", "0", "0", "0", "0"))
// Test IndexLookUpReader.
tk.MustQuery("select t.c in (select count(*) from t s use index(idx), t t1 where s.b = t.a and s.c = t1.a) from t order by 1 desc").Check(testkit.Rows("1", "0", "0", "0", "0", "0", "0", "0", "0"))
}
func TestUniqueKeyNullValueSelect(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
// test null in unique-key
tk.MustExec("create table t (id int default null, c varchar(20), unique id (id));")
tk.MustExec("insert t (c) values ('a'), ('b'), ('c');")
res := tk.MustQuery("select * from t where id is null;")
res.Check(testkit.Rows("<nil> a", "<nil> b", "<nil> c"))
// test null in mul unique-key
tk.MustExec("drop table t")
tk.MustExec("create table t (id int default null, b int default 1, c varchar(20), unique id_c(id, b));")
tk.MustExec("insert t (c) values ('a'), ('b'), ('c');")
res = tk.MustQuery("select * from t where id is null and b = 1;")
res.Check(testkit.Rows("<nil> 1 a", "<nil> 1 b", "<nil> 1 c"))
tk.MustExec("drop table t")
// test null in non-unique-key
tk.MustExec("create table t (id int default null, c varchar(20), key id (id));")
tk.MustExec("insert t (c) values ('a'), ('b'), ('c');")
res = tk.MustQuery("select * from t where id is null;")
res.Check(testkit.Rows("<nil> a", "<nil> b", "<nil> c"))
}
// TestIssue10178 contains tests for https://github.com/pingcap/tidb/issues/10178 .
func TestIssue10178(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a bigint unsigned primary key)")
tk.MustExec("insert into t values(9223372036854775807), (18446744073709551615)")
tk.MustQuery("select max(a) from t").Check(testkit.Rows("18446744073709551615"))
tk.MustQuery("select * from t where a > 9223372036854775807").Check(testkit.Rows("18446744073709551615"))
tk.MustQuery("select * from t where a < 9223372036854775808").Check(testkit.Rows("9223372036854775807"))
}
func TestInconsistentIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, index idx_a(a))")
is := dom.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
idx := tbl.Meta().FindIndexByName("idx_a")
idxOp := tables.NewIndex(tbl.Meta().ID, tbl.Meta(), idx)
ctx := mock.NewContext()
ctx.Store = store
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i+10, i))
require.NoError(t, tk.QueryToErr("select * from t where a>=0"))
}
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("update t set a=%d where a=%d", i, i+10))
require.NoError(t, tk.QueryToErr("select * from t where a>=0"))
}
for i := 0; i < 10; i++ {
txn, err := store.Begin()
require.NoError(t, err)
_, err = idxOp.Create(ctx, txn, types.MakeDatums(i+10), kv.IntHandle(100+i), nil)
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.QueryToErr("select * from t use index(idx_a) where a >= 0")
require.Equal(t, fmt.Sprintf("[executor:8133]data inconsistency in table: t, index: idx_a, index-count:%d != record-count:10", i+11), err.Error())
// if has other conditions, the inconsistent index check doesn't work.
err = tk.QueryToErr("select * from t where a>=0 and b<10")
require.NoError(t, err)
}
// fix inconsistent problem to pass CI
for i := 0; i < 10; i++ {
txn, err := store.Begin()
require.NoError(t, err)
err = idxOp.Delete(ctx.GetSessionVars().StmtCtx, txn, types.MakeDatums(i+10), kv.IntHandle(100+i))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
}
}
func TestPushLimitDownIndexLookUpReader(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tbl")
tk.MustExec("create table tbl(a int, b int, c int, key idx_b_c(b,c))")
tk.MustExec("insert into tbl values(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5)")
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 4 limit 2,1").Check(testkit.Rows())
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 3 limit 2,1").Check(testkit.Rows())
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 2 limit 2,1").Check(testkit.Rows("5 5 5"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 limit 1").Check(testkit.Rows("2 2 2"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 order by b desc limit 2,1").Check(testkit.Rows("3 3 3"))
tk.MustQuery("select * from tbl use index(idx_b_c) where b > 1 and c > 1 limit 2,1").Check(testkit.Rows("4 4 4"))
}
func TestPartitionTableIndexLookUpReader(t *testing.T) {
failpoint.Enable("github.com/pingcap/tidb/planner/core/forceDynamicPrune", `return(true)`)
defer failpoint.Disable("github.com/pingcap/tidb/planner/core/forceDynamicPrune")
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`create table t (a int, b int, key(a))
partition by range (a) (
partition p1 values less than (10),
partition p2 values less than (20),
partition p3 values less than (30),
partition p4 values less than (40))`)
tk.MustExec(`insert into t values (1, 1), (2, 2), (11, 11), (12, 12), (21, 21), (22, 22), (31, 31), (32, 32)`)
tk.MustExec(`set tidb_partition_prune_mode='dynamic'`)
tk.MustQuery("select * from t where a>=1 and a<=1").Sort().Check(testkit.Rows("1 1"))
tk.MustQuery("select * from t where a>=1 and a<=2").Sort().Check(testkit.Rows("1 1", "2 2"))
tk.MustQuery("select * from t where a>=1 and a<12").Sort().Check(testkit.Rows("1 1", "11 11", "2 2"))
tk.MustQuery("select * from t where a>=1 and a<15").Sort().Check(testkit.Rows("1 1", "11 11", "12 12", "2 2"))
tk.MustQuery("select * from t where a>15 and a<32").Sort().Check(testkit.Rows("21 21", "22 22", "31 31"))
tk.MustQuery("select * from t where a>30").Sort().Check(testkit.Rows("31 31", "32 32"))
tk.MustQuery("select * from t where a>=1 and a<15 order by a").Check(testkit.Rows("1 1", "2 2", "11 11", "12 12"))
tk.MustQuery("select * from t where a>=1 and a<15 order by a limit 1").Check(testkit.Rows("1 1"))
tk.MustQuery("select * from t where a>=1 and a<15 order by a limit 3").Check(testkit.Rows("1 1", "2 2", "11 11"))
tk.MustQuery("select * from t where a between 1 and 15 order by a limit 3").Check(testkit.Rows("1 1", "2 2", "11 11"))
tk.MustQuery("select * from t where a between 1 and 15 order by a limit 3 offset 1").Check(testkit.Rows("2 2", "11 11", "12 12"))
}
func TestPartitionTableRandomlyIndexLookUpReader(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`create table t (a int, b int, key(a))
partition by range (a) (
partition p1 values less than (10),
partition p2 values less than (20),
partition p3 values less than (30),
partition p4 values less than (40))`)
tk.MustExec("create table tnormal (a int, b int, key(a))")
values := make([]string, 0, 128)
for i := 0; i < 128; i++ {
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(40), rand.Intn(40)))
}
tk.MustExec(fmt.Sprintf("insert into t values %v", strings.Join(values, ", ")))
tk.MustExec(fmt.Sprintf("insert into tnormal values %v", strings.Join(values, ", ")))
randRange := func() (int, int) {
a, b := rand.Intn(40), rand.Intn(40)
if a > b {
return b, a
}
return a, b
}
for i := 0; i < 256; i++ {
la, ra := randRange()
lb, rb := randRange()
cond := fmt.Sprintf("(a between %v and %v) or (b between %v and %v)", la, ra, lb, rb)
tk.MustQuery("select * from t use index(a) where " + cond).Sort().Check(
tk.MustQuery("select * from tnormal where " + cond).Sort().Rows())
}
}
func TestIndexLookUpStats(t *testing.T) {
stats := &executor.IndexLookUpRunTimeStats{
FetchHandleTotal: int64(5 * time.Second),
FetchHandle: int64(2 * time.Second),
TaskWait: int64(2 * time.Second),
TableRowScan: int64(2 * time.Second),
TableTaskNum: 2,
Concurrency: 1,
}
require.Equal(t, "index_task: {total_time: 5s, fetch_handle: 2s, build: 1s, wait: 2s}, table_task: {total_time: 2s, num: 2, concurrency: 1}", stats.String())
require.Equal(t, stats.Clone().String(), stats.String())
stats.Merge(stats.Clone())
require.Equal(t, "index_task: {total_time: 10s, fetch_handle: 4s, build: 2s, wait: 4s}, table_task: {total_time: 4s, num: 4, concurrency: 1}", stats.String())
}
func TestIndexLookUpGetResultChunk(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tbl")
tk.MustExec("create table tbl(a int, b int, c int, key idx_a(a))")
for i := 0; i < 101; i++ {
tk.MustExec(fmt.Sprintf("insert into tbl values(%d,%d,%d)", i, i, i))
}
tk.MustQuery("select * from tbl use index(idx_a) where a > 99 order by a asc limit 1").Check(testkit.Rows("100 100 100"))
tk.MustQuery("select * from tbl use index(idx_a) where a > 10 order by a asc limit 4,1").Check(testkit.Rows("15 15 15"))
}
func TestPartitionTableIndexJoinIndexLookUp(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec(`create table t (a int, b int, key(a)) partition by hash(a) partitions 4`)
tk.MustExec("create table tnormal (a int, b int, key(a), key(b))")
nRows := 512
values := make([]string, 0, nRows)
for i := 0; i < nRows; i++ {
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
}
tk.MustExec(fmt.Sprintf("insert into t values %v", strings.Join(values, ", ")))
tk.MustExec(fmt.Sprintf("insert into tnormal values %v", strings.Join(values, ", ")))
randRange := func() (int, int) {
a, b := rand.Intn(nRows), rand.Intn(nRows)
if a > b {
return b, a
}
return a, b
}
for i := 0; i < nRows; i++ {
lb, rb := randRange()
cond := fmt.Sprintf("(t2.b between %v and %v)", lb, rb)
result := tk.MustQuery("select t1.* from tnormal t1, tnormal t2 use index(a) where t1.a=t2.b and " + cond).Sort().Rows()
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.* from t t1, t t2 use index(a) where t1.a=t2.b and " + cond).Sort().Check(result)
}
}
func TestCoprocessorPagingSize(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t_paging (a int, b int, key(a), key(b))")
nRows := 512
values := make([]string, 0, nRows)
for i := 0; i < nRows; i++ {
values = append(values, fmt.Sprintf("(%v, %v)", rand.Intn(nRows), rand.Intn(nRows)))
}
tk.MustExec(fmt.Sprintf("insert into t_paging values %v", strings.Join(values, ", ")))
tk.MustQuery("select @@tidb_min_paging_size").Check(testkit.Rows(strconv.FormatUint(paging.MinPagingSize, 10)))
// Enable the coprocessor paging protocol.
tk.MustExec("set @@tidb_enable_paging = on")
// When the min paging size is small, we need more RPC roundtrip!
// Check 'rpc_num' in the execution information
//
// mysql> explain analyze select * from t_paging;
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | id |task | execution info |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// | TableReader_5 |root | time:7.27ms, loops:2, cop_task: {num: 10, max: 1.57ms, min: 313.3µs, avg: 675.9µs, p95: 1.57ms, tot_proc: 2ms, rpc_num: 10, rpc_time: 6.69ms, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15} |
// | └─TableFullScan_4 |cop[tikv] | tikv_task:{proc max:1.48ms, min:294µs, avg: 629µs, p80:1.21ms, p95:1.48ms, iters:0, tasks:10} |
// +--------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// 2 rows in set (0.01 sec)
getRPCNumFromExplain := func(rows [][]interface{}) (res uint64) {
re := regexp.MustCompile("rpc_num: ([0-9]+)")
for _, row := range rows {
buf := bytes.NewBufferString("")
_, _ = fmt.Fprintf(buf, "%s\n", row)
if matched := re.FindStringSubmatch(buf.String()); matched != nil {
require.Equal(t, len(matched), 2)
c, err := strconv.ParseUint(matched[1], 10, 64)
require.NoError(t, err)
return c
}
}
return res
}
// This is required here because only the chunk encoding collect the execution information and contains 'rpc_num'.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")
tk.MustExec("set @@tidb_min_paging_size = 1")
rows := tk.MustQuery("explain analyze select * from t_paging").Rows()
rpcNum := getRPCNumFromExplain(rows)
require.Greater(t, rpcNum, uint64(2))
tk.MustExec("set @@tidb_min_paging_size = 1000")
rows = tk.MustQuery("explain analyze select * from t_paging").Rows()
rpcNum = getRPCNumFromExplain(rows)
require.Equal(t, rpcNum, uint64(1))
}
func TestAdaptiveClosestRead(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
// the avg row size is more accurate in check_rpc mode when unistre is used.
// See: https://github.com/pingcap/tidb/issues/31744#issuecomment-1016309883
tk.MustExec("set @@tidb_enable_chunk_rpc = '1'")
readCounter := func(counter prometheus.Counter) float64 {
var metric dto.Metric
require.Nil(t, counter.Write(&metric))
return metric.Counter.GetValue()
}
checkMetrics := func(q string, hit, miss int) {
beforeHit := readCounter(metrics.DistSQLCoprClosestReadCounter.WithLabelValues("hit"))
beforeMiss := readCounter(metrics.DistSQLCoprClosestReadCounter.WithLabelValues("miss"))
tk.MustQuery(q)
afterHit := readCounter(metrics.DistSQLCoprClosestReadCounter.WithLabelValues("hit"))
afterMiss := readCounter(metrics.DistSQLCoprClosestReadCounter.WithLabelValues("miss"))
require.Equal(t, hit, int(afterHit-beforeHit), "exec query '%s' check hit failed", q)
require.Equal(t, miss, int(afterMiss-beforeMiss), "exec query '%s' check miss failed", q)
}
tk.MustExec("create table t(id int primary key, s varchar(8), p varchar(16));")
tk.MustExec("insert into t values (1, '00000001', '0000000000000001'), (2, '00000003', '0000000000000002'), (3, '00000011', '0000000000000003');")
tk.MustExec("analyze table t;")
tk.MustExec("set @@tidb_partition_prune_mode ='static';")
tk.MustExec("set tidb_replica_read = 'closest-adaptive';")
tk.MustExec("set tidb_adaptive_closest_read_threshold = 25;")
// table reader
// estimate cost is 19
checkMetrics("select s from t where id >= 1 and id < 2;", 0, 1)
// estimate cost is 37
checkMetrics("select * from t where id >= 1 and id < 2;", 1, 0)
tk.MustExec("set tidb_adaptive_closest_read_threshold = 50;")
checkMetrics("select * from t where id >= 1 and id < 2;", 0, 1)
// estimate cost is 74
checkMetrics("select * from t where id >= 1 and id <= 2;", 1, 0)
partitionDef := "PARTITION BY RANGE (id) (PARTITION p0 VALUES LESS THAN (3), PARTITION p3 VALUES LESS THAN MAXVALUE);"
// test TableReader with partition
tk.MustExec("set tidb_adaptive_closest_read_threshold = 30;")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, s varchar(8), p varchar(16)) " + partitionDef)
tk.MustExec("insert into t values (1, '00000001', '0000000000000001'), (2, '00000003', '0000000000000002'), (3, '00000011', '0000000000000003'), (4, '00000044', '0000000000000004');")
tk.MustExec("analyze table t;")
// estimate cost is 38
checkMetrics("select s from t where id >= 1 and id < 3;", 1, 0)
// estimate cost is 39 with 2 cop request
checkMetrics("select s from t where id >= 2 and id < 4;", 0, 2)
// index reader
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int, s varchar(8), p varchar(8), key `idx_s_p`(`s`, `p`));")
tk.MustExec("insert into t values (1, 'test1000', '11111111'), (2, 'test2000', '11111111');")
tk.MustExec("analyze table t;")
// avg row size = 27.91
checkMetrics("select p from t where s >= 'test' and s < 'test11'", 0, 1)
checkMetrics("select p from t where s >= 'test' and s < 'test22'", 1, 0)
// index reader with partitions
tk.MustExec("set tidb_adaptive_closest_read_threshold = 30;")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (v int, id int, p varchar(8), key `idx_id_p`(`id`, `p`)) " + partitionDef)
tk.MustExec("insert into t values (1, 1, '11111111'), (2, 2, '22222222'), (3, 3, '33333333'), (4, 4, '44444444');")
tk.MustExec("analyze table t;")
// avg row size = 19
checkMetrics("select p from t where id >= 1 and id < 3", 1, 0)
checkMetrics("select p from t where id >= 2 and id < 4", 0, 2)
checkMetrics("select p from t where id >= 1 and id < 4", 1, 1)
// index lookup reader
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (id int, s varchar(8), p varchar(50), key `idx_s`(`s`));")
str := "this_is_a_string_with_length_of_50________________"
tk.MustExec(fmt.Sprintf("insert into t values (1, 'test1000', '%s'), (2, 'test2000', '%s');", str, str))
tk.MustExec("analyze table t;")
tk.MustExec("set tidb_adaptive_closest_read_threshold = 80;")
// IndexReader cost is 22, TableReader cost (1 row) is 67
checkMetrics("select/*+ FORCE_INDEX(t, idx_s) */ p from t where s >= 'test' and s < 'test11'", 0, 2)
tk.MustExec("set tidb_adaptive_closest_read_threshold = 100;")
checkMetrics("select/*+ FORCE_INDEX(t, idx_s) */ p from t where s >= 'test' and s < 'test22'", 1, 1)
// index merge reader
tk.MustExec("drop table if exists t;")
// use int field to avoid the planer estimation with big random fluctuation.
tk.MustExec("create table t (id int, v bigint not null, s1 int not null, s2 int not null, key `idx_v_s1`(`s1`, `v`), key `idx_s2`(`s2`));")
tk.MustExec("insert into t values (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3);")
tk.MustExec("analyze table t;")
tk.MustExec("set tidb_adaptive_closest_read_threshold = 30;")
// 2 IndexScan with cost 19/56, 2 TableReader with cost 32.5/65.
checkMetrics("select/* +USE_INDEX_MERGE(t) */ id from t use index(`idx_v_s1`) use index(idx_s2) where (s1 < 3 and v > 0) or s2 = 3;", 3, 1)
}
func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging"))
}()
tk.MustExec("use test")
tk.MustExec("CREATE TABLE `UK_COLLATION19523` (" +
"`COL1` binary(1) DEFAULT NULL," +
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
"`COL4` datetime DEFAULT NULL," +
"`COL3` bigint(20) DEFAULT NULL," +
"`COL5` float DEFAULT NULL," +
"UNIQUE KEY `U_COL1` (`COL1`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci")
tk.MustExec("prepare stmt from 'SELECT/*+ HASH_JOIN(t1, t2) */ * FROM UK_COLLATION19523 t1 JOIN UK_COLLATION19523 t2 ON t1.col1 > t2.col1 WHERE t1.col1 IN (?, ?, ?) AND t2.col1 < ?;';")
tk.MustExec("set @a=0x4F, @b=0xF8, @c=NULL, @d=0xBF;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0x00, @b=0xD2, @c=9179987834981541375, @d=0xF8;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("CREATE TABLE `IDT_COLLATION26873` (" +
"`COL1` varbinary(20) DEFAULT NULL," +
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
"`COL4` datetime DEFAULT NULL," +
"`COL3` bigint(20) DEFAULT NULL," +
"`COL5` float DEFAULT NULL," +
"KEY `U_COL1` (`COL1`))")
tk.MustExec("prepare stmt from 'SELECT/*+ INL_JOIN(t1, t2) */ t2.* FROM IDT_COLLATION26873 t1 LEFT JOIN IDT_COLLATION26873 t2 ON t1.col1 = t2.col1 WHERE t1.col1 < ? AND t1.col1 IN (?, ?, ?);';")
tk.MustExec("set @a=NULL, @b=NULL, @c=NULL, @d=NULL;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xE3253A6AC72A3A168EAF0E34A4779A947872CCCD, @b=0xD67BB26504EE152C2C356D7F6CAD897F03462963, @c=NULL, @d=0xDE735FEB375A4CF33479A39CA925470BFB229DB4;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=2606738829406840179, @b=1468233589368287363, @c=5174008984061521089, @d=7727946571160309462;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=NULL, @c=6864108002939154648, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @c=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("CREATE TABLE `PK_SNPRE10114` (" +
"`COL1` varbinary(10) NOT NULL DEFAULT 'S'," +
"`COL2` varchar(20) DEFAULT NULL," +
"`COL4` datetime DEFAULT NULL," +
"`COL3` bigint(20) DEFAULT NULL," +
"`COL5` float DEFAULT NULL," +
"PRIMARY KEY (`COL1`) CLUSTERED)")
tk.MustExec(`prepare stmt from 'SELECT * FROM PK_SNPRE10114 WHERE col1 IN (?, ?, ?) AND (col2 IS NULL OR col2 IN (?, ?)) AND (col3 IS NULL OR col4 IS NULL);';`)
tk.MustExec(`set @a=0x0D5BDAEB79074756F203, @b=NULL, @c=0x6A911AAAC728F1ED3B4F, @d="鏖秿垙麜濇凗辯Ũ卮伄幖轒ƀ漭蝏雓轊恿磔徵", @e="訇廵纹髺釖寒近槩靏詗膦潳陒錃粓悧闒摔)乀";`)
tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`)
tk.MustExec(`set @a=7775448739068993371, @b=5641728652098016210, @c=6774432238941172824, @d="HqpP5rN", @e="8Fy";`)
tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`)
tk.MustExec(`set @a=0x61219F79C90D3541F70E, @b=5501707547099269248, @c=0xEC43EFD30131DEA2CB8B, @d="呣丼蒢咿卻鹻铴础湜僂頃dž縍套衞陀碵碼幓9", @e="鹹楞睕堚尛鉌翡佾搁紟精廬姆燵藝潐楻翇慸嵊";`)
tk.MustExec(`execute stmt using @a,@b,@c,@d,@e;`)
}