Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
{
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoin));
AddHandler(0, &TDqPhyGraceJoin::Match, HNDL(RewriteMapJoinWithGraceCore));
AddHandler(0, &TDqPhyMapJoin::Match, HNDL(RewriteMapJoinWithMapCore));
AddHandler(0, &TDqPhyCrossJoin::Match, HNDL(RewriteCrossJoin));
AddHandler(0, &TDqPhyJoinDict::Match, HNDL(RewriteDictJoin));
AddHandler(0, &TDqJoin::Match, HNDL(RewritePureJoin));
Expand All @@ -110,9 +111,15 @@ class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
return output;
}

TMaybeNode<TExprBase> RewriteMapJoin(TExprBase node, TExprContext& ctx) {
TExprBase output = DqPeepholeRewriteMapJoin(node, ctx);
DumpAppliedRule("RewriteMapJoin", node.Ptr(), output.Ptr(), ctx);
TMaybeNode<TExprBase> RewriteMapJoinWithGraceCore(TExprBase node, TExprContext& ctx) {
TExprBase output = DqPeepholeRewriteMapJoinWithGraceCore(node, ctx);
DumpAppliedRule("RewriteMapJoinWithGraceCore", node.Ptr(), output.Ptr(), ctx);
return output;
}

TMaybeNode<TExprBase> RewriteMapJoinWithMapCore(TExprBase node, TExprContext& ctx) {
TExprBase output = DqPeepholeRewriteMapJoinWithMapCore(node, ctx);
DumpAppliedRule("RewriteMapJoinWithMapCore", node.Ptr(), output.Ptr(), ctx);
return output;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
// It is now possible as we don't use datashard transactions for reads in data queries.
bool pushLeftStage = !KqpCtx.IsDataQuery() && AllowFuseJoinInputs(node);
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false)
);
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
return output;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
REGISTER_SETTING(*this, OptEnableOlapProvideComputeSharding);
REGISTER_SETTING(*this, OverrideStatistics);
REGISTER_SETTING(*this, OverridePlanner);

REGISTER_SETTING(*this, UseGraceJoinCoreForMap);

REGISTER_SETTING(*this, OptUseFinalizeByKey);
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct TKikimrSettings {
NCommon::TConfSetting<TString, false> OverrideStatistics;
NCommon::TConfSetting<ui64, false> EnableSpillingNodes;
NCommon::TConfSetting<TString, false> OverridePlanner;
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;

/* Disable optimizer rules */
NCommon::TConfSetting<bool, false> OptDisableTopSort;
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
{"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true}
]
},
{
"Name": "TDqPhyGraceJoin",
"Base": "TDqJoinBase",
"Match": {"Type": "Callable", "Name": "DqPhyGraceJoin"}
},
{
"Name": "TDqPhyMapJoin",
"Base": "TDqJoinBase",
Expand Down
41 changes: 27 additions & 14 deletions ydb/library/yql/dq/opt/dq_opt_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ std::pair<TVector<TCoAtom>, TVector<TCoAtom>> GetJoinKeys(const TDqJoin& join, T
}


TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
TExprContext& ctx)
TDqJoinBase DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput,
TExprContext& ctx, bool useGraceCore)
{
static const std::set<std::string_view> supportedTypes = {"Inner"sv, "Left"sv, "LeftOnly"sv, "LeftSemi"sv};
auto joinType = join.JoinType().Value();
Expand All @@ -349,16 +349,29 @@ TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput,
auto leftFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), leftInput, leftFilterKeys);
auto rightFilteredInput = BuildSkipNullKeys(ctx, join.Pos(), rightInput, rightFilterKeys);

