Skip to content

Commit

Permalink
[Enhancement] Reduce fragment plan size (StarRocks#29719)
Browse files Browse the repository at this point in the history
* [Enhancement] Reduce fragment plan size

Signed-off-by: Seaven <seaven_7@qq.com>
  • Loading branch information
Seaven authored Aug 24, 2023
1 parent f28b264 commit 1b32651
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 81 deletions.
10 changes: 7 additions & 3 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ SlotDescriptor::SlotDescriptor(SlotId id, std::string name, TypeDescriptor type)
_slot_idx(0),
_slot_size(_type.get_slot_size()),
_is_materialized(false),
_is_output_column(false) {}
_is_output_column(false),
_is_nullable(true) {}

SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
: _id(tdesc.id),
Expand All @@ -79,7 +80,8 @@ SlotDescriptor::SlotDescriptor(const TSlotDescriptor& tdesc)
_slot_idx(tdesc.slotIdx),
_slot_size(_type.get_slot_size()),
_is_materialized(tdesc.isMaterialized),
_is_output_column(tdesc.__isset.isOutputColumn ? tdesc.isOutputColumn : true) {}
_is_output_column(tdesc.__isset.isOutputColumn ? tdesc.isOutputColumn : true),
_is_nullable(tdesc.__isset.isNullable ? tdesc.isNullable : true) {}

SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
: _id(pdesc.id()),
Expand All @@ -90,7 +92,9 @@ SlotDescriptor::SlotDescriptor(const PSlotDescriptor& pdesc)
_slot_idx(pdesc.slot_idx()),
_slot_size(_type.get_slot_size()),
_is_materialized(pdesc.is_materialized()),
_is_output_column(true) {}
_is_output_column(true),
// keep same as is_nullable()
_is_nullable(_null_indicator_offset.bit_mask != 0) {}

void SlotDescriptor::to_protobuf(PSlotDescriptor* pslot) const {
pslot->set_id(_id);
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ class SlotDescriptor {
const bool _is_materialized;
const bool _is_output_column;

// @todo: replace _null_indicator_offset when remove _null_indicator_offset
const bool _is_nullable;

SlotDescriptor(const TSlotDescriptor& tdesc);
SlotDescriptor(const PSlotDescriptor& pdesc);
};
Expand Down
35 changes: 16 additions & 19 deletions fe/fe-core/src/main/java/com/starrocks/analysis/SlotDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,28 +245,25 @@ public TSlotDescriptor toThrift() {
nullIndicatorBit = -1;
}
Preconditions.checkState(isMaterialized, "isMaterialized must be true");

TSlotDescriptor tSlotDescriptor = new TSlotDescriptor();
tSlotDescriptor.setId(id.asInt());
tSlotDescriptor.setParent(parent.getId().asInt());
if (originType != null) {
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), originType.toThrift(), -1,
-1, -1,
nullIndicatorBit, ((column != null) ? column.getName() : ""),
-1, true);
tSlotDescriptor.setIsOutputColumn(isOutputColumn);
return tSlotDescriptor;
tSlotDescriptor.setSlotType(originType.toThrift());
} else {
/**
* Refer to {@link Expr#treeToThrift}
*/
if (type.isNull()) {
type = ScalarType.BOOLEAN;
}
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1,
-1, -1,
nullIndicatorBit, ((column != null) ? column.getName() : ""),
-1, true);
tSlotDescriptor.setIsOutputColumn(isOutputColumn);
return tSlotDescriptor;
type = type.isNull() ? ScalarType.BOOLEAN : type;
tSlotDescriptor.setSlotType(type.toThrift());
}
tSlotDescriptor.setColumnPos(-1);
tSlotDescriptor.setByteOffset(-1);
tSlotDescriptor.setNullIndicatorByte(-1);
tSlotDescriptor.setNullIndicatorBit(nullIndicatorBit);
tSlotDescriptor.setColName(((column != null) ? column.getName() : ""));
tSlotDescriptor.setSlotIdx(-1);
tSlotDescriptor.setIsMaterialized(true);
tSlotDescriptor.setIsOutputColumn(isOutputColumn);
tSlotDescriptor.setIsNullable(isNullable);
return tSlotDescriptor;
}

