Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/pb/plan/plan.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions pkg/sql/plan/explain/explain_cost_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2025 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package explain

import (
"bytes"
"context"
"strings"
"testing"

planpb "github.com/matrixorigin/matrixone/pkg/pb/plan"
)

func TestCostDescribeImpl_IncludesRowsizeWhenPositive(t *testing.T) {
stats := &planpb.Stats{
Cost: 10,
Outcnt: 5,
Selectivity: 0.5,
Dop: 2,
BlockNum: 3,
Rowsize: 128.0,
}
impl := &CostDescribeImpl{Stats: stats}
buf := new(bytes.Buffer)
if err := impl.GetDescription(context.Background(), NewExplainDefaultOptions(), buf); err != nil {
t.Fatalf("GetDescription error: %v", err)
}
got := buf.String()
if !strings.Contains(got, "rowsize=128.00") {
t.Fatalf("expected rowsize to be printed, got: %s", got)
}
}

func TestCostDescribeImpl_OmitsRowsizeWhenZero(t *testing.T) {
stats := &planpb.Stats{
Cost: 1,
Outcnt: 1,
Selectivity: 1,
Dop: 1,
BlockNum: 1,
Rowsize: 0,
}
impl := &CostDescribeImpl{Stats: stats}
buf := new(bytes.Buffer)
if err := impl.GetDescription(context.Background(), NewExplainDefaultOptions(), buf); err != nil {
t.Fatalf("GetDescription error: %v", err)
}
got := buf.String()
if strings.Contains(got, "rowsize=") {
t.Fatalf("did not expect rowsize to be printed when zero, got: %s", got)
}
}
6 changes: 5 additions & 1 deletion pkg/sql/plan/explain/explain_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,11 +1103,15 @@ func (c *CostDescribeImpl) GetDescription(ctx context.Context, options *ExplainO
if c.Stats.HashmapStats != nil && c.Stats.HashmapStats.HashmapSize > 1 {
hashmapSizeStr = " hashmapSize=" + strconv.FormatFloat(c.Stats.HashmapStats.HashmapSize, 'f', 2, 64)
}
var rowsizeStr string
if c.Stats.Rowsize > 0 {
rowsizeStr = " rowsize=" + strconv.FormatFloat(c.Stats.Rowsize, 'f', 2, 64)
}
buf.WriteString(" (cost=" + strconv.FormatFloat(c.Stats.Cost, 'f', 2, 64) +
" outcnt=" + strconv.FormatFloat(c.Stats.Outcnt, 'f', 2, 64) +
" selectivity=" + strconv.FormatFloat(c.Stats.Selectivity, 'f', 4, 64) +
" dop=" + strconv.FormatInt(int64(c.Stats.Dop), 10) +
blockNumStr + hashmapSizeStr + ")")
blockNumStr + hashmapSizeStr + rowsizeStr + ")")
}
return nil
}
Expand Down
72 changes: 64 additions & 8 deletions pkg/sql/plan/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ const highNDVcolumnThreshHold = 0.95
const statsCacheInitSize = 128
const statsCacheMaxSize = 8192

// RowSizeThreshold Regardless of the table,
// the minimum row size is 100.
// However, due to inaccurate statistical information,
// the RowSizeThreshold is tentatively set at 128,
// and it is only used for tables with vector indexes
const RowSizeThreshold = 128
const LargeBlockThresholdForOneCN = 4
const LargeBlockThresholdForMultiCN = 32

// for test
var ForceScanOnMultiCN atomic.Bool

Expand Down Expand Up @@ -1394,6 +1403,19 @@ func calcScanStats(node *plan.Node, builder *QueryBuilder) *plan.Stats {
stats.Outcnt = stats.Selectivity * stats.TableCnt
stats.Cost = stats.TableCnt * blockSel
stats.BlockNum = int32(float64(s.BlockNumber)*blockSel) + 1
// estimate average row size from collected table stats: sum(SizeMap)/TableCnt
// SizeMap stores approximate persisted bytes per column; divide by total rows to get bytes/row
{
var totalSize uint64
for _, v := range s.SizeMap {
totalSize += v
}
if stats.TableCnt > 0 {
stats.Rowsize = float64(totalSize) / stats.TableCnt
} else {
stats.Rowsize = 0
}
}
return stats
}

Expand Down Expand Up @@ -1607,18 +1629,42 @@ func HasShuffleInPlan(qry *plan.Query) bool {
return false
}

