Skip to content

Commit 3eea2b0

Browse files
jinchengchenghh authored and zhejiangxiaomai committed
[OPPRO-309] Support if then in filter (#74)
1 parent b2bb38e commit 3eea2b0

File tree

4 files changed

+464
-38
lines changed

4 files changed

+464
-38
lines changed

velox/substrait/SubstraitToVeloxPlan.cpp

Lines changed: 44 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -703,7 +703,9 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
703703
// Flatten the conditions connected with 'and'.
704704
std::vector<::substrait::Expression_ScalarFunction> scalarFunctions;
705705
std::vector<::substrait::Expression_SingularOrList> singularOrLists;
706-
flattenConditions(sRead.filter(), scalarFunctions, singularOrLists);
706+
std::vector<::substrait::Expression_IfThen> ifThens;
707+
flattenConditions(
708+
sRead.filter(), scalarFunctions, singularOrLists, ifThens);
707709

708710
std::unordered_map<uint32_t, std::shared_ptr<RangeRecorder>> rangeRecorders;
709711
for (uint32_t idx = 0; idx < veloxTypeList.size(); idx++) {
@@ -738,10 +740,18 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
738740
// mark all filter as remaining filters.
739741
subfieldFilters.clear();
740742
remainingFilter = connectWithAnd(
741-
colNameList, veloxTypeList, scalarFunctions, singularOrLists);
743+
colNameList,
744+
veloxTypeList,
745+
scalarFunctions,
746+
singularOrLists,
747+
ifThens);
742748
} else {
743749
remainingFilter = connectWithAnd(
744-
colNameList, veloxTypeList, remainingFunctions, remainingrOrLists);
750+
colNameList,
751+
veloxTypeList,
752+
remainingFunctions,
753+
remainingrOrLists,
754+
ifThens);
745755
}
746756