public String debugString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ public String getAlias() {
}

public TTupleDescriptor toThrift() {
TTupleDescriptor ttupleDesc = new TTupleDescriptor(id.asInt(), -1, -1);
TTupleDescriptor ttupleDesc = new TTupleDescriptor();
ttupleDesc.setId(id.asInt());
ttupleDesc.setByteSize(-1);
ttupleDesc.setNumNullBytes(-1);
ttupleDesc.setNumNullSlots(-1);
if (table != null && table.getId() >= 0) {
ttupleDesc.setTableId((int) table.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -63,20 +62,6 @@ public boolean isConstant() {
return true;
}

public static boolean isTrue(@Nullable ScalarOperator op) {
if (op == null) {
return false;
}
return op.isTrue();
}

public static boolean isFalse(@Nullable ScalarOperator op) {
if (op == null) {
return false;
}
return op.isFalse();
}

public boolean isTrue() {
return this.equals(ConstantOperator.TRUE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
import com.starrocks.sql.optimizer.rewrite.ReplaceColumnRefRewriter;
import com.starrocks.sql.optimizer.rewrite.ScalarOperatorRewriter;
import com.starrocks.sql.optimizer.rule.RuleType;

import java.util.List;
Expand All @@ -42,10 +43,16 @@ public List<OptExpression> transform(OptExpression input, OptimizerContext conte
LogicalProjectOperator firstProject = (LogicalProjectOperator) input.getOp();
LogicalProjectOperator secondProject = (LogicalProjectOperator) input.getInputs().get(0).getOp();

ScalarOperatorRewriter scalarRewriter = new ScalarOperatorRewriter();
ReplaceColumnRefRewriter rewriter = new ReplaceColumnRefRewriter(secondProject.getColumnRefMap());
Map<ColumnRefOperator, ScalarOperator> resultMap = Maps.newHashMap();
for (Map.Entry<ColumnRefOperator, ScalarOperator> entry : firstProject.getColumnRefMap().entrySet()) {
resultMap.put(entry.getKey(), rewriter.rewrite(entry.getValue()));
ScalarOperator result = rewriter.rewrite(entry.getValue());
if (result.isConstant()) {
// better to rewrite all expression, but it's unnecessary
result = scalarRewriter.rewrite(result, ScalarOperatorRewriter.DEFAULT_REWRITE_RULES);
}
resultMap.put(entry.getKey(), result);
}

// ASSERT_TRUE must be executed in the runtime, so it should be kept anyway.
Expand Down
22 changes: 11 additions & 11 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/InsertPlanTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ public void testInsertMvCount() throws Exception {

explainString = getInsertExecPlan("insert into test_insert_mv_count(v1) values(1)");
Assert.assertTrue(explainString.contains("OUTPUT EXPRS:1: column_0 | 2: expr | 3: expr | 4: if"));
Assert.assertTrue(explainString.contains(
Assert.assertTrue(explainString, explainString.contains(
" | <slot 1> : 1: column_0\n" +
" | <slot 2> : NULL\n" +
" | <slot 3> : NULL\n" +
" | <slot 4> : if(NULL IS NULL, 0, 1)"));
" | <slot 4> : 0"));

explainString = getInsertExecPlan("insert into test_insert_mv_count(v3,v1) values(3,1)");

Expand All @@ -189,16 +189,16 @@ public void testInsertMvCount() throws Exception {
" | <slot 1> : 1: column_0\n" +
" | <slot 2> : 2: column_1\n" +
" | <slot 3> : NULL\n" +
" | <slot 4> : if(NULL IS NULL, 0, 1)"));
" | <slot 4> : 0"));

explainString = getInsertExecPlan("insert into test_insert_mv_count select 1,2,3");
Assert.assertTrue(explainString.contains("OUTPUT EXPRS:6: v1 | 7: v2 | 8: v3 | 5: if"));
Assert.assertTrue(explainString.contains(
"1:Project\n" +
" | <slot 5> : if(2 IS NULL, 0, 1)\n" +
" | <slot 6> : CAST(1 AS BIGINT)\n" +
" | <slot 7> : CAST(2 AS BIGINT)\n" +
" | <slot 8> : CAST(3 AS BIGINT)"));
" | <slot 5> : 1\n" +
" | <slot 6> : 1\n" +
" | <slot 7> : 2\n" +
" | <slot 8> : 3"));

starRocksAssert.dropTable("test_insert_mv_count");
}
Expand Down Expand Up @@ -249,7 +249,7 @@ public void testInsertFromTable() throws Exception {
" 1:Project\n" +
" | <slot 1> : 1: v1\n" +
" | <slot 6> : to_bitmap(1: v1)\n" +
" | <slot 7> : CAST(2 AS BIGINT)\n" +
" | <slot 7> : 2\n" +
" | <slot 8> : NULL\n"));