return Build<TDqPhyMapJoin>(ctx, join.Pos())
.LeftInput(leftFilteredInput)
.LeftLabel(join.LeftLabel())
.RightInput(rightFilteredInput)
.RightLabel(join.RightLabel())
.JoinType(join.JoinType())
.JoinKeys(join.JoinKeys())
.LeftJoinKeyNames(join.LeftJoinKeyNames())
.RightJoinKeyNames(join.RightJoinKeyNames())
.Done();
if (useGraceCore) {
return Build<TDqPhyGraceJoin>(ctx, join.Pos())
.LeftInput(leftFilteredInput)
.LeftLabel(join.LeftLabel())
.RightInput(rightFilteredInput)
.RightLabel(join.RightLabel())
.JoinType(join.JoinType())
.JoinKeys(join.JoinKeys())
.LeftJoinKeyNames(join.LeftJoinKeyNames())
.RightJoinKeyNames(join.RightJoinKeyNames())
.Done();
} else {
return Build<TDqPhyMapJoin>(ctx, join.Pos())
.LeftInput(leftFilteredInput)
.LeftLabel(join.LeftLabel())
.RightInput(rightFilteredInput)
.RightLabel(join.RightLabel())
.JoinType(join.JoinType())
.JoinKeys(join.JoinKeys())
.LeftJoinKeyNames(join.LeftJoinKeyNames())
.RightJoinKeyNames(join.RightJoinKeyNames())
.Done();
}
}

} // namespace
Expand Down Expand Up @@ -609,7 +622,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T
.Done();
}

TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx) {
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap) {
static const std::set<std::string_view> supportedTypes = {
"Inner"sv,
"Left"sv,
Expand Down Expand Up @@ -760,7 +773,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&

TMaybeNode<TExprBase> phyJoin;
if (join.JoinType().Value() != "Cross"sv) {
phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx);
phyJoin = DqMakePhyMapJoin(join, leftInputArg, joinRightInput, ctx, useGraceCoreForMap);
} else {
YQL_ENSURE(join.JoinKeys().Empty());

Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode

NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter);

NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx);
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap);

NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx,
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true);
IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin = EHashJoinMode::Off, bool shuffleMapJoin = true, bool useGraceCoreForMap = false);

NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx);

Expand Down
142 changes: 141 additions & 1 deletion ydb/library/yql/dq/opt/dq_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_type_helpers.h>

#include <ydb/library/yql/utils/log/log.h>

Expand Down Expand Up @@ -130,8 +131,146 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
}
return structMembers;
}

TExprNode::TPtr ExpandJoinInput(const TStructExprType& type, TExprNode::TPtr&& arg, TExprContext& ctx) {
return ctx.Builder(arg->Pos())
.Callable("ExpandMap")
.Add(0, std::move(arg))
.Lambda(1)
.Param("item")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
auto i = 0U;
for (const auto& item : type.GetItems()) {
parent.Callable(i++, "Member")
.Arg(0, "item")
.Atom(1, item->GetName())
.Seal();
}
return parent;
})
.Seal()
.Seal().Build();
}

} // anonymous namespace end

TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const TExprBase& node, TExprContext& ctx) {
if (!node.Maybe<TDqPhyGraceJoin>()) {
return node;
}
const auto graceJoin = node.Cast<TDqPhyGraceJoin>();
const auto pos = graceJoin.Pos();

const TString leftTableLabel(GetTableLabel(graceJoin.LeftLabel()));
const TString rightTableLabel(GetTableLabel(graceJoin.RightLabel()));

auto [leftKeyColumnNodes, rightKeyColumnNodes] = JoinKeysToAtoms(ctx, graceJoin, leftTableLabel, rightTableLabel);
const auto keyWidth = leftKeyColumnNodes.size();

const auto itemTypeLeft = GetSequenceItemType(graceJoin.LeftInput(), false, ctx)->Cast<TStructExprType>();
const auto itemTypeRight = GetSequenceItemType(graceJoin.RightInput(), false, ctx)->Cast<TStructExprType>();

TExprNode::TListType leftRenames, rightRenames;
std::vector<TString> fullColNames;
ui32 outputIndex = 0;

for (auto i = 0u; i < itemTypeLeft->GetSize(); i++) {
TString name(itemTypeLeft->GetItems()[i]->GetName());
if (leftTableLabel) {
name = leftTableLabel + "." + name;
}
fullColNames.push_back(name);
leftRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(i)));
leftRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(outputIndex++)));
}
if (graceJoin.JoinType().Value() != "LeftOnly" && graceJoin.JoinType().Value() != "LeftSemi") {
for (auto i = 0u; i < itemTypeRight->GetSize(); i++) {
TString name(itemTypeRight->GetItems()[i]->GetName());
if (rightTableLabel) {
name = rightTableLabel + "." + name;
}
fullColNames.push_back(name);
rightRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(i)));
rightRenames.emplace_back(ctx.NewAtom(pos, ctx.GetIndexAsString(outputIndex++)));
}
}

