Skip to content

Commit

Permalink
systable: Adapt with TiFlash system table update (#38191)
Browse files Browse the repository at this point in the history
close #38192
  • Loading branch information
breezewish authored Sep 30, 2022
1 parent 1659c3e commit a4b8e9f
Show file tree
Hide file tree
Showing 10 changed files with 727 additions and 45 deletions.
15 changes: 15 additions & 0 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1872,6 +1872,13 @@ def go_deps():
sum = "h1:0Vihzu20St42/UDsvZGdNE6jak7oi/UOeMzwMPHkgFY=",
version = "v3.2.0+incompatible",
)
go_repository(
name = "com_github_jarcoal_httpmock",
build_file_proto_mode = "disable",
importpath = "github.com/jarcoal/httpmock",
sum = "h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=",
version = "v1.2.0",
)

go_repository(
name = "com_github_jcmturner_aescts_v2",
Expand Down Expand Up @@ -2326,6 +2333,14 @@ def go_deps():
sum = "h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=",
version = "v1.0.1",
)
go_repository(
name = "com_github_maxatome_go_testdeep",
build_file_proto_mode = "disable",
importpath = "github.com/maxatome/go-testdeep",
sum = "h1:Tgh5efyCYyJFGUYiT0qxBSIDeXw0F5zSoatlou685kk=",
version = "v1.11.0",
)

