Skip to content

Commit

Permalink
brie: add error info column and history backup/restore info in sql (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Jul 30, 2021
1 parent 99c4bc1 commit ab4a990
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 35 deletions.
41 changes: 39 additions & 2 deletions executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ import (
"github.com/tikv/client-go/v2/oracle"
)

const clearInterval = 10 * time.Minute

var outdatedDuration = types.Duration{
Duration: 30 * time.Minute,
Fsp: types.DefaultFsp,
}

// brieTaskProgress tracks a task's current progress.
type brieTaskProgress struct {
// current progress of the task.
Expand Down Expand Up @@ -76,11 +83,13 @@ func (p *brieTaskProgress) Close() {
type brieTaskInfo struct {
queueTime types.Time
execTime types.Time
finishTime types.Time
kind ast.BRIEKind
storage string
connID uint64
backupTS uint64
archiveSize uint64
message string
}

type brieQueueItem struct {
Expand All @@ -93,6 +102,8 @@ type brieQueue struct {
nextID uint64
tasks sync.Map

lastClearTime time.Time

workerCh chan struct{}
}

Expand Down Expand Up @@ -151,10 +162,26 @@ func (bq *brieQueue) cancelTask(taskID uint64) {
if !ok {
return
}
bq.tasks.Delete(taskID)
item.(*brieQueueItem).cancel()
}

func (bq *brieQueue) clearTask(sc *stmtctx.StatementContext) {
if time.Since(bq.lastClearTime) < clearInterval {
return
}

bq.lastClearTime = time.Now()
currTime := types.CurrentTime(mysql.TypeDatetime)

bq.tasks.Range(func(key, value interface{}) bool {
item := value.(*brieQueueItem)
if d := currTime.Sub(sc, &item.info.finishTime); d.Compare(outdatedDuration) > 0 {
bq.tasks.Delete(key)
}
return true
})
}

func (b *executorBuilder) parseTSString(ts string) (uint64, error) {
sc := &stmtctx.StatementContext{TimeZone: b.ctx.GetSessionVars().Location()}
t, err := types.ParseTime(sc, ts, mysql.TypeTimestamp, types.MaxFsp)
Expand Down Expand Up @@ -306,6 +333,7 @@ func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

bq := globalBRIEQueue
bq.clearTask(e.ctx.GetSessionVars().StmtCtx)

e.info.connID = e.ctx.GetSessionVars().ConnectionID
e.info.queueTime = types.CurrentTime(mysql.TypeDatetime)
Expand Down Expand Up @@ -346,9 +374,12 @@ func (e *BRIEExec) Next(ctx context.Context, req *chunk.Chunk) error {
default:
err = errors.Errorf("unsupported BRIE statement kind: %s", e.info.kind)
}
e.info.finishTime = types.CurrentTime(mysql.TypeDatetime)
if err != nil {
e.info.message = err.Error()
return err
}
e.info.message = ""

req.AppendString(0, e.info.storage)
req.AppendUint64(1, e.info.archiveSize)
Expand Down Expand Up @@ -378,11 +409,17 @@ func (e *ShowExec) fetchShowBRIE(kind ast.BRIEKind) error {
e.result.AppendFloat64(2, 100.0*float64(current)/float64(item.progress.total))
e.result.AppendTime(3, item.info.queueTime)
e.result.AppendTime(4, item.info.execTime)
e.result.AppendNull(5) // FIXME: fill in finish time after keeping history.
e.result.AppendTime(5, item.info.finishTime)
e.result.AppendUint64(6, item.info.connID)
if len(item.info.message) > 0 {
e.result.AppendString(7, item.info.message)
} else {
e.result.AppendNull(7)
}
}
return true
})
globalBRIEQueue.clearTask(e.ctx.GetSessionVars().StmtCtx)
return nil
}

Expand Down
133 changes: 131 additions & 2 deletions executor/brie_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,145 @@

package executor

import . "github.com/pingcap/check"
import (
"context"
"fmt"
"strings"
gotime "time"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/mock"

. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
)

type testBRIESuite struct{}

var _ = Suite(&testBRIESuite{})

func (s *testPartitionSuite) TestGlueGetVersion(c *C) {
func (s *testBRIESuite) TestGlueGetVersion(c *C) {
g := tidbGlueSession{}
version := g.GetVersion()
c.Assert(version, Matches, `(.|\n)*Release Version(.|\n)*`)
c.Assert(version, Matches, `(.|\n)*Git Commit Hash(.|\n)*`)
c.Assert(version, Matches, `(.|\n)*GoVersion(.|\n)*`)
}

func brieTaskInfoToResult(info *brieTaskInfo) string {
arr := make([]string, 0, 8)
arr = append(arr, info.storage)
arr = append(arr, "Wait")
arr = append(arr, "0")
arr = append(arr, info.queueTime.String())
arr = append(arr, info.execTime.String())
arr = append(arr, info.finishTime.String())
arr = append(arr, fmt.Sprintf("%d", info.connID))
if len(info.message) > 0 {
arr = append(arr, info.message)
} else {
arr = append(arr, "NULL")
}
return strings.Join(arr, ", ") + "\n"
}

func fetchShowBRIEResult(c *C, e *ShowExec, brieColTypes []*types.FieldType) string {
e.result = newFirstChunk(e)
c.Assert(e.fetchShowBRIE(ast.BRIEKindBackup), IsNil)
return e.result.ToString(brieColTypes)
}

func (s *testBRIESuite) TestFetchShowBRIE(c *C) {
// Compose a mocked session manager.
ps := make([]*util.ProcessInfo, 0, 1)
pi := &util.ProcessInfo{
ID: 0,
User: "test",
Host: "127.0.0.1",
DB: "test",
Command: 't',
State: 1,
Info: "",
}
ps = append(ps, pi)
sm := &mockSessionManager{
PS: ps,
}

sctx := mock.NewContext()
sctx.SetSessionManager(sm)
sctx.GetSessionVars().User = &auth.UserIdentity{Username: "test"}

ctx := context.Background()
// Compose schema.
p := parser.New()
p.SetParserConfig(parser.ParserConfig{EnableWindowFunction: true, EnableStrictDoubleTypeCheck: true})
stmt, err := p.ParseOneStmt("show backups", "", "")
c.Assert(err, IsNil)
plan, _, err := core.BuildLogicalPlan(ctx, sctx, stmt, infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable(), core.MockView()}))
c.Assert(err, IsNil)
schema := plan.Schema()

// Compose executor.
e := &ShowExec{
baseExecutor: newBaseExecutor(sctx, schema, 0),
Tp: ast.ShowBackups,
}
c.Assert(e.Open(ctx), IsNil)

tp := mysql.TypeDatetime
lateTime := types.NewTime(types.FromGoTime(gotime.Now().Add(-outdatedDuration.Duration+1)), tp, 0)
brieColTypes := make([]*types.FieldType, 0, len(schema.Columns))
for _, col := range schema.Columns {
brieColTypes = append(brieColTypes, col.RetType)
}

// Register brie task info
info1 := &brieTaskInfo{
kind: ast.BRIEKindBackup,
connID: e.ctx.GetSessionVars().ConnectionID,
queueTime: lateTime,
execTime: lateTime,
finishTime: lateTime,
storage: "noop://test/backup1",
message: "killed",
}
info1Res := brieTaskInfoToResult(info1)

globalBRIEQueue.registerTask(ctx, info1)
c.Assert(fetchShowBRIEResult(c, e, brieColTypes), Equals, info1Res)

// Query again, this info should already have been cleaned
c.Assert(fetchShowBRIEResult(c, e, brieColTypes), HasLen, 0)

// Register this task again, we should be able to fetch this info
globalBRIEQueue.registerTask(ctx, info1)
c.Assert(fetchShowBRIEResult(c, e, brieColTypes), Equals, info1Res)

// Query again, we should be able to fetch this info again, because we have cleared in last clearInterval
c.Assert(fetchShowBRIEResult(c, e, brieColTypes), Equals, info1Res)

// Reset clear time, we should only fetch info2 this time.
globalBRIEQueue.lastClearTime = gotime.Now().Add(-clearInterval - gotime.Second)
currTime := types.CurrentTime(tp)
info2 := &brieTaskInfo{
kind: ast.BRIEKindBackup,
connID: e.ctx.GetSessionVars().ConnectionID,
queueTime: currTime,
execTime: currTime,
finishTime: currTime,
storage: "noop://test/backup2",
message: "",
}
info2Res := brieTaskInfoToResult(info2)
globalBRIEQueue.registerTask(ctx, info2)
globalBRIEQueue.clearTask(e.ctx.GetSessionVars().StmtCtx)
c.Assert(fetchShowBRIEResult(c, e, brieColTypes), Equals, info2Res)
}
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4102,8 +4102,8 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
names = []string{"Supported_builtin_functions"}
ftypes = []byte{mysql.TypeVarchar}
case ast.ShowBackups, ast.ShowRestores:
names = []string{"Destination", "State", "Progress", "Queue_time", "Execution_time", "Finish_time", "Connection"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDouble, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeLonglong}
names = []string{"Destination", "State", "Progress", "Queue_time", "Execution_time", "Finish_time", "Connection", "Message"}
ftypes = []byte{mysql.TypeVarchar, mysql.TypeVarchar, mysql.TypeDouble, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeDatetime, mysql.TypeLonglong, mysql.TypeVarchar}
}