TTypeAnnotationNode::TListType keyTypesLeft(keyWidth);
TTypeAnnotationNode::TListType keyTypesRight(keyWidth);
TTypeAnnotationNode::TListType keyTypes(keyWidth);
for (auto i = 0U; i < keyTypes.size(); ++i) {
const auto keyTypeLeft = itemTypeLeft->FindItemType(leftKeyColumnNodes[i]->Content());
const auto keyTypeRight = itemTypeRight->FindItemType(rightKeyColumnNodes[i]->Content());
bool optKey = false;
keyTypes[i] = JoinDryKeyType(keyTypeLeft, keyTypeRight, optKey, ctx);
if (!keyTypes[i]) {
keyTypes.clear();
keyTypesLeft.clear();
keyTypesRight.clear();
break;
}
keyTypesLeft[i] = optKey ? ctx.MakeType<TOptionalExprType>(keyTypes[i]) : keyTypes[i];
keyTypesRight[i] = optKey ? ctx.MakeType<TOptionalExprType>(keyTypes[i]) : keyTypes[i];
}

auto leftInput = ExpandJoinInput(*itemTypeLeft, ctx.NewCallable(graceJoin.LeftInput().Pos(), "ToFlow", {graceJoin.LeftInput().Ptr()}), ctx);
auto rightInput = ExpandJoinInput(*itemTypeRight, ctx.NewCallable(graceJoin.RightInput().Pos(), "ToFlow", {graceJoin.RightInput().Ptr()}), ctx);
YQL_ENSURE(!keyTypes.empty());

for (auto i = 0U; i < leftKeyColumnNodes.size(); i++) {
const auto origName = TString(leftKeyColumnNodes[i]->Content());
auto index = itemTypeLeft->FindItem(origName);
YQL_ENSURE(index);
leftKeyColumnNodes[i] = ctx.NewAtom(leftKeyColumnNodes[i]->Pos(), ctx.GetIndexAsString(*index));
}
for (auto i = 0U; i < rightKeyColumnNodes.size(); i++) {
const auto origName = TString(rightKeyColumnNodes[i]->Content());
auto index = itemTypeRight->FindItem(origName);
YQL_ENSURE(index);
rightKeyColumnNodes[i] = ctx.NewAtom(rightKeyColumnNodes[i]->Pos(), ctx.GetIndexAsString(*index));
}

auto [leftKeyColumnNodesCopy, rightKeyColumnNodesCopy] = JoinKeysToAtoms(ctx, graceJoin, leftTableLabel, rightTableLabel);

auto graceJoinCore = Build<TCoGraceJoinCore>(ctx, pos)
.LeftInput(std::move(leftInput))
.RightInput(std::move(rightInput))
.JoinKind(graceJoin.JoinType())
.LeftKeysColumns(ctx.NewList(pos, std::move(leftKeyColumnNodes)))
.RightKeysColumns(ctx.NewList(pos, std::move(rightKeyColumnNodes)))
.LeftRenames(ctx.NewList(pos, std::move(leftRenames)))
.RightRenames(ctx.NewList(pos, std::move(rightRenames)))
.LeftKeysColumnNames(ctx.NewList(pos, std::move(leftKeyColumnNodesCopy)))
.RightKeysColumnNames(ctx.NewList(pos, std::move(rightKeyColumnNodesCopy)))
.Flags()
.Build()
.Done();

auto graceNode = ctx.Builder(pos)
.Callable("NarrowMap")
.Add(0, graceJoinCore.Ptr())
.Lambda(1)
.Params("output", fullColNames.size())
.Callable("AsStruct")
.Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
ui32 i = 0U;
for (const auto& colName : fullColNames) {
parent.List(i)
.Atom(0, colName)
.Arg(1, "output", i)
.Seal();
i++;
}
return parent;
})
.Seal()
.Seal()
.Seal()
.Build();

return TExprBase(graceNode);
}

