Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add cascades wrapper for integrating cascades into current logical optimization phase. #58751

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/planner/cascades/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the cascades package; it is built from cascades.go and
# lists the packages that file imports as deps.
go_library(
name = "cascades",
srcs = ["cascades.go"],
importpath = "github.com/pingcap/tidb/pkg/planner/cascades",
visibility = ["//visibility:public"],
deps = [
"//pkg/planner/cascades/base",
"//pkg/planner/cascades/base/cascadesctx",
"//pkg/planner/cascades/memo",
"//pkg/planner/cascades/task",
"//pkg/planner/core/base",
"//pkg/util/intest",
],
)

# Test target for cascades_test.go. The test lives in the external
# cascades_test package and only needs testkit as a dependency.
go_test(
name = "cascades_test",
timeout = "short",
srcs = ["cascades_test.go"],
flaky = True,
deps = ["//pkg/testkit"],
)
105 changes: 105 additions & 0 deletions pkg/planner/cascades/cascades.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascades

import (
"github.com/pingcap/tidb/pkg/planner/cascades/base"
"github.com/pingcap/tidb/pkg/planner/cascades/base/cascadesctx"
"github.com/pingcap/tidb/pkg/planner/cascades/memo"
"github.com/pingcap/tidb/pkg/planner/cascades/task"
base2 "github.com/pingcap/tidb/pkg/planner/core/base"
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/tidb/pkg/util/intest"
)

// Cascades is a basic cascades search framework portal, drove by CascadesContext.
type Cascades struct {
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
logic base2.LogicalPlan
ctx cascadesctx.CascadesContext
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
}

// NewCascades builds a Cascades object used to search logical alternatives of lp.
// It creates a fresh context from lp's session context, initializes the memo
// with lp, and queues an optimize-group task for the group of the returned
// group expression. If memo initialization fails, the error is returned and
// the *Cascades result is nil.
func NewCascades(lp base2.LogicalPlan) (*Cascades, error) {
	optCtx := NewContext(lp.SCtx())
	rootGE, err := optCtx.GetMemo().Init(lp)
	// in-test invariants: memo init is expected to succeed and yield a group expression.
	intest.Assert(err == nil)
	intest.Assert(rootGE != nil)
	if err != nil {
		return nil, err
	}
	cas := &Cascades{
		logic: lp,
		ctx:   optCtx,
	}
	// seed the scheduler with the task that optimizes the root group.
	rootTask := task.NewOptGroupTask(cas.ctx, rootGE.GetGroup())
	cas.ctx.GetScheduler().PushTask(rootTask)
	return cas, nil
}

// Execute runs the tasks queued on the context's scheduler and returns the
// error reported by ExecuteTasks, if any.
func (c *Cascades) Execute() error {
return c.ctx.GetScheduler().ExecuteTasks()
}

// Destroy cleans up and resets the elements inside by destroying the underlying context.
func (c *Cascades) Destroy() {
c.ctx.Destroy()
}

// GetMemo returns the memo held by the underlying cascades context.
func (c *Cascades) GetMemo() *memo.Memo {
return c.ctx.GetMemo()
}

// Context bundles everything needed while going through memo optimization:
// the plan context, the memo and the task scheduler.
type Context struct {
// pctx is the plan context supplied to NewContext.
pctx base2.PlanContext
// mm is the memo managed by this context.
mm *memo.Memo
// scheduler manages the pool of tasks.
scheduler base.Scheduler
}

// NewContext creates the memo context that manages everything used during
// cascades optimization: it keeps pctx, a new memo and a new task scheduler.
func NewContext(pctx base2.PlanContext) *Context {
	// the statement's operator count is passed to the memo as its initial capacity.
	operatorNum := pctx.GetSessionVars().StmtCtx.OperatorNum

	memoCtx := new(Context)
	memoCtx.pctx = pctx
	memoCtx.mm = memo.NewMemo(operatorNum)
	// task pool management.
	memoCtx.scheduler = task.NewSimpleTaskScheduler()
	return memoCtx
}

// Destroy tears down the memo context by destroying its memo first and then
// its scheduler, cleaning up what was allocated during this phase.
func (c *Context) Destroy() {
// once a memo optimizing phase is done for a session, the memo and the
// scheduler are cleaned through their own Destroy methods.
c.mm.Destroy()
c.scheduler.Destroy()
}

// GetScheduler returns the task scheduler held by this memo context.
func (c *Context) GetScheduler() base.Scheduler {
return c.scheduler
}

// PushTask hands the given task to the scheduler held by this memo context.
func (c *Context) PushTask(task base.Task) {
c.scheduler.PushTask(task)
}

// GetMemo returns the memo held by this memo context.
func (c *Context) GetMemo() *memo.Memo {
return c.mm
}
38 changes: 38 additions & 0 deletions pkg/planner/cascades/cascades_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cascades_test

import (
"testing"

"github.com/pingcap/tidb/pkg/testkit"
)