explainString = getInsertExecPlan("insert into ti2 select * from ti2");
Expand Down Expand Up @@ -740,7 +740,7 @@ public void testInsertSelectWithConstant() throws Exception {
" RANDOM\n" +
"\n" +
" 1:Project\n" +
" | <slot 7> : CAST(1 AS BIGINT)\n" +
" | <slot 7> : 1\n" +
" | <slot 8> : NULL\n" +
" | <slot 9> : NULL"));
}
Expand Down Expand Up @@ -864,8 +864,8 @@ public void testInsertIcebergTableSink() throws Exception {
" RANDOM\n" +
"\n" +
" 1:Project\n" +
" | <slot 4> : CAST(1 AS INT)\n" +
" | <slot 5> : CAST(2 AS INT)\n" +
" | <slot 4> : 1\n" +
" | <slot 5> : 2\n" +
" | \n" +
" 0:UNION\n" +
" constant exprs: \n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public void testJoinAssociativityConst() throws Exception {
assertContains(plan, " 1:Project\n" +
" | <slot 1> : 1: v1\n" +
" | <slot 4> : 49\n" +
" | <slot 26> : CAST(49 AS VARCHAR(1048576))\n" +
" | <slot 26> : '49'\n" +
" | \n" +
" 0:OlapScanNode");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,36 @@ public void testSelectConst() throws Exception {
" constant exprs: \n" +
" NULL");
assertPlanContains("select v1,v2 from t0 union all select 1,2", " 4:Project\n" +
" | <slot 7> : CAST(1 AS BIGINT)\n" +
" | <slot 8> : CAST(2 AS BIGINT)\n" +
" | <slot 7> : 1\n" +
" | <slot 8> : 2\n" +
" | \n" +
" 3:UNION\n" +
" constant exprs: \n" +
" NULL");
assertPlanContains("select v1,v2 from t0 union select 1,2", " 4:Project\n" +
" | <slot 7> : CAST(1 AS BIGINT)\n" +
" | <slot 8> : CAST(2 AS BIGINT)\n" +
" | <slot 7> : 1\n" +
" | <slot 8> : 2\n" +
" | \n" +
" 3:UNION\n" +
" constant exprs: \n" +
" NULL");
assertPlanContains("select v1,v2 from t0 except select 1,2", "EXCEPT", " 4:Project\n" +
" | <slot 7> : CAST(1 AS BIGINT)\n" +
" | <slot 8> : CAST(2 AS BIGINT)\n" +
" | <slot 7> : 1\n" +
" | <slot 8> : 2\n" +
" | \n" +
" 3:UNION\n" +
" constant exprs: \n" +
" NULL");
assertPlanContains("select v1,v2 from t0 intersect select 1,2", "INTERSECT", " 4:Project\n" +
" | <slot 7> : CAST(1 AS BIGINT)\n" +
" | <slot 8> : CAST(2 AS BIGINT)\n" +
" | <slot 7> : 1\n" +
" | <slot 8> : 2\n" +
" | \n" +
" 3:UNION\n" +
" constant exprs: \n" +
" NULL");
assertPlanContains("select v1,v2,b from t0 inner join (select 1 as a,2 as b) t on v1 = a", " 1:Project\n" +
" | <slot 6> : 2\n" +
" | <slot 7> : CAST(1 AS BIGINT)\n" +
" | <slot 7> : 1\n" +
" | \n" +
" 0:UNION\n" +
" constant exprs: \n" +
Expand Down Expand Up @@ -103,9 +103,9 @@ public void testAggWithConstant() throws Exception {

@Test
public void testSubquery() throws Exception {
assertPlanContains("select * from t0 where v3 in (select 2)", "LEFT SEMI JOIN", "<slot 7> : CAST(2 AS BIGINT)");
assertPlanContains("select * from t0 where v3 in (select 2)", "RIGHT SEMI JOIN", "<slot 7> : 2");
assertPlanContains("select * from t0 where v3 not in (select 2)", "NULL AWARE LEFT ANTI JOIN",
"<slot 7> : CAST(2 AS BIGINT)");
"<slot 7> : 2");
assertPlanContains("select * from t0 where exists (select 9)", " 1:UNION\n" +
" constant exprs: \n" +
" NULL");
Expand Down
18 changes: 14 additions & 4 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/SetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public void testUnionAllConst() throws Exception {
String sql = "select b from (select t1a as a, t1b as b, t1c as c, t1d as d from test_all_type " +
"union all select 1 as a, 2 as b, 3 as c, 4 as d) t1;";
String plan = getThriftPlan(sql);
Assert.assertTrue(plan.contains(
"TExprNode(node_type:INT_LITERAL, type:TTypeDesc(types:[TTypeNode(type:SCALAR, " +
"scalar_type:TScalarType(type:TINYINT))]), num_children:0, int_literal:TIntLiteral(value:2)," +
" output_scale:-1, has_nullable_child:false, is_nullable:false, is_monotonic:true"));
assertContains(plan, "[TExprNode(node_type:INT_LITERAL, type:TTypeDesc(types:" +
"[TTypeNode(type:SCALAR, scalar_type:TScalarType(type:SMALLINT))])," +
" num_children:0, int_literal:TIntLiteral(value:2), output_scale:-1, " +
"has_nullable_child:false, is_nullable:false, is_monotonic:true)]");
}

@Test
Expand Down Expand Up @@ -592,4 +592,14 @@ public void testUnionNull() throws Exception {
"WHERE NULL";
getThriftPlan(sql);
}

@Test
public void testCast() throws Exception {
String sql = "select * from t0 union all select 1, 1, 1 from t1";
String plan = getFragmentPlan(sql);
assertContains(plan, " 4:Project\n" +
" | <slot 10> : 1\n" +
" | <slot 11> : 1\n" +
" | <slot 12> : 1");
}
}
31 changes: 16 additions & 15 deletions gensrc/thrift/Descriptors.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,26 @@ include "Types.thrift"
include "Exprs.thrift"

struct TSlotDescriptor {
1: required Types.TSlotId id
2: required Types.TTupleId parent
3: required Types.TTypeDesc slotType
4: required i32 columnPos // in originating table
5: required i32 byteOffset // into tuple
6: required i32 nullIndicatorByte
7: required i32 nullIndicatorBit
8: required string colName;
9: required i32 slotIdx
10: required bool isMaterialized
11: optional bool isOutputColumn
1: optional Types.TSlotId id
2: optional Types.TTupleId parent
3: optional Types.TTypeDesc slotType
4: optional i32 columnPos // Deprecated
5: optional i32 byteOffset // Deprecated
6: optional i32 nullIndicatorByte // Deprecated
7: optional i32 nullIndicatorBit // Deprecated
8: optional string colName;
9: optional i32 slotIdx // Deprecated
10: optional bool isMaterialized // Deprecated
11: optional bool isOutputColumn // Deprecated
12: optional bool isNullable // replace nullIndicatorBit & nullIndicatorByte
}

struct TTupleDescriptor {
1: required Types.TTupleId id
2: required i32 byteSize
3: required i32 numNullBytes
1: optional Types.TTupleId id
2: optional i32 byteSize // Deprecated
3: optional i32 numNullBytes // Deprecated
4: optional Types.TTableId tableId
5: optional i32 numNullSlots
5: optional i32 numNullSlots // Deprecated
}

enum THdfsFileFormat {
Expand Down

0 comments on commit 1b32651

Please sign in to comment.