go_repository(
name = "com_github_mbilski_exhaustivestruct",
build_file_proto_mode = "disable",
Expand Down
1 change: 1 addition & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ go_test(
"//util/topsql/state",
"@com_github_golang_protobuf//proto",
"@com_github_gorilla_mux//:mux",
"@com_github_jarcoal_httpmock//:httpmock",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_fn//:fn",
Expand Down
89 changes: 53 additions & 36 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3030,14 +3030,17 @@ func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflas
return nil
}

type tiFlashSQLExecuteResponseMetaColumn struct {
Name string `json:"name"`
Type string `json:"type"`
}

type tiFlashSQLExecuteResponse struct {
Meta []tiFlashSQLExecuteResponseMetaColumn `json:"meta"`
Data [][]interface{} `json:"data"`
}

func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
var columnNames []string //nolint: prealloc
for _, c := range e.outputCols {
if c.Name.O == "TIFLASH_INSTANCE" {
continue
}
columnNames = append(columnNames, c.Name.L)
}
maxCount := 1024
targetTable := strings.ToLower(strings.Replace(e.table.Name.O, "TIFLASH", "DT", 1))
var filters []string
Expand All @@ -3047,12 +3050,11 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
if len(tidbTables) > 0 {
filters = append(filters, fmt.Sprintf("tidb_table IN (%s)", strings.ReplaceAll(tidbTables, "\"", "'")))
}
sql := fmt.Sprintf("SELECT %s FROM system.%s", strings.Join(columnNames, ","), targetTable)
sql := fmt.Sprintf("SELECT * FROM system.%s", targetTable)
if len(filters) > 0 {
sql = fmt.Sprintf("%s WHERE %s", sql, strings.Join(filters, " AND "))
}
sql = fmt.Sprintf("%s LIMIT %d, %d", sql, e.rowIdx, maxCount)
notNumber := "nan"
instanceInfo := e.instanceInfos[e.instanceIdx]
url := instanceInfo.url
req, err := http.NewRequest(http.MethodGet, url, nil)
Expand All @@ -3061,6 +3063,7 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
}
q := req.URL.Query()
q.Add("query", sql)
q.Add("default_format", "JSONCompact")
req.URL.RawQuery = q.Encode()
resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
Expand All @@ -3071,54 +3074,68 @@ func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.
if err != nil {
return nil, errors.Trace(err)
}
records := strings.Split(string(body), "\n")
rows := make([][]types.Datum, 0, len(records))
for _, record := range records {
if len(record) == 0 {
continue
var result tiFlashSQLExecuteResponse
err = json.Unmarshal(body, &result)
if err != nil {
return nil, errors.Wrapf(err, "Failed to decode JSON from TiFlash")
}

// Map result columns back to our columns. It is possible that some columns cannot be
// recognized and some other columns are missing. This may happen during upgrading.
outputColIndexMap := map[string]int{} // Map from TiDB Column name to Output Column Index
for idx, c := range e.outputCols {
outputColIndexMap[c.Name.L] = idx
}
tiflashColIndexMap := map[int]int{} // Map from TiFlash Column index to Output Column Index
for tiFlashColIdx, col := range result.Meta {
if outputIdx, ok := outputColIndexMap[strings.ToLower(col.Name)]; ok {
tiflashColIndexMap[tiFlashColIdx] = outputIdx
}
fields := strings.Split(record, "\t")
if len(fields) < len(e.outputCols)-1 {
return nil, errors.Errorf("Record from tiflash doesn't match schema %v", fields)
}
outputRows := make([][]types.Datum, 0, len(result.Data))
for _, rowFields := range result.Data {
if len(rowFields) == 0 {
continue
}
row := make([]types.Datum, len(e.outputCols))
for index, column := range e.outputCols {
if column.Name.O == "TIFLASH_INSTANCE" {
outputRow := make([]types.Datum, len(e.outputCols))
for tiFlashColIdx, fieldValue := range rowFields {
outputIdx, ok := tiflashColIndexMap[tiFlashColIdx]
if !ok {
// Discard this field, we don't know which output column is the destination
continue
}
if fieldValue == nil {
continue
}
valStr := fmt.Sprint(fieldValue)
column := e.outputCols[outputIdx]
if column.GetType() == mysql.TypeVarchar {
row[index].SetString(fields[index], mysql.DefaultCollationName)
outputRow[outputIdx].SetString(valStr, mysql.DefaultCollationName)
} else if column.GetType() == mysql.TypeLonglong {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseInt(fields[index], 10, 64)
value, err := strconv.ParseInt(valStr, 10, 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetInt64(value)
outputRow[outputIdx].SetInt64(value)
} else if column.GetType() == mysql.TypeDouble {
if fields[index] == notNumber {
continue
}
value, err := strconv.ParseFloat(fields[index], 64)
value, err := strconv.ParseFloat(valStr, 64)
if err != nil {
return nil, errors.Trace(err)
}
row[index].SetFloat64(value)
outputRow[outputIdx].SetFloat64(value)
} else {
return nil, errors.Errorf("Meet column of unknown type %v", column)
}
}
row[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
rows = append(rows, row)
outputRow[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
outputRows = append(outputRows, outputRow)
}
e.rowIdx += len(rows)
if len(rows) < maxCount {
e.rowIdx += len(outputRows)
if len(outputRows) < maxCount {
e.instanceIdx += 1
e.rowIdx = 0
}
return rows, nil
return outputRows, nil
}

func (e *memtableRetriever) setDataForAttributes(ctx sessionctx.Context, is infoschema.InfoSchema) error {
Expand Down
94 changes: 91 additions & 3 deletions executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package executor_test

import (
"fmt"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/jarcoal/httpmock"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
Expand Down Expand Up @@ -649,11 +651,97 @@ func TestSequences(t *testing.T) {
tk.MustQuery("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME , TABLE_TYPE, ENGINE, TABLE_ROWS FROM information_schema.tables WHERE TABLE_TYPE='SEQUENCE' AND TABLE_NAME='seq2'").Check(testkit.Rows("def test seq2 SEQUENCE InnoDB 1"))
}

func TestTiFlashSystemTables(t *testing.T) {
func TestTiFlashSystemTableWithTiFlashV620(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()

instances := []string{
"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
}
fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable(fpName, fpExpr))
defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

httpmock.RegisterResponder("GET", "http://127.0.0.1:7777/config",
httpmock.NewStringResponder(200, `
{
"raftstore-proxy": {},
"engine-store":{
"http_port":8123,
"tcp_port":9000
}
}
`))

data, err := os.ReadFile("testdata/tiflash_v620_dt_segments.json")
require.NoError(t, err)
httpmock.RegisterResponder("GET", "http://127.0.0.1:8123?default_format=JSONCompact&query=SELECT+%2A+FROM+system.dt_segments+LIMIT+0%2C+1024", httpmock.NewBytesResponder(200, data))

data, err = os.ReadFile("testdata/tiflash_v620_dt_tables.json")
require.NoError(t, err)
httpmock.RegisterResponder("GET", "http://127.0.0.1:8123?default_format=JSONCompact&query=SELECT+%2A+FROM+system.dt_tables+LIMIT+0%2C+1024", httpmock.NewBytesResponder(200, data))

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustQuery("select * from information_schema.TIFLASH_SEGMENTS;").Check(testkit.Rows(
"db_1 t_10 mysql tables_priv 10 0 1 [-9223372036854775808,9223372036854775807) <nil> 0 0 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 0 2032 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
"db_1 t_8 mysql db 8 0 1 [-9223372036854775808,9223372036854775807) <nil> 0 0 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 0 2032 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
"db_2 t_70 test segment 70 0 1 [01,FA) <nil> 30511 50813627 0.6730359542460096 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 3578860 409336 <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())

tk.MustQuery("select * from information_schema.TIFLASH_TABLES;").Check(testkit.Rows(
"db_1 t_10 mysql tables_priv 10 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_1 t_8 mysql db 8 0 1 0 0 0 <nil> 0 <nil> 0 <nil> <nil> 0 0 0 0 0 0 <nil> <nil> <nil> 0 0 0 0 <nil> <nil> 0 <nil> <nil> <nil> 0 <nil> <nil> <nil> 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
"db_2 t_70 test segment 70 0 1 102000 169873868 0 0 0 <nil> 0 <nil> <nil> 0 102000 169873868 0 0 0 <nil> <nil> <nil> 1 102000 169873868 43867622 102000 169873868 0 <nil> <nil> <nil> 13 13 7846.153846153846 13067220.615384616 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
}

func TestTiFlashSystemTableWithTiFlashV630(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()

instances := []string{
"tiflash,127.0.0.1:3933,127.0.0.1:7777,,",
"tikv,127.0.0.1:11080,127.0.0.1:10080,,",
}
fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable(fpName, fpExpr))
defer func() { require.NoError(t, failpoint.Disable(fpName)) }()

httpmock.RegisterResponder("GET", "http://127.0.0.1:7777/config",
httpmock.NewStringResponder(200, `
{
"raftstore-proxy": {},
"engine-store":{
"http_port":8123,
"tcp_port":9000
}
}
`))

data, err := os.ReadFile("testdata/tiflash_v630_dt_segments.json")
require.NoError(t, err)
httpmock.RegisterResponder("GET", "http://127.0.0.1:8123?default_format=JSONCompact&query=SELECT+%2A+FROM+system.dt_segments+LIMIT+0%2C+1024", httpmock.NewBytesResponder(200, data))

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("select * from information_schema.TIFLASH_TABLES;")
tk.MustExec("select * from information_schema.TIFLASH_SEGMENTS;")
tk.MustQuery("select * from information_schema.TIFLASH_SEGMENTS;").Check(testkit.Rows(
"db_1 t_10 mysql tables_priv 10 0 1 [-9223372036854775808,9223372036854775807) 0 0 0 <nil> 0 0 0 0 2 0 0 0 0 0 2032 3 0 0 1 1 0 0 0 0 127.0.0.1:3933",
"db_2 t_70 test segment 70 436272981189328904 1 [01,FA) 5 102000 169874232 0 0 0 0 0 2 0 0 0 0 0 2032 3 102000 169874232 1 68 102000 169874232 43951837 20 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 1 [01,013130303030393535FF61653666642D6136FF61382D343032382DFF616436312D663736FF3062323736643461FF3600000000000000F8) 2 0 0 <nil> 0 0 1 1 110 0 0 4 4 0 2032 111 0 0 1 70 0 0 0 0 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 113 [013130303030393535FF61653666642D6136FF61382D343032382DFF616436312D663736FF3062323736643461FF3600000000000000F8,013139393938363264FF33346535382D3735FF31382D343661612DFF626235392D636264FF3139333434623736FF3100000000000000F9) 2 10167 16932617 0.4887380741615029 0 0 0 0 114 4969 8275782 2 0 0 63992 112 5198 8656835 1 71 5198 8656835 2254100 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 116 [013139393938363264FF33346535382D3735FF31382D343661612DFF626235392D636264FF3139333434623736FF3100000000000000F9,013330303131383034FF61323537662D6638FF63302D346466622DFF383235632D353361FF3236306338616662FF3400000000000000F8) 3 8 13322 0.5 3 4986 1 0 117 1 1668 4 3 4986 2032 115 4 6668 1 78 4 6668 6799 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 125 [013330303131383034FF61323537662D6638FF63302D346466622DFF383235632D353361FF3236306338616662FF3400000000000000F8,013339393939613861FF30663062332D6537FF32372D346234642DFF396535632D363865FF3336323066383431FF6300000000000000F9) 2 8677 14451079 0.4024432407514118 3492 5816059 3 0 126 0 0 0 0 5816059 2032 124 5185 8635020 1 79 5185 8635020 2247938 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 128 [013339393939613861FF30663062332D6537FF32372D346234642DFF396535632D363865FF3336323066383431FF6300000000000000F9,013730303031636230FF32663330652D3539FF62352D346134302DFF613539312D383930FF6132316364633466FF3200000000000000F8) 0 1 1668 1 0 0 0 0 129 1 1668 5 4 0 2032 127 0 0 1 78 4 6668 6799 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 119 [013730303031636230FF32663330652D3539FF62352D346134302DFF613539312D383930FF6132316364633466FF3200000000000000F8,013739393939386561FF36393566612D3534FF64302D346437642DFF383136612D646335FF6432613130353533FF3200000000000000F9) 2 10303 17158730 0.489372027564787 0 0 0 0 120 5042 8397126 2 0 0 63992 118 5261 8761604 1 77 5261 8761604 2280506 1 127.0.0.1:3933",
"db_2 t_75 test segment 75 0 122 [013739393939386561FF36393566612D3534FF64302D346437642DFF383136612D646335FF6432613130353533FF3200000000000000F9,FA) 0 1 1663 1 0 0 0 0 123 1 1663 4 3 0 2032 121 0 0 1 78 4 6668 6799 1 127.0.0.1:3933",
))
tk.MustQuery("show warnings").Check(testkit.Rows())
}

func TestTablesPKType(t *testing.T) {
Expand Down
99 changes: 99 additions & 0 deletions executor/testdata/tiflash_v620_dt_segments.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"meta":
[
{
"name": "database",
"type": "String"
},
{
"name": "table",
"type": "String"
},
{
"name": "tidb_database",
"type": "String"
},
{
"name": "tidb_table",
"type": "String"
},
{
"name": "table_id",
"type": "Int64"
},
{
"name": "is_tombstone",
"type": "UInt64"
},
{
"name": "segment_id",
"type": "UInt64"
},
{
"name": "range",
"type": "String"
},
{
"name": "rows",
"type": "UInt64"
},
{
"name": "size",
"type": "UInt64"
},
{
"name": "delete_ranges",
"type": "UInt64"
},
{
"name": "stable_size_on_disk",
"type": "UInt64"
},
{
"name": "delta_pack_count",
"type": "UInt64"
},
{
"name": "stable_pack_count",
"type": "UInt64"
},
{
"name": "avg_delta_pack_rows",
"type": "Float64"
},
{
"name": "avg_stable_pack_rows",
"type": "Float64"
},
{
"name": "delta_rate",
"type": "Float64"
},
{
"name": "delta_cache_size",
"type": "UInt64"
},
{
"name": "delta_index_size",
"type": "UInt64"
}
],

"data":
[
["db_1", "t_10", "mysql", "tables_priv", "10", "0", "1", "[-9223372036854775808,9223372036854775807)", "0", "0", "0", "0", "0", "0", null, null, null, "0", "2032"],
["db_1", "t_8", "mysql", "db", "8", "0", "1", "[-9223372036854775808,9223372036854775807)", "0", "0", "0", "0", "0", "0", null, null, null, "0", "2032"],
["db_2", "t_70", "test", "segment", "70", "0", "1", "[01,FA)", "30511", "50813627", "12", "4296273", "18", "2", 1140.8333333333333, 4988, 0.6730359542460096, "3578860", "409336"]
],

"rows": 3,

"rows_before_limit_at_least": 3,

"statistics":
{
"elapsed": 0.000075,
"rows_read": 3,
"bytes_read": 8324
}
}
Loading

0 comments on commit a4b8e9f

Please sign in to comment.