func calcDOP(ncpu, blocks int32, isPrepare bool) int32 {
if ncpu <= 0 || blocks <= 16 {
// dop tuning constants
const (
// base block-to-core mapping for dop estimation
dopBlocksBaseUnit int32 = 16 // default: every ~16 blocks add a core
dopBlocksPrepareUnit int32 = 64 // prepare: more conservative
)

func calcDOP(ncpu int32, stats *plan.Stats, isPrepare bool) int32 {
if ncpu <= 0 {
return 1
}
ret := blocks/16 + 1

baseUnit := dopBlocksBaseUnit
if isPrepare {
ret = blocks/64 + 1
baseUnit = dopBlocksPrepareUnit
}
if ret <= ncpu {
return ret

blocks := stats.BlockNum
var ret int32 = 1
if blocks > 0 {
ret = blocks/baseUnit + 1
}
return ncpu

rs := stats.Rowsize
if rs >= RowSizeThreshold {
// very wide rows: be aggressive
ret = stats.BlockNum
}

if ret > ncpu {
ret = ncpu
}
if ret < 1 {
ret = 1
}
return ret
}

// set node dop and left child recursively
Expand Down Expand Up @@ -1652,7 +1698,7 @@ func CalcNodeDOP(p *plan.Plan, rootID int32, ncpu int32, lencn int) {
setNodeDOP(p, rootID, dop)
}
} else {
node.Stats.Dop = calcDOP(ncpu, node.Stats.BlockNum, p.IsPrepare)
node.Stats.Dop = calcDOP(ncpu, node.Stats, p.IsPrepare)
}
}

Expand Down Expand Up @@ -1696,6 +1742,16 @@ func GetExecType(qry *plan.Query, txnHaveDDL bool, isPrepare bool) ExecType {
ret = ExecTypeAP_ONECN
}
}
if node.NodeType == plan.Node_TABLE_SCAN &&
// due to the inaccuracy of stats.Rowsize, currently only vector index tables are supported
(node.TableDef.TableType == catalog.SystemSI_IVFFLAT_TblType_Entries || node.TableDef.TableType == catalog.Hnsw_TblType_Storage) &&
stats.Rowsize > RowSizeThreshold &&
stats.BlockNum > LargeBlockThresholdForOneCN {
ret = ExecTypeAP_ONECN
if stats.BlockNum > LargeBlockThresholdForMultiCN {
ret = ExecTypeAP_MULTICN
}
}
if node.NodeType != plan.Node_TABLE_SCAN && stats.HashmapStats != nil && stats.HashmapStats.Shuffle {
ret = ExecTypeAP_ONECN
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/sql/plan/stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2025 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package plan

import (
"testing"

"github.com/matrixorigin/matrixone/pkg/catalog"
planpb "github.com/matrixorigin/matrixone/pkg/pb/plan"
)

func makeQueryWithScan(tableType string, rowsize float64, blockNum int32) *planpb.Query {
n := &planpb.Node{
NodeType: planpb.Node_TABLE_SCAN,
TableDef: &planpb.TableDef{TableType: tableType},
Stats: &planpb.Stats{
Rowsize: rowsize,
BlockNum: blockNum,
},
}
return &planpb.Query{
Nodes: []*planpb.Node{n},
Steps: []int32{0},
}
}

func TestGetExecType_VectorIndex_WideRows_OneCN(t *testing.T) {
// rowsize just above threshold, blockNum between oneCN and multiCN thresholds
q := makeQueryWithScan(catalog.SystemSI_IVFFLAT_TblType_Entries, float64(RowSizeThreshold+1), LargeBlockThresholdForOneCN+1)
got := GetExecType(q, false, false)
if got != ExecTypeAP_ONECN {
t.Fatalf("expected ExecTypeAP_ONECN, got %v", got)
}
}

func TestGetExecType_VectorIndex_WideRows_MultiCN(t *testing.T) {
q := makeQueryWithScan(catalog.Hnsw_TblType_Storage, float64(RowSizeThreshold+1), LargeBlockThresholdForMultiCN+1)
got := GetExecType(q, false, false)
if got != ExecTypeAP_MULTICN {
t.Fatalf("expected ExecTypeAP_MULTICN, got %v", got)
}
}

func TestGetExecType_NonVectorTable_NotForcedByRowsize(t *testing.T) {
// Non-vector tables should not trigger rowsize shortcut; with small blockNum, expect TP
q := makeQueryWithScan("normal_table", float64(RowSizeThreshold+10), LargeBlockThresholdForOneCN)
got := GetExecType(q, false, false)
if got != ExecTypeTP {
t.Fatalf("expected ExecTypeTP for non-vector table, got %v", got)
}
}
2 changes: 1 addition & 1 deletion proto/plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ message Stats {
double cost = 2;
//number of output lines
double outcnt = 3;
// average size of one row, currently not used
// average size of one row
double rowsize = 4;
//for scan, this means total count of all table, before filtering
double table_cnt = 5;
Expand Down