Skip to content

Commit

Permalink
executor: add CTEProducer that shared by all CTEExec (#44643)
Browse files Browse the repository at this point in the history
close #44649
  • Loading branch information
guo-shaoge authored Jun 19, 2023
1 parent b7773da commit cfef1b0
Show file tree
Hide file tree
Showing 3 changed files with 316 additions and 239 deletions.
82 changes: 45 additions & 37 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ type executorBuilder struct {
type CTEStorages struct {
ResTbl cteutil.Storage
IterInTbl cteutil.Storage
Producer *cteProducer
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
Expand Down Expand Up @@ -5317,33 +5318,39 @@ func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *
}

func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
// 1. Build seedPlan.
if b.Ti != nil {
b.Ti.UseNonRecursive = true
}
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
if v.RecurPlan != nil && b.Ti != nil {
b.Ti.UseRecursive = true
}

// 2. Build tables to store intermediate results.
chkSize := b.ctx.GetSessionVars().MaxChunkSize
tps := seedExec.base().retFieldTypes
// iterOutTbl will be constructed in CTEExec.Open().
var resTbl cteutil.Storage
var iterInTbl cteutil.Storage

storageMap, ok := b.ctx.GetSessionVars().StmtCtx.CTEStorageMap.(map[int]*CTEStorages)
if !ok {
b.err = errors.New("type assertion for CTEStorageMap failed")
return nil
}

chkSize := b.ctx.GetSessionVars().MaxChunkSize
// iterOutTbl will be constructed in CTEExec.Open().
var resTbl cteutil.Storage
var iterInTbl cteutil.Storage
var producer *cteProducer
storages, ok := storageMap[v.CTE.IDForStorage]
if ok {
// Storage already setup.
resTbl = storages.ResTbl
iterInTbl = storages.IterInTbl
producer = storages.Producer
} else {
// Build seed part.
seedExec := b.build(v.SeedPlan)
if b.err != nil {
return nil
}

// Setup storages.
tps := seedExec.base().retFieldTypes
resTbl = cteutil.NewStorageRowContainer(tps, chkSize)
if err := resTbl.OpenAndRef(); err != nil {
b.err = err
Expand All @@ -5355,38 +5362,39 @@ func (b *executorBuilder) buildCTE(v *plannercore.PhysicalCTE) Executor {
return nil
}
storageMap[v.CTE.IDForStorage] = &CTEStorages{ResTbl: resTbl, IterInTbl: iterInTbl}
}

// 3. Build recursive part.
if v.RecurPlan != nil && b.Ti != nil {
b.Ti.UseRecursive = true
}
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
}
// Build recursive part.
recursiveExec := b.build(v.RecurPlan)
if b.err != nil {
return nil
}
var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
for i := 0; i < chkSize; i++ {
sel[i] = i
}
}

var sel []int
if v.CTE.IsDistinct {
sel = make([]int, chkSize)
for i := 0; i < chkSize; i++ {
sel[i] = i
producer = &cteProducer{
ctx: b.ctx,
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
isInApply: v.CTE.IsInApply,
}
storageMap[v.CTE.IDForStorage].Producer = producer
}

return &CTEExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
seedExec: seedExec,
recursiveExec: recursiveExec,
resTbl: resTbl,
iterInTbl: iterInTbl,
chkIdx: 0,
isDistinct: v.CTE.IsDistinct,
sel: sel,
hasLimit: v.CTE.HasLimit,
limitBeg: v.CTE.LimitBeg,
limitEnd: v.CTE.LimitEnd,
isInApply: v.CTE.IsInApply,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
producer: producer,
}
}

Expand Down
Loading

0 comments on commit cfef1b0

Please sign in to comment.