// TestCascadesDrive enables the cascades planner on a mock-store session and
// checks that a trivial query and its explain output go through that path.
func TestCascadesDrive(t *testing.T) {
	tk := testkit.NewTestKit(t, testkit.CreateMockStore(t))
	tk.Session().GetSessionVars().SetEnableCascadesPlanner(true)

	// prepare the schema and a few rows.
	setupSQLs := []string{
		"use test",
		"drop table if exists t1",
		"create table t1(a int not null, b int not null, key(a,b))",
		"insert into t1 values(1,1),(1,2),(2,1),(2,2),(1,1)",
	}
	for _, sql := range setupSQLs {
		tk.MustExec(sql)
	}

	// simple select for quick debug of memo, the normal test case is in tests/planner/cascades/integration.test.
	tk.MustQuery("select 1").Check(testkit.Rows("1"))
	tk.MustQuery("explain select 1").Check(testkit.Rows(
		"Projection_3 1.00 root 1->Column#1",
		"└─TableDual_4 1.00 root rows:1",
	))
}
30 changes: 30 additions & 0 deletions pkg/planner/cascades/memo/memo.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ func NewMemo(caps ...uint64) *Memo {
}
}

// Destroy resets the memo once it is no longer needed, e.g. at the end of the
// optimizing phase, so that it keeps no state from the finished optimization.
func (mm *Memo) Destroy() {
// when a memo itself is useless, we can clean itself actively.
// restart group ID generation from zero.
mm.groupIDGen.id = 0
// drop the reference to the root group.
mm.rootGroup = nil
// NOTE(review): groups is presumably a container/list list, whose Init empties it — confirm.
mm.groups.Init()
// remove every entry from the group-ID-to-group map.
clear(mm.groupID2Group)
mm.hash2GlobalGroupExpr.Clear()
mm.hasher.Reset()
}

// GetHasher gets a hasher from the memo that ready to use.
func (mm *Memo) GetHasher() base2.Hasher {
mm.hasher.Reset()
Expand Down Expand Up @@ -329,6 +340,25 @@ type IteratorLP struct {
traceID int
}

// NewIterator creates a logical plan iterator over the current memo, starting
// from the memo's root group.
func (mm *Memo) NewIterator() *IteratorLP {
	iter := new(IteratorLP)
	iter.root = mm.rootGroup
	// reserve one stack slot per group currently held by the memo.
	iter.stackInfo = make([]*list.Element, 0, mm.groups.Len())
	iter.traceID = -1
	return iter
}

// Each calls f on every logical plan produced by Next, in order. Iteration
// stops as soon as Next returns nil or f returns false.
func (it *IteratorLP) Each(f func(base.LogicalPlan) bool) {
	for {
		plan := it.Next()
		if plan == nil || !f(plan) {
			return
		}
	}
}

// Next return valid logical plan implied in memo without duplication.
func (it *IteratorLP) Next() (logic base.LogicalPlan) {
for {
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ go_library(
"//pkg/parser/terror",
"//pkg/parser/types",
"//pkg/planner/cardinality",
"//pkg/planner/cascades",
"//pkg/planner/cascades/base",
"//pkg/planner/core/base",
"//pkg/planner/core/cost",
Expand Down
36 changes: 29 additions & 7 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/auth"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/cascades"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
Expand Down Expand Up @@ -275,18 +276,39 @@ func CascadesOptimize(ctx context.Context, sctx base.PlanContext, flag uint64, l
if !AllowCartesianProduct.Load() && existsCartesianProduct(logic) {
return nil, nil, 0, errors.Trace(plannererrors.ErrCartesianProductUnsupported)
}
planCounter := base.PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan)
if planCounter == 0 {
planCounter = -1
}
// todo: add cascadesOptimize(logic)

physical, cost, err := physicalOptimize(logic, &planCounter)
var cas *cascades.Cascades
if cas, err = cascades.NewCascades(logic); err == nil {
defer cas.Destroy()
err = cas.Execute()
}
if err != nil {
return nil, nil, 0, err
}
var (
physical base.PhysicalPlan
cost = math.MaxFloat64
)
// Physically optimize every logical alternative kept in the memo and retain
// the cheapest physical plan seen so far.
cas.GetMemo().NewIterator().Each(func(oneLogic base.LogicalPlan) bool {
	// The counter is rebuilt for each alternative because physicalOptimize
	// receives a pointer to it (presumably it may be modified there — confirm).
	planCounter := base.PlanCounterTp(sessVars.StmtCtx.StmtHints.ForceNthPlan)
	if planCounter == 0 {
		planCounter = -1
	}
	tmpPhysical, tmpCost, tmpErr := physicalOptimize(oneLogic, &planCounter)
	if tmpErr != nil {
		// surface the failure through the outer err and stop iterating.
		err = tmpErr
		return false
	}
	if tmpCost < cost {
		// Record the new best cost together with its plan. Without updating
		// cost, every alternative would compare against the initial
		// math.MaxFloat64 and the last one visited would win instead of the
		// cheapest one.
		physical, cost = tmpPhysical, tmpCost
	}
	return true
})
if err != nil {
return nil, nil, 0, err
}
finalPlan := postOptimize(ctx, sctx, physical)

finalPlan := postOptimize(ctx, sctx, physical)
if sessVars.StmtCtx.EnableOptimizerCETrace {
refineCETrace(sctx)
}
Expand Down
Loading