executor: Add the HashAggExec runtime information #20577

Status: Merged. 78 commits from jyz0309:hashaggExec were merged into pingcap:master on Nov 25, 2020.
This view shows changes from 35 of the 78 commits.

Commits:
cabdd3e  init (jyz0309, Oct 21, 2020)
8acbb76  init (jyz0309, Oct 21, 2020)
599c418  optimize the execution info (jyz0309, Oct 21, 2020)
c6a43f0  add the task num (jyz0309, Oct 21, 2020)
56d10d8  go fmt (jyz0309, Oct 22, 2020)
fb2fd32  Merge branch 'master' into hashaggExec (qw4990, Oct 22, 2020)
a4147ae  add unit test (jyz0309, Oct 23, 2020)
7bd75d4  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Oct 23, 2020)
4c18b39  change the channel close location (jyz0309, Oct 23, 2020)
a2a7c96  test no close (jyz0309, Oct 27, 2020)
b2e71c2  Merge branch 'master' into hashaggExec (crazycs520, Oct 28, 2020)
49e93a3  fix check-dev (jyz0309, Oct 29, 2020)
78c73d3  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Oct 29, 2020)
0b06e2c  fix_check-dev (jyz0309, Oct 29, 2020)
ce36fe0  fix the Merge() bug (jyz0309, Oct 29, 2020)
933a5ff  use slice instead of channel (jyz0309, Oct 29, 2020)
7f7bf09  fix bug (jyz0309, Oct 29, 2020)
56c8a65  remove the race condition (jyz0309, Oct 30, 2020)
406001e  add test (jyz0309, Nov 2, 2020)
67b5da4  fix test (jyz0309, Nov 2, 2020)
0e8c1b7  Merge branch 'master' into hashaggExec (jyz0309, Nov 2, 2020)
798ba7f  just a backup (jyz0309, Nov 8, 2020)
c8a3a7e  add test (jyz0309, Nov 9, 2020)
86ed7db  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Nov 9, 2020)
79e95e6  fix test (jyz0309, Nov 9, 2020)
02ebbba  fix test (jyz0309, Nov 9, 2020)
65b13a3  fix (jyz0309, Nov 9, 2020)
9d37226  Merge branch 'master' into hashaggExec (jyz0309, Nov 9, 2020)
3201068  fix comment (jyz0309, Nov 11, 2020)
b9c4e86  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Nov 11, 2020)
974e597  add aggWorkerStats in baseWorker (jyz0309, Nov 11, 2020)
dc1974f  add tot_wait,tot_exec,tot_time (jyz0309, Nov 11, 2020)
7aa1398  Merge branch 'master' into hashaggExec (jyz0309, Nov 11, 2020)
b91806d  add Clone() Merge() (jyz0309, Nov 11, 2020)
376bf0a  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Nov 11, 2020)
1c8a36a  add final worker (jyz0309, Nov 12, 2020)
f0b5911  fix test (jyz0309, Nov 12, 2020)
f75e925  add comment (jyz0309, Nov 12, 2020)
95e7007  address comment (jyz0309, Nov 13, 2020)
48ac38c  add comment (jyz0309, Nov 13, 2020)
44a3aaf  add wait_time and exec_time timing in consumeintermdata (jyz0309, Nov 14, 2020)
e995911  Merge branch 'master' into hashaggExec (crazycs520, Nov 16, 2020)
d837b13  fix the data race condition (jyz0309, Nov 16, 2020)
d92e53e  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Nov 16, 2020)
31caac6  Merge branch 'master' into hashaggExec (jyz0309, Nov 16, 2020)
8a83f19  fix race (jyz0309, Nov 16, 2020)
9f93581  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Nov 16, 2020)
1385791  fix test (jyz0309, Nov 16, 2020)
08b789c  fix test (jyz0309, Nov 16, 2020)
403bceb  test (jyz0309, Nov 16, 2020)
d851336  test (jyz0309, Nov 16, 2020)
3e2451b  make aggWorkerStats int64 and atomic (jyz0309, Nov 16, 2020)
4406ea5  atomic merge() func (jyz0309, Nov 16, 2020)
baee217  Merge branch 'master' into hashaggExec (crazycs520, Nov 16, 2020)
870c775  add read slice (jyz0309, Nov 17, 2020)
dafa8d0  Merge branch 'hashaggExec' of github.com:jyz0309/tidb into hashaggExec (jyz0309, Nov 17, 2020)
4f9c5e5  test (jyz0309, Nov 17, 2020)
4120181  fix race (jyz0309, Nov 20, 2020)
cbfa2b2  Merge branch 'master' into hashaggExec (jyz0309, Nov 20, 2020)
6aa48f6  fix (jyz0309, Nov 20, 2020)
b44ba81  test check e.readPartialWorker nil (jyz0309, Nov 20, 2020)
4e1e275  test (jyz0309, Nov 21, 2020)
e98e112  Merge branch 'master' into hashaggExec (crazycs520, Nov 23, 2020)
86fa981  refine code and fix race (crazycs520, Nov 23, 2020)
09adbd2  refine record (crazycs520, Nov 23, 2020)
2cf5bd1  refine code (crazycs520, Nov 23, 2020)
0969f2e  fix race (crazycs520, Nov 23, 2020)
7200a9a  try to fix race (crazycs520, Nov 23, 2020)
bb58664  Merge branch 'master' into hashaggExec (crazycs520, Nov 23, 2020)
2250aac  try to fix race (crazycs520, Nov 23, 2020)
5012135  Merge branch 'master' into hashaggExec (crazycs520, Nov 23, 2020)
1dab7a3  try to fix race (crazycs520, Nov 23, 2020)
d2cebe6  Merge branch 'master' into hashaggExec (crazycs520, Nov 23, 2020)
9d7f5f8  Merge branch 'master' into hashaggExec (crazycs520, Nov 23, 2020)
e7f8be5  Merge branch 'master' into hashaggExec (crazycs520, Nov 24, 2020)
9484e9a  Merge branch 'master' of https://github.com/pingcap/tidb into hashagg… (crazycs520, Nov 25, 2020)
595fe51  Merge branch 'hashaggExec' of https://github.com/jyz0309/tidb into ha… (crazycs520, Nov 25, 2020)
30b5a25  Merge branch 'master' into hashaggExec (crazycs520, Nov 25, 2020)
executor/aggregate.go: 202 changes (171 additions, 31 deletions)

@@ -17,7 +17,10 @@ import (
"bytes"
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
@@ -43,18 +46,22 @@ type aggPartialResultMapper map[string][]aggfuncs.PartialResult

// baseHashAggWorker stores the common attributes of HashAggFinalWorker and HashAggPartialWorker.
type baseHashAggWorker struct {
-	ctx          sessionctx.Context
-	finishCh     <-chan struct{}
-	aggFuncs     []aggfuncs.AggFunc
-	maxChunkSize int
+	stats          *HashAggRuntimeStats
+	ctx            sessionctx.Context
+	finishCh       <-chan struct{}
+	aggFuncs       []aggfuncs.AggFunc
+	maxChunkSize   int
+	aggWorkerStats time.Duration
}

-func newBaseHashAggWorker(ctx sessionctx.Context, finishCh <-chan struct{}, aggFuncs []aggfuncs.AggFunc, maxChunkSize int) baseHashAggWorker {
+func newBaseHashAggWorker(ctx sessionctx.Context, finishCh <-chan struct{}, aggFuncs []aggfuncs.AggFunc, maxChunkSize int, stat *HashAggRuntimeStats) baseHashAggWorker {
	return baseHashAggWorker{
-		ctx:          ctx,
-		finishCh:     finishCh,
-		aggFuncs:     aggFuncs,
-		maxChunkSize: maxChunkSize,
+		stats:          stat,
+		ctx:            ctx,
+		finishCh:       finishCh,
+		aggFuncs:       aggFuncs,
+		maxChunkSize:   maxChunkSize,
+		aggWorkerStats: 0,
	}
}

@@ -171,6 +178,8 @@ type HashAggExec struct {
executed bool

memTracker *memory.Tracker // track memory usage.

stats *HashAggRuntimeStats
}

// HashAggInput indicates the input of hash agg exec.
@@ -234,22 +243,6 @@ func (e *HashAggExec) Close() error {
	for range e.finalOutputCh {
	}
	e.executed = false
-
-	if e.runtimeStats != nil {
-		var partialConcurrency, finalConcurrency int
-		if e.isUnparallelExec {
-			partialConcurrency = 0
-			finalConcurrency = 0
-		} else {
-			partialConcurrency = cap(e.partialWorkers)
-			finalConcurrency = cap(e.finalWorkers)
-		}
-		partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency)
-		finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency)
-		runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
-		runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
-		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
-	}
	return e.baseExecutor.Close()
}
@@ -300,10 +293,12 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
	e.partialWorkers = make([]HashAggPartialWorker, partialConcurrency)
	e.finalWorkers = make([]HashAggFinalWorker, finalConcurrency)

+	e.initRuntimeStats()
+
	// Init partial workers.
	for i := 0; i < partialConcurrency; i++ {
		w := HashAggPartialWorker{
-			baseHashAggWorker: newBaseHashAggWorker(e.ctx, e.finishCh, e.PartialAggFuncs, e.maxChunkSize),
+			baseHashAggWorker: newBaseHashAggWorker(e.ctx, e.finishCh, e.PartialAggFuncs, e.maxChunkSize, e.stats),
			inputCh:           e.partialInputChs[i],
			outputChs:         e.partialOutputChs,
			giveBackCh:        e.inputCh,
@@ -328,7 +323,7 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) {
	// Init final workers.
	for i := 0; i < finalConcurrency; i++ {
		e.finalWorkers[i] = HashAggFinalWorker{
-			baseHashAggWorker: newBaseHashAggWorker(e.ctx, e.finishCh, e.FinalAggFuncs, e.maxChunkSize),
+			baseHashAggWorker: newBaseHashAggWorker(e.ctx, e.finishCh, e.FinalAggFuncs, e.maxChunkSize, e.stats),
			partialResultMap:  make(aggPartialResultMapper),
			groupSet:          set.NewStringSet(),
			inputCh:           e.partialOutputChs[i],
@@ -366,6 +361,7 @@ func recoveryHashAgg(output chan *AfFinalResult, r interface{}) {
}

func (w *HashAggPartialWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup, finalConcurrency int) {
start := time.Now()
needShuffle, sc := false, ctx.GetSessionVars().StmtCtx
defer func() {
if r := recover(); r != nil {
@@ -376,22 +372,34 @@
}
w.memTracker.Consume(-w.chk.MemoryUsage())
waitGroup.Done()
w.aggWorkerStats = time.Since(start)
if w.stats != nil {
atomic.AddInt64(&w.stats.PartialTotalTime, int64(time.Since(start)))
}
}()
for {
waitStart := time.Now()
if !w.getChildInput() {
return
}
if w.stats != nil {
atomic.AddInt64(&w.stats.PartialWaitTime, int64(time.Since(waitStart)))
}
if err := w.updatePartialResult(ctx, sc, w.chk, len(w.partialResultsMap)); err != nil {
w.globalOutputCh <- &AfFinalResult{err: err}
return
}
if w.stats != nil {
atomic.AddInt64(&w.stats.PartialTaskNum, 1)
}
// The intermData can be promised to be not empty if reaching here,
// so we set needShuffle to be true.
needShuffle = true
}
}

func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) {
start := time.Now()
w.groupKey, err = getGroupKey(w.ctx, chk, w.groupKey, w.groupByItems)
if err != nil {
return err
@@ -408,6 +416,9 @@ func (w *HashAggPartialWorker) updatePartialResult(ctx sessionctx.Context, sc *stmtctx.StatementContext, chk *chunk.Chunk, finalConcurrency int) (err error) {
}
}
}
if w.stats != nil {
atomic.AddInt64(&w.stats.PartialExecTime, int64(time.Since(start)))
}
return nil
}

@@ -513,6 +524,9 @@ func (w *HashAggFinalWorker) consumeIntermData(sctx sessionctx.Context) (err error) {
if input, ok = w.getPartialInput(); !ok {
return nil
}
if w.stats != nil {
atomic.AddInt64(&w.stats.FinalTaskNum, 1)
}
if intermDataBuffer == nil {
intermDataBuffer = make([][]aggfuncs.PartialResult, 0, w.maxChunkSize)
}
@@ -580,11 +594,13 @@ func (w *HashAggFinalWorker) receiveFinalResultHolder() (*chunk.Chunk, bool) {
}

func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGroup) {
start := time.Now()
defer func() {
if r := recover(); r != nil {
recoveryHashAgg(w.outputCh, r)
}
waitGroup.Done()
w.aggWorkerStats = time.Since(start)
}()
if err := w.consumeIntermData(ctx); err != nil {
w.outputCh <- &AfFinalResult{err: err}
@@ -660,20 +676,36 @@ func (e *HashAggExec) waitFinalWorkerAndCloseFinalOutput(waitGroup *sync.WaitGroup) {

func (e *HashAggExec) prepare4ParallelExec(ctx context.Context) {
	go e.fetchChildData(ctx)

	partialWorkerWaitGroup := &sync.WaitGroup{}
	partialWorkerWaitGroup.Add(len(e.partialWorkers))
+	partialStart := time.Now()
	for i := range e.partialWorkers {
		go e.partialWorkers[i].run(e.ctx, partialWorkerWaitGroup, len(e.finalWorkers))
	}
-	go e.waitPartialWorkerAndCloseOutputChs(partialWorkerWaitGroup)
-
+	go func() {
+		e.waitPartialWorkerAndCloseOutputChs(partialWorkerWaitGroup)
+		if e.stats != nil {
+			e.stats.PartialWallTime = time.Since(partialStart)
+			for i, worker := range e.partialWorkers {
+				e.stats.PartialWorkerTime[i] = worker.aggWorkerStats
+			}
+		}
+	}()
	finalWorkerWaitGroup := &sync.WaitGroup{}
	finalWorkerWaitGroup.Add(len(e.finalWorkers))
+	finalStart := time.Now()
	for i := range e.finalWorkers {
		go e.finalWorkers[i].run(e.ctx, finalWorkerWaitGroup)
	}
-	go e.waitFinalWorkerAndCloseFinalOutput(finalWorkerWaitGroup)
+	go func() {
+		e.waitFinalWorkerAndCloseFinalOutput(finalWorkerWaitGroup)
+		if e.stats != nil {
+			e.stats.FinalWallTime = time.Since(finalStart)
+			for i, worker := range e.finalWorkers {
+				e.stats.FinalWorkerTime[i] = worker.aggWorkerStats
+			}
+		}
+	}()
}

// HashAggExec employs one input reader, M partial workers and N final workers to execute parallelly.
@@ -816,6 +848,114 @@ func (e *HashAggExec) getPartialResults(groupKey string) []aggfuncs.PartialResult {
return partialResults
}

func (e *HashAggExec) initRuntimeStats() {
if e.runtimeStats != nil && e.stats == nil {
e.stats = &HashAggRuntimeStats{
PartialNum: e.ctx.GetSessionVars().HashAggPartialConcurrency(),
FinalNum: e.ctx.GetSessionVars().HashAggFinalConcurrency(),
PartialTaskNum: 0,
FinalTaskNum: 0,
PartialWallTime: 0,
FinalWallTime: 0,
PartialWorkerTime: make([]time.Duration, e.ctx.GetSessionVars().HashAggPartialConcurrency()),
FinalWorkerTime: make([]time.Duration, e.ctx.GetSessionVars().HashAggFinalConcurrency()),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

// HashAggRuntimeStats records the runtime stats of HashAggExec.
type HashAggRuntimeStats struct {
PartialNum int
FinalNum int

PartialTaskNum int64
FinalTaskNum int64

PartialWallTime time.Duration
FinalWallTime time.Duration

PartialWorkerTime []time.Duration
FinalWorkerTime []time.Duration

PartialWaitTime int64
PartialExecTime int64
PartialTotalTime int64
}

func (e *HashAggRuntimeStats) String() string {
var result string
if e.PartialWallTime != 0 {
result += fmt.Sprintf("partial_worker:{wall_time:%s, concurrency:%d, task_num:%d, tot_wait:%s, tot_exec:%s, tot_time:%s", e.PartialWallTime, e.PartialNum, e.PartialTaskNum, time.Duration(e.PartialWaitTime), time.Duration(e.PartialExecTime), time.Duration(e.PartialTotalTime))
}
if e.PartialWorkerTime != nil {
sort.Slice(e.PartialWorkerTime, func(i, j int) bool { return e.PartialWorkerTime[i] < e.PartialWorkerTime[j] })
n := len(e.PartialWorkerTime)
if n != 0 {
result += fmt.Sprintf(", max:%v, p95:%v", e.PartialWorkerTime[n-1], e.PartialWorkerTime[n*19/20])
}
}
if len(result) > 0 {
result += "}, "
}
if e.FinalWallTime != 0 {
result += fmt.Sprintf("final_worker:{wall_time:%s, concurrency:%d, task_num:%d", e.FinalWallTime, e.FinalNum, e.FinalTaskNum)
}
if e.FinalWorkerTime != nil {
sort.Slice(e.FinalWorkerTime, func(i, j int) bool { return e.FinalWorkerTime[i] < e.FinalWorkerTime[j] })
m := len(e.FinalWorkerTime)
if m != 0 {
result += fmt.Sprintf(", max:%v, p95:%v", e.FinalWorkerTime[m-1], e.FinalWorkerTime[m*19/20])
}
}
if len(result) > 0 {
return result + "}"
}
return result
}

// Clone implements the RuntimeStats interface.
func (e *HashAggRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := &HashAggRuntimeStats{
PartialTaskNum: e.PartialTaskNum,
FinalTaskNum: e.FinalTaskNum,
PartialWallTime: e.PartialWallTime,
FinalWallTime: e.FinalWallTime,
PartialNum: e.PartialNum,
FinalNum: e.FinalNum,
PartialWorkerTime: e.PartialWorkerTime,
FinalWorkerTime: e.FinalWorkerTime,
PartialExecTime: e.PartialExecTime,
PartialTotalTime: e.PartialTotalTime,
PartialWaitTime: e.PartialWaitTime,
}
return newRs
}

// Merge implements the RuntimeStats interface.
func (e *HashAggRuntimeStats) Merge(other execdetails.RuntimeStats) {
tmp, ok := other.(*HashAggRuntimeStats)
if !ok {
return
}
e.PartialWallTime += tmp.PartialWallTime
e.FinalWallTime += tmp.FinalWallTime
e.PartialTaskNum += tmp.PartialTaskNum
e.FinalTaskNum += tmp.FinalTaskNum
e.PartialNum += tmp.PartialNum
e.FinalNum += tmp.FinalNum
e.PartialWaitTime += tmp.PartialWaitTime
e.PartialTotalTime += tmp.PartialTotalTime
e.PartialExecTime += tmp.PartialExecTime
e.FinalWorkerTime = append(e.FinalWorkerTime, tmp.FinalWorkerTime...)
e.PartialWorkerTime = append(e.PartialWorkerTime, tmp.PartialWorkerTime...)
}

// Tp implements the RuntimeStats interface.
func (e *HashAggRuntimeStats) Tp() int {
return execdetails.TpHashAggRuntimeStat
}

// StreamAggExec deals with all the aggregate functions.
// It assumes all the input data is sorted by group by key.
// When Next() is called, it will return a result for the same group.
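The String() method above is what ends up in the execution info of EXPLAIN ANALYZE for a HashAgg operator. As a minimal, self-contained sketch of the collector's output (the durations are arbitrary, and the import path assumes this PR's package layout, as the test below does):

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/executor"
)

func main() {
	// Populate the fields the way HashAggExec does: wall time per phase,
	// one busy-time entry per worker, and the atomically accumulated
	// wait/exec/total counters (stored as int64 nanoseconds).
	stats := &executor.HashAggRuntimeStats{
		PartialNum:        2,
		FinalNum:          2,
		PartialTaskNum:    4,
		FinalTaskNum:      2,
		PartialWallTime:   100 * time.Millisecond,
		FinalWallTime:     120 * time.Millisecond,
		PartialWorkerTime: []time.Duration{90 * time.Millisecond, 95 * time.Millisecond},
		FinalWorkerTime:   []time.Duration{110 * time.Millisecond, 115 * time.Millisecond},
		PartialWaitTime:   int64(30 * time.Millisecond),
		PartialExecTime:   int64(150 * time.Millisecond),
		PartialTotalTime:  int64(185 * time.Millisecond),
	}
	fmt.Println(stats.String())
	// partial_worker:{wall_time:100ms, concurrency:2, task_num:4, tot_wait:30ms, tot_exec:150ms, tot_time:185ms, max:95ms, p95:95ms}, final_worker:{wall_time:120ms, concurrency:2, task_num:2, max:115ms, p95:115ms}
}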
executor/aggregate_test.go: 44 changes (20 additions, 24 deletions)

@@ -15,10 +15,12 @@ package executor_test

import (
"fmt"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/executor"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
@@ -1143,28 +1145,22 @@ func (s *testSuiteAgg) TestIssue17216(c *C) {
tk.MustQuery("SELECT count(distinct col1) FROM t1").Check(testkit.Rows("48"))
}

-func (s *testSuiteAgg) TestIssue19426(c *C) {
-	tk := testkit.NewTestKitWithInit(c, s.store)
-	tk.MustExec("use test")
-	tk.MustExec("drop table if exists t")
-	tk.MustExec("create table t(a int primary key, b int)")
-	tk.MustExec("insert into t values (1, 11), (4, 44), (2, 22), (3, 33)")
-	tk.MustQuery("select sum(case when a <= 0 or a > 1000 then 0.0 else b end) from t").
-		Check(testkit.Rows("110.0"))
-	tk.MustQuery("select avg(case when a <= 0 or a > 1000 then 0.0 else b end) from t").
-		Check(testkit.Rows("27.50000"))
-	tk.MustQuery("select distinct (case when a <= 0 or a > 1000 then 0.0 else b end) v from t order by v").
-		Check(testkit.Rows("11.0", "22.0", "33.0", "44.0"))
-	tk.MustQuery("select group_concat(case when a <= 0 or a > 1000 then 0.0 else b end order by -a) from t").
-		Check(testkit.Rows("44.0,33.0,22.0,11.0"))
-	tk.MustQuery("select group_concat(a, b, case when a <= 0 or a > 1000 then 0.0 else b end order by -a) from t").
-		Check(testkit.Rows("44444.0,33333.0,22222.0,11111.0"))
-	tk.MustQuery("select group_concat(distinct case when a <= 0 or a > 1000 then 0.0 else b end order by -a) from t").
-		Check(testkit.Rows("44.0,33.0,22.0,11.0"))
-	tk.MustQuery("select max(case when a <= 0 or a > 1000 then 0.0 else b end) from t").
-		Check(testkit.Rows("44.0"))
-	tk.MustQuery("select min(case when a <= 0 or a > 1000 then 0.0 else b end) from t").
-		Check(testkit.Rows("11.0"))
-	tk.MustQuery("select a, b, sum(case when a < 1000 then b else 0.0 end) over (order by a) from t").
-		Check(testkit.Rows("1 11 11.0", "2 22 33.0", "3 33 66.0", "4 44 110.0"))
+func (s *testSuiteAgg) TestHashAggRuntimeStat(c *C) {
+	stats := &executor.HashAggRuntimeStats{
+		PartialNum:        5,
+		FinalNum:          5,
+		PartialTaskNum:    5,
+		PartialWallTime:   1 * time.Second,
+		FinalTaskNum:      5,
+		FinalWallTime:     2 * time.Second,
+		PartialWorkerTime: []time.Duration{1 * time.Second, 1 * time.Second, 1 * time.Second, 2 * time.Second, 3 * time.Second},
+		FinalWorkerTime:   []time.Duration{1 * time.Second, 1 * time.Second, 1 * time.Second, 2 * time.Second, 4 * time.Second},
+		PartialWaitTime:   int64(2 * time.Second),
+		PartialTotalTime:  int64(2 * time.Second),
+		PartialExecTime:   int64(2 * time.Second),
+	}
+	c.Assert(stats.String(), Equals, "partial_worker:{wall_time:1s, concurrency:5, task_num:5, tot_wait:2s, tot_exec:2s, tot_time:2s, max:3s, p95:3s}, final_worker:{wall_time:2s, concurrency:5, task_num:5, max:4s, p95:4s}")
+	c.Assert(stats.String(), Equals, stats.Clone().String())
+	stats.Merge(stats.Clone())
+	c.Assert(stats.String(), Equals, "partial_worker:{wall_time:2s, concurrency:10, task_num:10, tot_wait:4s, tot_exec:4s, tot_time:4s, max:3s, p95:3s}, final_worker:{wall_time:4s, concurrency:10, task_num:10, max:4s, p95:4s}")
}
Loading