Skip to content

Commit

Permalink
stats: support 'load stats' command (#5724)
Browse files Browse the repository at this point in the history
  • Loading branch information
fipped authored and alivxxx committed Jan 31, 2018
1 parent d8fb5dd commit 775dcf4
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 9 deletions.
18 changes: 18 additions & 0 deletions ast/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import "github.com/pingcap/tidb/model"
var (
_ StmtNode = &AnalyzeTableStmt{}
_ StmtNode = &DropStatsStmt{}
_ StmtNode = &LoadStatsStmt{}
)

// AnalyzeTableStmt is used to create table statistics.
Expand Down Expand Up @@ -69,3 +70,20 @@ func (n *DropStatsStmt) Accept(v Visitor) (Node, bool) {
n.Table = node.(*TableName)
return v.Leave(n)
}

// LoadStatsStmt is the statement node for loading statistic.
type LoadStatsStmt struct {
stmtNode

Path string
}

// Accept implements Node Accept interface.
func (n *LoadStatsStmt) Accept(v Visitor) (Node, bool) {
newNode, skipChildren := v.Enter(n)
if skipChildren {
return v.Leave(newNode)
}
n = newNode.(*LoadStatsStmt)
return v.Leave(n)
}
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildInsert(v)
case *plan.LoadData:
return b.buildLoadData(v)
case *plan.LoadStats:
return b.buildLoadStats(v)
case *plan.PhysicalLimit:
return b.buildLimit(v)
case *plan.Prepare:
Expand Down Expand Up @@ -408,6 +410,15 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor {
return loadDataExec
}

func (b *executorBuilder) buildLoadStats(v *plan.LoadStats) Executor {
e := &LoadStatsExec{
baseExecutor: newBaseExecutor(nil, b.ctx),
info: &LoadStatsInfo{v.Path, b.ctx},
}
e.supportChk = true
return e
}

func (b *executorBuilder) buildReplace(vals *InsertValues) Executor {
replaceExec := &ReplaceExec{
InsertValues: vals,
Expand Down
158 changes: 158 additions & 0 deletions executor/load_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"encoding/json"

"github.com/juju/errors"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/util/chunk"
goctx "golang.org/x/net/context"
)

var _ Executor = &LoadStatsExec{}

// LoadStatsExec represents a load statistic executor.
type LoadStatsExec struct {
baseExecutor
info *LoadStatsInfo
}

// LoadStatsInfo saves the information of loading statistic operation.
type LoadStatsInfo struct {
Path string
Ctx context.Context
}

// loadStatsVarKeyType is a dummy type to avoid naming collision in context.
type loadStatsVarKeyType int

// String defines a Stringer function for debugging and pretty printing.
func (k loadStatsVarKeyType) String() string {
return "load_stats_var"
}

// LoadStatsVarKey is a variable key for load statistic.
const LoadStatsVarKey loadStatsVarKeyType = 0

func (e *LoadStatsExec) exec(goCtx goctx.Context) error {
if len(e.info.Path) == 0 {
return errors.New("Load Stats: file path is empty")
}
val := e.ctx.Value(LoadStatsVarKey)
if val != nil {
e.ctx.SetValue(LoadStatsVarKey, nil)
return errors.New("Load Stats: previous load stats option isn't closed normally")
}
e.ctx.SetValue(LoadStatsVarKey, e.info)

return nil
}

// Next implements the Executor Next interface.
func (e *LoadStatsExec) Next(goCtx goctx.Context) (Row, error) {
return nil, e.exec(goCtx)
}

// NextChunk implements the Executor NextChunk interface.
func (e *LoadStatsExec) NextChunk(goCtx goctx.Context, chk *chunk.Chunk) error {
chk.Reset()
return errors.Trace(e.exec(goCtx))
}

// Close implements the Executor Close interface.
func (e *LoadStatsExec) Close() error {
return nil
}

// Open implements the Executor Open interface.
func (e *LoadStatsExec) Open(goCtx goctx.Context) error {
return nil
}

// Update updates the stats of the corresponding table according to the data.
func (e *LoadStatsInfo) Update(data []byte) error {
jsonTbl := &statistics.JSONTable{}
if err := json.Unmarshal(data, jsonTbl); err != nil {
return errors.Trace(err)
}

do := domain.GetDomain(e.Ctx)
is := do.InfoSchema()

tableInfo, err := is.TableByName(model.NewCIStr(jsonTbl.DatabaseName), model.NewCIStr(jsonTbl.TableName))
if err != nil {
return errors.Trace(err)
}

h := do.StatsHandle()
if h == nil {
return errors.New("Load Stats: handle is nil")
}

tbl, err := h.LoadStatsFromJSON(tableInfo.Meta(), jsonTbl)
if err != nil {
return errors.Trace(err)
}

if h.Lease > 0 {
hists := make([]*statistics.Histogram, 0, len(tbl.Columns))
cms := make([]*statistics.CMSketch, 0, len(tbl.Columns))
for _, col := range tbl.Columns {
hists = append(hists, &col.Histogram)
cms = append(cms, col.CMSketch)
}
h.AnalyzeResultCh() <- &statistics.AnalyzeResult{
TableID: tbl.TableID,
Hist: hists,
Cms: cms,
Count: tbl.Count,
IsIndex: 0,
Err: nil}

hists = make([]*statistics.Histogram, 0, len(tbl.Indices))
cms = make([]*statistics.CMSketch, 0, len(tbl.Indices))
for _, idx := range tbl.Indices {
hists = append(hists, &idx.Histogram)
cms = append(cms, idx.CMSketch)
}
h.AnalyzeResultCh() <- &statistics.AnalyzeResult{
TableID: tbl.TableID,
Hist: hists,
Cms: cms,
Count: tbl.Count,
IsIndex: 1,
Err: nil}

return nil
}
for _, col := range tbl.Columns {
err = statistics.SaveStatsToStorage(e.Ctx, tbl.TableID, tbl.Count, 0, &col.Histogram, col.CMSketch)
if err != nil {
return errors.Trace(err)
}
}
for _, idx := range tbl.Indices {
err = statistics.SaveStatsToStorage(e.Ctx, tbl.TableID, tbl.Count, 1, &idx.Histogram, idx.CMSketch)
if err != nil {
return errors.Trace(err)
}
}
err = h.Update(GetInfoSchema(e.Ctx))
return errors.Trace(err)
}
12 changes: 12 additions & 0 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ import (
InsertIntoStmt "INSERT INTO statement"
KillStmt "Kill statement"
LoadDataStmt "Load data statement"
LoadStatsStmt "Load statistic statement"
LockTablesStmt "Lock tables statement"
PreparedStmt "PreparedStmt"
SelectStmt "SELECT statement"
Expand Down Expand Up @@ -5125,6 +5126,7 @@ Statement:
| InsertIntoStmt
| KillStmt
| LoadDataStmt
| LoadStatsStmt
| PreparedStmt
| RollbackStmt
| RenameTableStmt
Expand Down Expand Up @@ -6435,4 +6437,14 @@ KillOrKillTiDB:
$$ = true
}

/*******************************************************************************************/

LoadStatsStmt:
"LOAD" "STATS" stringLit
{
$$ = &ast.LoadStatsStmt{
Path: $3,
}
}

%%
2 changes: 2 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,8 @@ func (s *testParserSuite) TestDBAStmt(c *C) {
{"show stats_buckets", true},
{"show stats_buckets where col_name = 'a'", true},

// for load stats
{"load stats '/tmp/stats.json'", true},
// set
// user defined
{"SET @a = 1", true},
Expand Down
7 changes: 7 additions & 0 deletions plan/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ type LoadData struct {
GenCols InsertGeneratedColumns
}

// LoadStats represents a load stats plan.
type LoadStats struct {
baseSchemaProducer

Path string
}

// DDL represents a DDL statement plan.
type DDL struct {
baseSchemaProducer
Expand Down
7 changes: 7 additions & 0 deletions plan/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ func (b *planBuilder) build(node ast.Node) Plan {
return b.buildInsert(x)
case *ast.LoadDataStmt:
return b.buildLoadData(x)
case *ast.LoadStatsStmt:
return b.buildLoadStats(x)
case *ast.PrepareStmt:
return b.buildPrepare(x)
case *ast.SelectStmt:
Expand Down Expand Up @@ -973,6 +975,11 @@ func (b *planBuilder) buildLoadData(ld *ast.LoadDataStmt) Plan {
return p
}

func (b *planBuilder) buildLoadStats(ld *ast.LoadStatsStmt) Plan {
p := &LoadStats{Path: ld.Path}
return p
}

func (b *planBuilder) buildDDL(node ast.DDLNode) Plan {
switch v := node.(type) {
case *ast.AlterTableStmt:
Expand Down
41 changes: 41 additions & 0 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,9 +799,41 @@ func (cc *clientConn) handleLoadData(goCtx goctx.Context, loadDataInfo *executor
return errors.Trace(txn.Commit(goCtx))
}

// handleLoadStats does the additional work after processing the 'load stats' query.
// It sends client a file path, then reads the file content from client, loads it into the storage.
func (cc *clientConn) handleLoadStats(goCtx goctx.Context, loadStatsInfo *executor.LoadStatsInfo) error {
// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if loadStatsInfo == nil {
return errors.New("Load stats: info is empty")
}
err := cc.writeReq(loadStatsInfo.Path)
if err != nil {
return errors.Trace(err)
}
var prevData, curData []byte
for {
curData, err = cc.readPacket()
if err != nil && terror.ErrorNotEqual(err, io.EOF) {
return errors.Trace(err)
}
if len(curData) == 0 {
break
}
prevData = append(prevData, curData...)
}
if len(prevData) == 0 {
return nil
}
return errors.Trace(loadStatsInfo.Update(prevData))
}

// handleQuery executes the sql query string and writes result set or result ok to the client.
// As the execution time of this function represents the performance of TiDB, we do time log and metrics here.
// There is a special query `load data` that does not return result, which is handled differently.
// Query `load stats` does not return result either.
func (cc *clientConn) handleQuery(goCtx goctx.Context, sql string) (err error) {
rs, err := cc.ctx.Execute(goCtx, sql)
if err != nil {
Expand All @@ -822,6 +854,15 @@ func (cc *clientConn) handleQuery(goCtx goctx.Context, sql string) (err error) {
return errors.Trace(err)
}
}

loadStats := cc.ctx.Value(executor.LoadStatsVarKey)
if loadStats != nil {
defer cc.ctx.SetValue(executor.LoadStatsVarKey, nil)
if err = cc.handleLoadStats(goCtx, loadStats.(*executor.LoadStatsInfo)); err != nil {
return errors.Trace(err)
}
}

err = cc.writeOK()
}
return errors.Trace(err)
Expand Down
3 changes: 1 addition & 2 deletions server/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/pingcap/tidb/model"
)

// StatsHandler is the handler for dump statistics.
// StatsHandler is the handler for dumping statistics.
type StatsHandler struct {
do *domain.Domain
}
Expand All @@ -47,7 +47,6 @@ func (sh StatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

is := sh.do.InfoSchema()
h := sh.do.StatsHandle()

tbl, err := is.TableByName(model.NewCIStr(params[pDBName]), model.NewCIStr(params[pTableName]))
if err != nil {
writeError(w, err)
Expand Down
Loading

0 comments on commit 775dcf4

Please sign in to comment.