Skip to content

Commit

Permalink
Merge branch 'master' into PCTest#0
Browse files Browse the repository at this point in the history
  • Loading branch information
Reminiscent authored Oct 28, 2021
2 parents 490b7d3 + b562dc9 commit 7892c91
Show file tree
Hide file tree
Showing 13 changed files with 604 additions and 174 deletions.
105 changes: 61 additions & 44 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand All @@ -32,7 +31,7 @@ const (
DefaultSchemaConcurrency = 64
)

type scheamInfo struct {
type schemaInfo struct {
tableInfo *model.TableInfo
dbInfo *model.DBInfo
crc64xor uint64
Expand All @@ -44,12 +43,12 @@ type scheamInfo struct {
// Schemas is task for backuping schemas.
type Schemas struct {
// name -> schema
schemas map[string]*scheamInfo
schemas map[string]*schemaInfo
}

func newBackupSchemas() *Schemas {
return &Schemas{
schemas: make(map[string]*scheamInfo),
schemas: make(map[string]*schemaInfo),
}
}

Expand All @@ -58,7 +57,7 @@ func (ss *Schemas) addSchema(
) {
name := fmt.Sprintf("%s.%s",
utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
ss.schemas[name] = &scheamInfo{
ss.schemas[name] = &schemaInfo{
tableInfo: tableInfo,
dbInfo: dbInfo,
}
Expand Down Expand Up @@ -101,53 +100,27 @@ func (ss *Schemas) BackupSchemas(
if !skipChecksum {
logger.Info("table checksum start")
start := time.Now()
checksumResp, err := calculateChecksum(
ectx, schema.tableInfo, store.GetClient(), backupTS, copConcurrency)
err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
if err != nil {
return errors.Trace(err)
}
schema.crc64xor = checksumResp.Checksum
schema.totalKvs = checksumResp.TotalKvs
schema.totalBytes = checksumResp.TotalBytes
logger.Info("table checksum finished",
zap.Uint64("Crc64Xor", checksumResp.Checksum),
zap.Uint64("TotalKvs", checksumResp.TotalKvs),
zap.Uint64("TotalBytes", checksumResp.TotalBytes),
zap.Uint64("Crc64Xor", schema.crc64xor),
zap.Uint64("TotalKvs", schema.totalKvs),
zap.Uint64("TotalBytes", schema.totalBytes),
zap.Duration("take", time.Since(start)))
}
if statsHandle != nil {
jsonTable, err := statsHandle.DumpStatsToJSON(
schema.dbInfo.Name.String(), schema.tableInfo, nil)
if err != nil {
if err := schema.dumpStatsToJSON(statsHandle); err != nil {
logger.Error("dump table stats failed", logutil.ShortError(err))
}
schema.stats = jsonTable
}

// Send schema to metawriter
dbBytes, err := json.Marshal(schema.dbInfo)
if err != nil {
return errors.Trace(err)
}
tableBytes, err := json.Marshal(schema.tableInfo)
s, err := schema.encodeToSchema()
if err != nil {
return errors.Trace(err)
}
var statsBytes []byte
if schema.stats != nil {
statsBytes, err = json.Marshal(schema.stats)
if err != nil {
return errors.Trace(err)
}
}
s := &backuppb.Schema{
Db: dbBytes,
Table: tableBytes,
Crc64Xor: schema.crc64xor,
TotalKvs: schema.totalKvs,
TotalBytes: schema.totalBytes,
Stats: statsBytes,
}

if err := metaWriter.Send(s, op); err != nil {
return errors.Trace(err)
}
Expand All @@ -168,24 +141,68 @@ func (ss *Schemas) Len() int {
return len(ss.schemas)
}

func calculateChecksum(
func (s *schemaInfo) calculateChecksum(
ctx context.Context,
table *model.TableInfo,
client kv.Client,
backupTS uint64,
concurrency uint,
) (*tipb.ChecksumResponse, error) {
exe, err := checksum.NewExecutorBuilder(table, backupTS).
) error {
exe, err := checksum.NewExecutorBuilder(s.tableInfo, backupTS).
SetConcurrency(concurrency).
Build()
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

checksumResp, err := exe.Execute(ctx, client, func() {
// TODO: update progress here.
})
if err != nil {
return errors.Trace(err)
}

s.crc64xor = checksumResp.Checksum
s.totalKvs = checksumResp.TotalKvs
s.totalBytes = checksumResp.TotalBytes
return nil
}

func (s *schemaInfo) dumpStatsToJSON(statsHandle *handle.Handle) error {
jsonTable, err := statsHandle.DumpStatsToJSON(
s.dbInfo.Name.String(), s.tableInfo, nil)
if err != nil {
return errors.Trace(err)
}

s.stats = jsonTable
return nil
}

func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
dbBytes, err := json.Marshal(s.dbInfo)
if err != nil {
return nil, errors.Trace(err)
}

tableBytes, err := json.Marshal(s.tableInfo)
if err != nil {
return nil, errors.Trace(err)
}
return checksumResp, nil

var statsBytes []byte
if s.stats != nil {
statsBytes, err = json.Marshal(s.stats)
if err != nil {
return nil, errors.Trace(err)
}
}

return &backuppb.Schema{
Db: dbBytes,
Table: tableBytes,
Crc64Xor: s.crc64xor,
TotalKvs: s.totalKvs,
TotalBytes: s.totalBytes,
Stats: statsBytes,
}, nil
}
2 changes: 1 addition & 1 deletion cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ StreamAgg 1.00 root funcs:count(1)->Column#22
├─TableDual 8000.00 root rows:0
└─Projection 0.01 root test.test01.stat_date, test.test01.show_date, test.test01.region_id
└─TableReader 0.01 root data:Selection
└─Selection 0.01 cop[tikv] eq(test.test01.period, 1), ge(test.test01.stat_date, 20191202), ge(test.test01.stat_date, 20191202), gt(cast(test.test01.registration_num, bigint(20) BINARY), 0), le(test.test01.stat_date, 20191202), le(test.test01.stat_date, 20191202)
└─Selection 0.01 cop[tikv] eq(test.test01.period, 1), ge(test.test01.stat_date, 20191202), gt(cast(test.test01.registration_num, bigint(20) BINARY), 0), le(test.test01.stat_date, 20191202)
└─TableFullScan 10000.00 cop[tikv] table:test01 keep order:false, stats:pseudo
drop table if exists t;
create table t(a int, nb int not null, nc int not null);
Expand Down
7 changes: 5 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,8 +908,11 @@ func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor

func (b *executorBuilder) buildPlanReplayerSingle(v *plannercore.PlanReplayerSingle) Executor {
e := &PlanReplayerSingleExec{
baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
info: &PlanReplayerSingleInfo{v.ExecStmt, v.Analyze, v.Load, v.File, b.ctx},
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
ExecStmt: v.ExecStmt,
Analyze: v.Analyze,
Load: v.Load,
File: v.File,
}
return e
}
Expand Down
37 changes: 37 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor_test

import (
"archive/zip"
"context"
"flag"
"fmt"
Expand Down Expand Up @@ -9156,6 +9157,42 @@ func (s *testSuite) TestGetResultRowsCount(c *C) {
}
}

func checkFileName(s string) bool {
files := []string{
"config.toml",
"meta.txt",
"stats/test.t_dump_single.json",
"schema/test.t_dump_single.schema.txt",
"variables.toml",
"sqls.sql",
"session_bindings.sql",
"global_bindings.sql",
"explain.txt",
}
for _, f := range files {
if strings.Compare(f, s) == 0 {
return true
}
}
return false
}

func (s *testSuiteWithData) TestPlanReplayerDumpSingle(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t_dump_single")
tk.MustExec("create table t_dump_single(a int)")
res := tk.MustQuery("plan replayer dump explain select * from t_dump_single")
path := s.testData.ConvertRowsToStrings(res.Rows())

reader, err := zip.OpenReader(path[0])
c.Assert(err, IsNil)
defer reader.Close()
for _, file := range reader.File {
c.Assert(checkFileName(file.Name), IsTrue)
}
}

func (s *testSuiteP1) TestIssue28935(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_vectorized_expression=true")
Expand Down
Loading

0 comments on commit 7892c91

Please sign in to comment.