schema = expression.NewSchema(make([]*expression.Column, 0, len(names))...)
Expand Down
58 changes: 29 additions & 29 deletions util/chunk/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,35 +217,35 @@ func (r Row) ToString(ft []*types.FieldType) string {
var buf []byte
for colIdx := 0; colIdx < r.Chunk().NumCols(); colIdx++ {
if r.IsNull(colIdx) {
buf = append(buf, "nil, "...)
continue
}
switch ft[colIdx].EvalType() {
case types.ETInt:
buf = strconv.AppendInt(buf, r.GetInt64(colIdx), 10)
case types.ETString:
switch ft[colIdx].Tp {
case mysql.TypeEnum:
buf = append(buf, r.GetEnum(colIdx).String()...)
case mysql.TypeSet:
buf = append(buf, r.GetSet(colIdx).String()...)
default:
buf = append(buf, r.GetString(colIdx)...)
}
case types.ETDatetime, types.ETTimestamp:
buf = append(buf, r.GetTime(colIdx).String()...)
case types.ETDecimal:
buf = append(buf, r.GetMyDecimal(colIdx).ToString()...)
case types.ETDuration:
buf = append(buf, r.GetDuration(colIdx, ft[colIdx].Decimal).String()...)
case types.ETJson:
buf = append(buf, r.GetJSON(colIdx).String()...)
case types.ETReal:
switch ft[colIdx].Tp {
case mysql.TypeFloat:
buf = strconv.AppendFloat(buf, float64(r.GetFloat32(colIdx)), 'f', -1, 32)
case mysql.TypeDouble:
buf = strconv.AppendFloat(buf, r.GetFloat64(colIdx), 'f', -1, 64)
buf = append(buf, "NULL"...)
} else {
switch ft[colIdx].EvalType() {
case types.ETInt:
buf = strconv.AppendInt(buf, r.GetInt64(colIdx), 10)
case types.ETString:
switch ft[colIdx].Tp {
case mysql.TypeEnum:
buf = append(buf, r.GetEnum(colIdx).String()...)
case mysql.TypeSet:
buf = append(buf, r.GetSet(colIdx).String()...)
default:
buf = append(buf, r.GetString(colIdx)...)
}
case types.ETDatetime, types.ETTimestamp:
buf = append(buf, r.GetTime(colIdx).String()...)
case types.ETDecimal:
buf = append(buf, r.GetMyDecimal(colIdx).ToString()...)
case types.ETDuration:
buf = append(buf, r.GetDuration(colIdx, ft[colIdx].Decimal).String()...)
case types.ETJson:
buf = append(buf, r.GetJSON(colIdx).String()...)
case types.ETReal:
switch ft[colIdx].Tp {
case mysql.TypeFloat:
buf = strconv.AppendFloat(buf, float64(r.GetFloat32(colIdx)), 'f', -1, 32)
case mysql.TypeDouble:
buf = strconv.AppendFloat(buf, r.GetFloat64(colIdx), 'f', -1, 64)
}
}
}
if colIdx != r.Chunk().NumCols()-1 {
Expand Down

0 comments on commit ab4a990

Please sign in to comment.