Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add cascades wrapper for integrating cascades into current logical optimization phase. #58751

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/planner/cascades/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the cascades package: compiles cascades.go under the
# import path github.com/pingcap/tidb/pkg/planner/cascades and is publicly visible.
go_library(
name = "cascades",
srcs = ["cascades.go"],
importpath = "github.com/pingcap/tidb/pkg/planner/cascades",
visibility = ["//visibility:public"],
# One entry per non-stdlib package imported by cascades.go.
deps = [
"//pkg/planner/cascades/base",
"//pkg/planner/cascades/base/cascadesctx",
"//pkg/planner/cascades/memo",
"//pkg/planner/cascades/task",
"//pkg/planner/core/base",
"//pkg/util/intest",
],
)

# Test target for cascades_test.go; it only depends on the testkit package.
# NOTE(review): flaky = True presumably lets Bazel retry the test on failure — confirm this is intended.
go_test(
name = "cascades_test",
timeout = "short",
srcs = ["cascades_test.go"],
flaky = True,
deps = ["//pkg/testkit"],
)
4 changes: 2 additions & 2 deletions pkg/planner/cascades/base/cascadesctx/cascades_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
)

// CascadesContext define the cascades context as interface, since it will be defined
// Context defines the cascades context as an interface. The concrete context is
// defined in the cascades pkg, which inevitably references the task pkg,
// while in the task pkg, the concrete tasks need to receive the cascades context as a
// constructor argument, which would lead to an import cycle.
// So that's why we separate it out of the base pkg.
type CascadesContext interface {
type Context interface {
Destroy()
GetScheduler() base.Scheduler
PushTask(task base.Task)
Expand Down
105 changes: 105 additions & 0 deletions pkg/planner/cascades/cascades.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascades

import (
"github.com/pingcap/tidb/pkg/planner/cascades/base"
"github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx"
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
"github.com/pingcap/tidb/pkg/planner/cascades/task"
corebase "github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/util/intest"
)

// Optimizer is the entry point (portal) of the basic cascades search framework.
// All of its work is delegated to the Context it holds.
type Optimizer struct {
// logic is the logical plan this optimizer was created for.
logic corebase.LogicalPlan
// ctx is the cascades context that exposes the memo and the task scheduler.
ctx cascadesctx.Context
}

// NewCascades returns a new Optimizer for searching logical alternatives of lp.
//
// It builds a fresh Context from lp's plan context, initializes the memo with lp,
// and pushes an OptGroupTask for the group of the group expression returned by the
// memo initialization, so that a later Execute call has a task to start from.
//
// On failure it returns a nil Optimizer together with the memo initialization error.
func NewCascades(lp corebase.LogicalPlan) (*Optimizer, error) {
	cas := &Optimizer{
		logic: lp,
		ctx:   NewContext(lp.SCtx()),
	}
	ge, err := cas.ctx.GetMemo().Init(lp)
	// In-test invariants: memo initialization is expected to succeed and yield a group expression.
	intest.Assert(err == nil)
	intest.Assert(ge != nil)
	if err != nil {
		// The caller only gets nil back on failure and therefore has no handle to
		// call Destroy on, so release the freshly created memo and scheduler here.
		cas.Destroy()
		return nil, err
	}
	cas.ctx.GetScheduler().PushTask(task.NewOptGroupTask(cas.ctx, ge.GetGroup()))
	return cas, nil
}

// Execute runs the yams search flow by asking the context's scheduler to
// execute its tasks, and returns the error reported by the scheduler, if any.
func (c *Optimizer) Execute() error {
	scheduler := c.ctx.GetScheduler()
	return scheduler.ExecuteTasks()
}

// Destroy cleans and resets the basic elements held by the optimizer by
// destroying its underlying cascades context.
func (c *Optimizer) Destroy() {
	ctx := c.ctx
	ctx.Destroy()
}

// GetMemo returns the memo structure owned by the optimizer's cascades context.
func (c *Optimizer) GetMemo() *memo.Memo {
	mm := c.ctx.GetMemo()
	return mm
}

// Context bundles everything that is needed while going through memo optimizing:
// the plan context, the memo and the task scheduler.
type Context struct {
// pctx is the plan context, kept for session/statement variable awareness.
pctx corebase.PlanContext
// mm is the memo managed by this context.
mm *memo.Memo
// scheduler manages the pool of pending tasks.
scheduler base.Scheduler
}

// NewContext returns a new memo context which is responsible for managing all
// the stuff used during cascades optimization: the given plan context, a new
// memo and a new simple task scheduler.
func NewContext(pctx corebase.PlanContext) *Context {
	// The operator number of the current statement is passed to the memo as its capacity.
	operatorNum := pctx.GetSessionVars().StmtCtx.OperatorNum

	c := new(Context)
	c.pctx = pctx
	c.mm = memo.NewMemo(operatorNum)
	c.scheduler = task.NewSimpleTaskScheduler()
	return c
}

// Destroy tears down the memo context, cleaning the resources allocated during
// this optimizing phase.
func (c *Context) Destroy() {
	// Once a memo optimizing phase is done for a session, the memo is cleaned
	// first and then the scheduler (and the stack behind it) is released.
	mm, scheduler := c.mm, c.scheduler
	mm.Destroy()
	scheduler.Destroy()
}

// GetScheduler returns the task scheduler held by this memo context.
func (c *Context) GetScheduler() base.Scheduler {
	scheduler := c.scheduler
	return scheduler
}

// PushTask hands the given task over to the scheduler held by this context.
func (c *Context) PushTask(task base.Task) {
	c.GetScheduler().PushTask(task)
}

// GetMemo returns the memo structure managed by this context.
func (c *Context) GetMemo() *memo.Memo {
	mm := c.mm
	return mm
}
38 changes: 38 additions & 0 deletions pkg/planner/cascades/cascades_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascades_test

import (
"testing"

"github.com/pingcap/tidb/pkg/testkit"
)

// TestCascadesDrive is a smoke test that drives a trivial query through the
// planner with the cascades planner switched on for the session.
func TestCascadesDrive(t *testing.T) {
	tk := testkit.NewTestKit(t, testkit.CreateMockStore(t))
	tk.Session().GetSessionVars().SetEnableCascadesPlanner(true)

	// Prepare the schema and a handful of rows.
	setupSQLs := []string{
		"use test",
		"drop table if exists t1",
		"create table t1(a int not null, b int not null, key(a,b))",
		"insert into t1 values(1,1),(1,2),(2,1),(2,2),(1,1)",
	}
	for _, sql := range setupSQLs {
		tk.MustExec(sql)
	}

	// simple select for quick debug of memo, the normal test case is in tests/planner/cascades/integration.test.
	tk.MustQuery("select 1").Check(testkit.Rows("1"))

	expectedPlan := testkit.Rows(
		"Projection_3 1.00 root 1->Column#1",
		"└─TableDual_4 1.00 root rows:1",
	)
	tk.MustQuery("explain select 1").Check(expectedPlan)
}
30 changes: 30 additions & 0 deletions pkg/planner/cascades/memo/memo.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ func NewMemo(caps ...uint64) *Memo {
}
}

// Destroy resets the memo once it is no longer needed, e.g. at the end of the
// optimizing phase, so that its contents can be released actively.
func (mm *Memo) Destroy() {
// restart group ID generation from zero and drop the root group reference.
mm.groupIDGen.id = 0
mm.rootGroup = nil
// empty the group list and the ID-to-group map.
mm.groups.Init()
clear(mm.groupID2Group)
// clear the global group expression hash table and reset the hasher.
mm.hash2GlobalGroupExpr.Clear()
mm.hasher.Reset()
}

// GetHasher gets a hasher from the memo that ready to use.
func (mm *Memo) GetHasher() base2.Hasher {
mm.hasher.Reset()
Expand Down Expand Up @@ -329,6 +340,25 @@ type IteratorLP struct {
traceID int
}

// NewIterator creates a logical plan iterator over the current memo, starting
// from the memo's root group.
func (mm *Memo) NewIterator() *IteratorLP {
	it := new(IteratorLP)
	it.root = mm.rootGroup
	// The stack starts empty, with capacity for as many entries as the memo's group list holds.
	it.stackInfo = make([]*list.Element, 0, mm.groups.Len())
	it.traceID = -1
	return it
}

// Each iterates over the logical plans produced by Next and calls f on each of
// them. Iteration stops when Next returns nil or as soon as f returns false.
func (it *IteratorLP) Each(f func(base.LogicalPlan) bool) {
	for {
		plan := it.Next()
		if plan == nil || !f(plan) {
			return
		}
	}
}

// Next return valid logical plan implied in memo without duplication.
func (it *IteratorLP) Next() (logic base.LogicalPlan) {
for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// BaseTask is base task wrapper structure for encapsulating basic things.
type BaseTask struct {
ctx cascadesctx.CascadesContext
ctx cascadesctx.Context
}

// Push pushes a new task into inside stack.
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/task_apply_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type ApplyRuleTask struct {
}

// NewApplyRuleTask return a new apply rule task.
func NewApplyRuleTask(ctx cascadesctx.CascadesContext, gE *memo.GroupExpression, r rule.Rule) *ApplyRuleTask {
func NewApplyRuleTask(ctx cascadesctx.Context, gE *memo.GroupExpression, r rule.Rule) *ApplyRuleTask {
return &ApplyRuleTask{
BaseTask: BaseTask{
ctx: ctx,
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/task_opt_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type OptGroupTask struct {
}

// NewOptGroupTask returns a new optimizing group task.
func NewOptGroupTask(ctx cascadesctx.CascadesContext, g *memo.Group) base.Task {
func NewOptGroupTask(ctx cascadesctx.Context, g *memo.Group) base.Task {
return &OptGroupTask{BaseTask: BaseTask{
ctx: ctx,
}, group: g}
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/cascades/task/task_opt_group_expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type OptGroupExpressionTask struct {
}

// NewOptGroupExpressionTask return a targeting optimizing group expression task.
func NewOptGroupExpressionTask(ctx cascadesctx.CascadesContext, ge *memo.GroupExpression) *OptGroupExpressionTask {
func NewOptGroupExpressionTask(ctx cascadesctx.Context, ge *memo.GroupExpression) *OptGroupExpressionTask {
return &OptGroupExpressionTask{
BaseTask: BaseTask{ctx: ctx},
groupExpression: ge,
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ go_library(
"//pkg/parser/terror",
"//pkg/parser/types",
"//pkg/planner/cardinality",
"//pkg/planner/cascades",
"//pkg/planner/cascades/base",
"//pkg/planner/core/base",
"//pkg/planner/core/cost",
Expand Down
38 changes: 31 additions & 7 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/auth"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/cascades"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
Expand Down Expand Up @@ -275,18 +276,41 @@ func CascadesOptimize(ctx context.Context, sctx base.PlanContext, flag uint64, l
if !AllowCartesianProduct.Load() && existsCartesianProduct(logic) {
return nil, nil, 0, errors.Trace(plannererrors.ErrCartesianProductUnsupported)
}
planCounter := base.PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan)
if planCounter == 0 {
planCounter = -1
}
// todo: add cascadesOptimize(logic)

physical, cost, err := physicalOptimize(logic, &planCounter)
var cas *cascades.Optimizer
if cas, err = cascades.NewCascades(logic); err == nil {
defer cas.Destroy()
err = cas.Execute()
}
if err != nil {
return nil, nil, 0, err
}
var (
physical base.PhysicalPlan
cost = math.MaxFloat64
)
// At current phase, cascades just iterate every logic plan out for feeding physicalOptimize.
// TODO: In the near future, physicalOptimize will be refactored as receiving *Group as param directly.
cas.GetMemo().NewIterator().Each(func(oneLogic base.LogicalPlan) bool {
	// Each logical alternative gets its own plan counter built from the
	// ForceNthPlan hint; a zero value is mapped to -1 before being passed on.
	planCounter := base.PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan)
	if planCounter == 0 {
		planCounter = -1
	}
	tmpPhysical, tmpCost, tmpErr := physicalOptimize(oneLogic, &planCounter)
	if tmpErr != nil {
		// Surface the error to the enclosing function and stop iterating.
		err = tmpErr
		return false
	}
	if tmpCost < cost {
		// Remember both the plan and its cost; without updating cost every
		// later comparison would still be against the initial math.MaxFloat64
		// and the last enumerated plan would win instead of the cheapest one.
		physical = tmpPhysical
		cost = tmpCost
	}
	return true
})
if err != nil {
return nil, nil, 0, err
}
finalPlan := postOptimize(ctx, sctx, physical)

finalPlan := postOptimize(ctx, sctx, physical)
if sessVars.StmtCtx.EnableOptimizerCETrace {
refineCETrace(sctx)
}
Expand Down
Loading