747757
tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
@@ -923,7 +933,8 @@ std::string SubstraitVeloxPlanConverter::nextPlanNodeId() {
923933
void SubstraitVeloxPlanConverter::flattenConditions(
924934
const ::substrait::Expression& substraitFilter,
925935
std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
926-
std::vector<::substrait::Expression_SingularOrList>& singularOrLists) {
936+
std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
937+
std::vector<::substrait::Expression_IfThen>& ifThens) {
927938
auto typeCase = substraitFilter.rex_type_case();
928939
switch (typeCase) {
929940
case ::substrait::Expression::RexTypeCase::kScalarFunction: {
@@ -934,7 +945,7 @@ void SubstraitVeloxPlanConverter::flattenConditions(
934945
if (subParser_->getSubFunctionName(filterNameSpec) == "and") {
935946
for (const auto& sCondition : sFunc.arguments()) {
936947
flattenConditions(
937-
sCondition.value(), scalarFunctions, singularOrLists);
948+
sCondition.value(), scalarFunctions, singularOrLists, ifThens);
938949
}
939950
} else {
940951
scalarFunctions.emplace_back(sFunc);
@@ -945,6 +956,10 @@ void SubstraitVeloxPlanConverter::flattenConditions(
945956
singularOrLists.emplace_back(substraitFilter.singular_or_list());
946957
break;
947958
}
959+
case ::substrait::Expression::RexTypeCase::kIfThen: {
960+
ifThens.emplace_back(substraitFilter.if_then());
961+
break;
962+
}
948963
default:
949964
VELOX_NYI("GetFlatConditions not supported for type '{}'", typeCase);
950965
}
@@ -1781,47 +1796,40 @@ core::TypedExprPtr SubstraitVeloxPlanConverter::connectWithAnd(
17811796
std::vector<std::string> inputNameList,
17821797
std::vector<TypePtr> inputTypeList,
17831798
const std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
1784-
const std::vector<::substrait::Expression_SingularOrList>&
1785-
singularOrLists) {
1786-
if (scalarFunctions.size() == 0 && singularOrLists.size() == 0) {
1799+
const std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
1800+
const std::vector<::substrait::Expression_IfThen>& ifThens) {
1801+
if (scalarFunctions.size() == 0 && singularOrLists.size() == 0 &&
1802+
ifThens.size() == 0) {
17871803
return nullptr;
17881804
}
17891805
auto inputType = ROW(std::move(inputNameList), std::move(inputTypeList));
17901806

17911807
// Filter for scalar functions.
1792-
std::shared_ptr<const core::ITypedExpr> scalarFilter = nullptr;
1793-
if (scalarFunctions.size() > 0) {
1794-
scalarFilter = exprConverter_->toVeloxExpr(scalarFunctions[0], inputType);
1795-
// Will connect multiple functions with AND.
1796-
uint32_t idx = 1;
1797-
while (idx < scalarFunctions.size()) {
1798-
scalarFilter = connectWithAnd(
1799-
scalarFilter,
1800-
exprConverter_->toVeloxExpr(scalarFunctions[idx], inputType));
1801-
idx += 1;
1808+
std::vector<std::shared_ptr<const core::ITypedExpr>> allFilters;
1809+
for (auto scalar : scalarFunctions) {
1810+
auto filter = exprConverter_->toVeloxExpr(scalar, inputType);
1811+
if (filter != nullptr) {
1812+
allFilters.emplace_back(filter);
18021813
}
18031814
}
1804-
1805-
// Filter for OrList.
1806-
std::shared_ptr<const core::ITypedExpr> orListFilter = nullptr;
1807-
if (singularOrLists.size() > 0) {
1808-
orListFilter = exprConverter_->toVeloxExpr(singularOrLists[0], inputType);
1809-
uint32_t idx = 1;
1810-
while (idx < singularOrLists.size()) {
1811-
orListFilter = connectWithAnd(
1812-
orListFilter,
1813-
exprConverter_->toVeloxExpr(singularOrLists[idx], inputType));
1814-
idx += 1;
1815+
for (auto orList : singularOrLists) {
1816+
auto filter = exprConverter_->toVeloxExpr(orList, inputType);
1817+
if (filter != nullptr) {
1818+
allFilters.emplace_back(filter);
18151819
}
18161820
}
1817-
1818-
VELOX_CHECK(
1819-
scalarFilter != nullptr || orListFilter != nullptr,
1820-
"One filter should be valid.");
1821-
if (scalarFilter != nullptr && orListFilter != nullptr) {
1822-
return connectWithAnd(scalarFilter, orListFilter);
1821+
for (auto ifThen : ifThens) {
1822+
auto filter = exprConverter_->toVeloxExpr(ifThen, inputType);
1823+
if (filter != nullptr) {
1824+
allFilters.emplace_back(filter);
1825+
}
1826+
}
1827+
VELOX_CHECK_GT(allFilters.size(), 0, "One filter should be valid.")
1828+
std::shared_ptr<const core::ITypedExpr> andFilter = allFilters[0];
1829+
for (auto i = 1; i < allFilters.size(); i++) {
1830+
andFilter = connectWithAnd(andFilter, allFilters[i]);
18231831
}
1824-
return scalarFilter ? scalarFilter : orListFilter;
1832+
return andFilter;
18251833
}
18261834

18271835
core::TypedExprPtr SubstraitVeloxPlanConverter::connectWithAnd(

velox/substrait/SubstraitToVeloxPlan.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ class SubstraitVeloxPlanConverter {
137137
void flattenConditions(
138138
const ::substrait::Expression& sFilter,
139139
std::vector<::substrait::Expression_ScalarFunction>& scalarFunctions,
140-
std::vector<::substrait::Expression_SingularOrList>& singularOrLists);
140+
std::vector<::substrait::Expression_SingularOrList>& singularOrLists,
141+
std::vector<::substrait::Expression_IfThen>& ifThens);
141142

142143
/// Used to find the function specification in the constructed function map.
143144
std::string findFuncSpec(uint64_t id);
@@ -467,7 +468,9 @@ class SubstraitVeloxPlanConverter {
467468
const std::vector<::substrait::Expression_ScalarFunction>&
468469
remainingFunctions,
469470
const std::vector<::substrait::Expression_SingularOrList>&
470-
singularOrLists);
471+
singularOrLists,
472+
const std::vector<::substrait::Expression_IfThen>&
473+
ifThens);
471474

472475
/// Connect the left and right expressions with 'and' relation.
473476
core::TypedExprPtr connectWithAnd(

velox/substrait/tests/Substrait2VeloxPlanConversionTest.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,4 +293,21 @@ TEST_F(Substrait2VeloxPlanConversionTest, q6) {
293293
exec::test::AssertQueryBuilder(planNode)
294294
.splits(makeSplits(*planConverter_, planNode))
295295
.assertResults(expectedResult);
296+
}
297+
298+
TEST_F(Substrait2VeloxPlanConversionTest, ifthenTest) {
299+
std::string subPlanPath =
300+
getDataFilePath("velox/substrait/tests", "data/if_then.json");
301+
302+
::substrait::Plan substraitPlan;
303+
JsonToProtoConverter::readFromFile(subPlanPath, substraitPlan);
304+
305+
// Convert to Velox PlanNode.
306+
facebook::velox::substrait::SubstraitVeloxPlanConverter planConverter(
307+
pool_.get());
308+
auto planNode = planConverter.toVeloxPlan(substraitPlan);
309+
ASSERT_EQ(
310+
"-- Project[expressions: (n1_0:BIGINT, ROW[\"n0_0\"])] -> n1_0:BIGINT\n"
311+
" -- TableScan[table: hive_table, range filters: [(hd_buy_potential, Filter(MultiRange, deterministic, null not allowed)), (hd_demo_sk, Filter(IsNotNull, deterministic, null not allowed)), (hd_vehicle_count, BigintRange: [1, 9223372036854775807] no nulls)], remaining filter: (if(greaterthan(ROW[\"hd_vehicle_count\"],0),greaterthan(divide(cast ROW[\"hd_dep_count\"] as DOUBLE,cast ROW[\"hd_vehicle_count\"] as DOUBLE),1.2)))] -> n0_0:BIGINT, n0_1:VARCHAR, n0_2:BIGINT, n0_3:BIGINT\n",
312+
planNode->toString(true, true));
296313
}

0 commit comments

Comments
 (0)