/**
* Rewrites a `KqpMapJoin` to the `MapJoinCore`.
*
Expand All @@ -142,10 +281,11 @@ TExprNode::TListType OriginalJoinOutputMembers(const TDqPhyMapJoin& mapJoin, TEx
* (rely on the fact that there will be only one element in the `FlatMap`-stream)
* - Align key types using `StrictCast`, use internal columns to store converted left keys
*/
TExprBase DqPeepholeRewriteMapJoin(const TExprBase& node, TExprContext& ctx) {
TExprBase DqPeepholeRewriteMapJoinWithMapCore(const TExprBase& node, TExprContext& ctx) {
if (!node.Maybe<TDqPhyMapJoin>()) {
return node;
}

const auto mapJoin = node.Cast<TDqPhyMapJoin>();
const auto pos = mapJoin.Pos();

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/dq/opt/dq_opt_peephole.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ namespace NYql::NDq {

NNodes::TExprBase DqPeepholeRewriteCrossJoin(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewriteJoinDict(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewriteMapJoin(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewriteMapJoinWithGraceCore(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewriteMapJoinWithMapCore(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewriteReplicate(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeRewritePureJoin(const NNodes::TExprBase& node, TExprContext& ctx);
NNodes::TExprBase DqPeepholeDropUnusedInputs(const NNodes::TExprBase& node, TExprContext& ctx);
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2654,7 +2654,7 @@ TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) {


TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx,
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin)
const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, EHashJoinMode hashJoin, bool shuffleMapJoin, bool useGraceCoreForMap)
{
if (!node.Maybe<TDqJoin>()) {
return node;
Expand Down Expand Up @@ -2704,7 +2704,7 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon
// separate stage to receive data from both sides of join.
// TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This
// requires some additional knowledge, probably with use of constraints.
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx);
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx, useGraceCoreForMap);
}

TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/dq/type_ann/dq_type_ann.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,10 @@ THolder<IGraphTransformer> CreateDqTypeAnnotationTransformer(TTypeAnnotationCont
return AnnotateDqJoin(input, ctx);
}

if (TDqPhyGraceJoin::Match(input.Get())) {
return AnnotateDqMapOrDictJoin(input, ctx);
}

if (TDqPhyMapJoin::Match(input.Get())) {
return AnnotateDqMapOrDictJoin(input, ctx);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ TDqConfiguration::TDqConfiguration() {
}
return res;
});
REGISTER_SETTING(*this, UseGraceJoinCoreForMap);
}

} // namespace NYql
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/common/yql_dq_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ struct TDqSettings {

NCommon::TConfSetting<ui64, false> _MaxAttachmentsSize;
NCommon::TConfSetting<bool, false> DisableCheckpoints;
NCommon::TConfSetting<bool, false> UseGraceJoinCoreForMap;

// This options will be passed to executor_actor and worker_actor
template <typename TProtoConfig>
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/dq/opt/dqs_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ namespace NYql::NDqs {
TExprBase node{inputExpr};
PERFORM_RULE(DqPeepholeRewriteCrossJoin, node, ctx);
PERFORM_RULE(DqPeepholeRewriteJoinDict, node, ctx);
PERFORM_RULE(DqPeepholeRewriteMapJoin, node, ctx);
PERFORM_RULE(DqPeepholeRewriteMapJoinWithGraceCore, node, ctx);
PERFORM_RULE(DqPeepholeRewriteMapJoinWithMapCore, node, ctx);
PERFORM_RULE(DqPeepholeRewritePureJoin, node, ctx);
PERFORM_RULE(DqPeepholeRewriteReplicate, node, ctx);
PERFORM_RULE(DqPeepholeDropUnusedInputs, node, ctx);
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/dq/opt/physical_optimize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase {
const auto join = node.Cast<TDqJoin>();
const TParentsMap* parentsMap = getParents();
const auto mode = Config->HashJoinMode.Get().GetOrElse(EHashJoinMode::Off);
return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, mode);
const auto useGraceJoin = Config->UseGraceJoinCoreForMap.Get().GetOrElse(false);
return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, mode, true, useGraceJoin);
}

template <bool IsGlobal>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase {
AddHandler({TDqReplicate::CallableName()}, Hndl(&TDqDataSinkConstraintTransformer::HandleReplicate));
AddHandler({
TDqJoin::CallableName(),
TDqPhyGraceJoin::CallableName(),
TDqPhyMapJoin::CallableName(),
TDqPhyCrossJoin::CallableName(),
TDqPhyJoinDict::CallableName(),
Expand Down
Loading