Skip to content

[YQL-17789] Fix WideToBlocks over WideFromBlocks optimizer. Introduce ReplicateScalars #1903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 186 additions & 32 deletions ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
Y_UNUSED(types);
if (node->Head().IsCallable("WideFromBlocks")) {
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << node->Head().Content();
return node->Head().HeadPtr();
return ctx.NewCallable(node->Pos(), "ReplicateScalars", { node->Head().HeadPtr() });
}

if (const auto& input = node->Head(); input.IsCallable({"Extend", "OrderedExtend"})) {
Expand All @@ -145,6 +145,118 @@ TExprNode::TPtr OptimizeWideToBlocks(const TExprNode::TPtr& node, TExprContext&
return node;
}

// Peephole rule for WideFromBlocks.
//  * WideFromBlocks(WideToBlocks(x))      -> x
//  * WideFromBlocks(ReplicateScalars(x))  -> WideFromBlocks(x)
// Any other input leaves the node untouched.
TExprNode::TPtr OptimizeWideFromBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    const auto& input = node->Head();

    if (input.IsCallable("WideToBlocks")) {
        // The two conversions cancel each other out: return the original input.
        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Content() << " over " << input.Content();
        return input.HeadPtr();
    }

    if (!input.IsCallable("ReplicateScalars")) {
        return node;
    }

    // Re-attach this node directly to the input of ReplicateScalars.
    YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << input.Content() << " as input of " << node->Content();
    return ctx.ChangeChild(*node, 0, input.HeadPtr());
}

// Peephole rule shared by WideTakeBlocks and WideSkipBlocks:
// when the input is ReplicateScalars, the two callables are swapped via SwapWithHead.
// Any other input leaves the node untouched.
TExprNode::TPtr OptimizeWideTakeSkipBlocks(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    const auto& input = node->Head();
    if (!input.IsCallable("ReplicateScalars")) {
        return node;
    }

    YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content();
    return ctx.SwapWithHead(*node);
}

// Peephole rule for BlockCompress over ReplicateScalars: moves BlockCompress below ReplicateScalars.
//  * ReplicateScalars without an index list: plain swap of the two callables.
//  * ReplicateScalars with an index list: the list is rebuilt for the compressed flow -
//    the index equal to the BlockCompress index (child 1 of this node) is removed and
//    every greater index is decremented by one.
// Any other input leaves the node untouched.
TExprNode::TPtr OptimizeBlockCompress(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    const auto& replicate = node->Head();
    if (!replicate.IsCallable("ReplicateScalars")) {
        return node;
    }

    YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << replicate.Content();
    if (replicate.ChildrenSize() == 1) {
        return ctx.SwapWithHead(*node);
    }

    const ui32 compressIndex = FromString<ui32>(node->Child(1)->Content());
    const auto* oldIndexes = replicate.Child(1);

    TExprNodeList remappedIndexes;
    for (const auto& atom : oldIndexes->ChildrenList()) {
        const ui32 idx = FromString<ui32>(atom->Content());
        if (idx == compressIndex) {
            // This index is dropped from the list.
            continue;
        }

        if (idx < compressIndex) {
            // Indexes below the compress index are kept as is: reuse the atom.
            remappedIndexes.push_back(atom);
        } else {
            remappedIndexes.push_back(ctx.NewAtom(atom->Pos(), idx - 1));
        }
    }

    auto compressed = ctx.ChangeChild(*node, 0, replicate.HeadPtr());
    auto indexList = ctx.NewList(oldIndexes->Pos(), std::move(remappedIndexes));
    return ctx.Builder(node->Pos())
        .Callable("ReplicateScalars")
            .Add(0, std::move(compressed))
            .Add(1, std::move(indexList))
        .Seal()
        .Build();
}

// Peephole rule for block top/sort callables:
// when the input is ReplicateScalars, the two callables are swapped via SwapWithHead.
// Any other input leaves the node untouched.
TExprNode::TPtr OptimizeBlocksTopOrSort(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    const auto& input = node->Head();
    if (!input.IsCallable("ReplicateScalars")) {
        return node;
    }

    YQL_CLOG(DEBUG, CorePeepHole) << "Swap " << node->Content() << " with " << input.Content();
    return ctx.SwapWithHead(*node);
}

// Peephole rule for block extend callables: every child that is a ReplicateScalars
// callable is replaced by that callable's own input. If no child matched, the node
// is returned unchanged.
TExprNode::TPtr OptimizeBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    TExprNodeList unwrapped = node->ChildrenList();
    bool anyUnwrapped = false;
    for (auto& child : unwrapped) {
        if (!child->IsCallable("ReplicateScalars")) {
            continue;
        }
        anyUnwrapped = true;
        child = child->HeadPtr();
    }

    if (!anyUnwrapped) {
        return node;
    }

    YQL_CLOG(DEBUG, CorePeepHole) << "Drop ReplicateScalars as input of " << node->Content();
    return ctx.ChangeChildren(*node, std::move(unwrapped));
}

// Peephole rule for ReplicateScalars directly over another ReplicateScalars.
//  * Outer node has no index list: the inner ReplicateScalars is dropped.
//  * Outer node has an index list: both index lists are merged (sorted, de-duplicated)
//    into one ReplicateScalars over the inner node's input.
// Any other input leaves the node untouched.
TExprNode::TPtr OptimizeReplicateScalars(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    const auto& inner = node->Head();
    if (!inner.IsCallable("ReplicateScalars")) {
        return node;
    }

    if (node->ChildrenSize() == 1) {
        YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << inner.Content() << " as input of " << node->Content();
        return ctx.ChangeChild(*node, 0, inner.HeadPtr());
    }

    // child ReplicateScalar should also have indexes
    YQL_ENSURE(node->Head().ChildrenSize() == 2);

    // Ordered set: yields the union of both lists in ascending order without duplicates.
    TSet<ui32> mergedIndexes;
    const auto collectIndexes = [&mergedIndexes](const TExprNode& indexList) {
        for (const auto& atom : indexList.ChildrenList()) {
            mergedIndexes.insert(FromString<ui32>(atom->Content()));
        }
    };
    collectIndexes(*node->Child(1));
    collectIndexes(*inner.Child(1));

    const auto indexesPos = node->Child(1)->Pos();
    TExprNodeList newIndexes;
    newIndexes.reserve(mergedIndexes.size());
    for (const ui32 index : mergedIndexes) {
        newIndexes.push_back(ctx.NewAtom(indexesPos, index));
    }

    YQL_CLOG(DEBUG, CorePeepHole) << "Merge nested " << node->Content();
    auto mergedList = ctx.NewList(indexesPos, std::move(newIndexes));
    return ctx.ChangeChildren(*node, { inner.HeadPtr(), std::move(mergedList) });
}

TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
Y_UNUSED(types);
YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content();
Expand All @@ -156,38 +268,10 @@ TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx
const auto& items = child->GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
const ui32 width = items.size();
YQL_ENSURE(width > 0);
if (AllOf(items.begin(), items.end() - 1, [](const auto& item) { return item->IsBlock(); })) {
newChildren.push_back(child);
continue;
}

seenScalars = true;

TExprNodeList args;
TExprNodeList bodyItems;

args.reserve(width);
bodyItems.reserve(width);
auto lastColumn = ctx.NewArgument(child->Pos(), "height");
for (ui32 i = 0; i < width; ++i) {
auto arg = (i + 1 == width) ? lastColumn : ctx.NewArgument(child->Pos(), "arg");
args.push_back(arg);

if (i + 1 == width || items[i]->IsBlock()) {
bodyItems.push_back(arg);
} else {
YQL_ENSURE(items[i]->IsScalar());
bodyItems.push_back(ctx.NewCallable(child->Pos(), "ReplicateScalar", { arg, lastColumn}));
}
}

newChildren.push_back(ctx.Builder(child->Pos())
.Callable("WideMap")
.Add(0, child)
.Add(1, ctx.NewLambda(child->Pos(), ctx.NewArguments(child->Pos(), std::move(args)), std::move(bodyItems)))
.Seal()
.Build()
);
const bool hasScalars = AnyOf(items.begin(), items.end() - 1, [](const auto& item) { return item->IsScalar(); });
seenScalars = seenScalars || hasScalars;
newChildren.push_back(ctx.WrapByCallableIf(hasScalars, "ReplicateScalars", std::move(child)));
}

const TStringBuf newName = node->IsCallable("BlockOrdredExtend") ? "OrdredExtend" : "Extend";
Expand All @@ -197,6 +281,48 @@ TExprNode::TPtr ExpandBlockExtend(const TExprNode::TPtr& node, TExprContext& ctx
return ctx.NewCallable(node->Pos(), newName, std::move(newChildren));
}

// Expands ReplicateScalars(flow[, indexes]) into a WideMap over the same flow.
//
// The generated lambda has one argument per item of the input flow. The last argument
// (named "height") is passed through unchanged and is also used as the second argument
// of every generated ReplicateScalar call. For the remaining items:
//  * block items are passed through unchanged;
//  * scalar items are wrapped into ReplicateScalar(arg, height) when no index list is
//    given, or when the item's position is present in the index list; otherwise they
//    are passed through unchanged.
TExprNode::TPtr ExpandReplicateScalars(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    YQL_CLOG(DEBUG, CorePeepHole) << "Expand " << node->Content();
    const auto& items = node->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
    const ui32 width = items.size();

    TExprNodeList args;
    TExprNodeList bodyItems;

    args.reserve(width);
    bodyItems.reserve(width);
    auto lastColumn = ctx.NewArgument(node->Pos(), "height");

    // Undefined means "replicate every scalar item"; defined means "replicate only these positions".
    TMaybe<THashSet<ui32>> replicateIndexes;
    if (node->ChildrenSize() == 2) {
        replicateIndexes.ConstructInPlace();
        // The optional index list is the second child of ReplicateScalars itself,
        // not a child of its input flow.
        for (const auto& atom : node->Child(1)->ChildrenList()) {
            replicateIndexes->insert(FromString<ui32>(atom->Content()));
        }
    }

    for (ui32 i = 0; i < width; ++i) {
        auto arg = (i + 1 == width) ? lastColumn : ctx.NewArgument(node->Pos(), "arg");
        args.push_back(arg);

        if (i + 1 == width || items[i]->IsBlock()) {
            bodyItems.push_back(arg);
        } else {
            YQL_ENSURE(items[i]->IsScalar());
            bool doReplicate = !replicateIndexes.Defined() || replicateIndexes->contains(i);
            bodyItems.push_back(doReplicate ? ctx.NewCallable(node->Pos(), "ReplicateScalar", { arg, lastColumn}) : arg);
        }
    }

    return ctx.Builder(node->Pos())
        .Callable("WideMap")
            .Add(0, node->HeadPtr())
            .Add(1, ctx.NewLambda(node->Pos(), ctx.NewArguments(node->Pos(), std::move(args)), std::move(bodyItems)))
        .Seal()
        .Build();
}

TExprNode::TPtr SplitEquiJoinToPairsRecursive(const TExprNode& node, const TExprNode& joinTree, TExprContext& ctx,
std::vector<std::string_view>& outLabels, const TExprNode::TPtr& settings) {
const auto leftSubtree = joinTree.Child(1);
Expand Down Expand Up @@ -5804,6 +5930,21 @@ TExprNode::TPtr OptimizeBlockCombine(const TExprNode::TPtr& node, TExprContext&
return UpdateBlockCombineColumns(node, filterIndex, argIndices, ctx);
}

if (node->Head().IsCallable("ReplicateScalars")) {
YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << node->Head().Content() << " as input of " << node->Content();
return ctx.ChangeChild(*node, 0, node->Head().HeadPtr());
}

return node;
}

// Peephole rule for block merge callables: a ReplicateScalars input is dropped and the
// node is re-attached to that callable's own input. Any other input leaves the node untouched.
TExprNode::TPtr OptimizeBlockMerge(const TExprNode::TPtr& node, TExprContext& ctx, TTypeAnnotationContext& types) {
    Y_UNUSED(types);
    const auto& input = node->Head();
    if (!input.IsCallable("ReplicateScalars")) {
        return node;
    }

    YQL_CLOG(DEBUG, CorePeepHole) << "Drop " << input.Content() << " as input of " << node->Content();
    return ctx.ChangeChild(*node, 0, input.HeadPtr());
}

Expand Down Expand Up @@ -7760,10 +7901,22 @@ struct TPeepHoleRules {
{"NarrowMap", &OptimizeWideMapBlocks},
{"WideFilter", &OptimizeWideFilterBlocks},
{"WideToBlocks", &OptimizeWideToBlocks},
{"WideFromBlocks", &OptimizeWideFromBlocks},
{"WideTakeBlocks", &OptimizeWideTakeSkipBlocks},
{"WideSkipBlocks", &OptimizeWideTakeSkipBlocks},
{"BlockCompress", &OptimizeBlockCompress},
{"WideTopBlocks", &OptimizeBlocksTopOrSort},
{"WideTopSortBlocks", &OptimizeBlocksTopOrSort},
{"WideSortBlocks", &OptimizeBlocksTopOrSort},
{"BlockExtend", &OptimizeBlockExtend},
{"BlockOrderedExtend", &OptimizeBlockExtend},
{"ReplicateScalars", &OptimizeReplicateScalars},
{"Skip", &OptimizeSkipTakeToBlocks},
{"Take", &OptimizeSkipTakeToBlocks},
{"BlockCombineAll", &OptimizeBlockCombine},
{"BlockCombineHashed", &OptimizeBlockCombine},
{"BlockMergeFinalizeHashed", &OptimizeBlockMerge},
{"BlockMergeManyFinalizeHashed", &OptimizeBlockMerge},
{"WideTop", &OptimizeTopOrSortBlocks},
{"WideTopSort", &OptimizeTopOrSortBlocks},
{"WideSort", &OptimizeTopOrSortBlocks},
Expand All @@ -7772,6 +7925,7 @@ struct TPeepHoleRules {
const TExtPeepHoleOptimizerMap BlockStageExtFinalRules = {
{"BlockExtend", &ExpandBlockExtend},
{"BlockOrderedExtend", &ExpandBlockExtend},
{"ReplicateScalars", &ExpandReplicateScalars}
};

static const TPeepHoleRules& Instance() {
Expand Down
71 changes: 71 additions & 0 deletions ydb/library/yql/core/type_ann/type_ann_blocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,77 @@ IGraphTransformer::TStatus ReplicateScalarWrapper(const TExprNode::TPtr& input,
return IGraphTransformer::TStatus::Ok;
}

// Type annotation for ReplicateScalars(flow[, indexes]).
//
// Child 0 must pass EnsureWideFlowBlockType. The optional child 1 is a tuple of atoms,
// each a unique integer index that is less than (item count - 1) and refers to a scalar item.
//
// Result:
//  * Repeat with output = child 0, when no scalar item is selected for conversion;
//  * Repeat with the index list dropped, when the list covers every scalar item
//    (the last item is not counted);
//  * Ok otherwise, with the node typed as the input flow in which every selected scalar
//    item is replaced by a block of the same item type.
IGraphTransformer::TStatus ReplicateScalarsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
    if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
        return IGraphTransformer::TStatus::Error;
    }
    if (!EnsureMaxArgsCount(*input, 2, ctx.Expr)) {
        return IGraphTransformer::TStatus::Error;
    }

    TTypeAnnotationNode::TListType blockItemTypes;
    if (!EnsureWideFlowBlockType(input->Head(), blockItemTypes, ctx.Expr)) {
        return IGraphTransformer::TStatus::Error;
    }

    // Working copy of the input item types; selected scalar entries are overwritten below.
    auto flowItemTypes = input->Head().GetTypeAnn()->Cast<TFlowExprType>()->GetItemType()->Cast<TMultiExprType>()->GetItems();
    YQL_ENSURE(flowItemTypes.size() > 0);

    // The last item is never addressed by an index and never converted.
    const size_t convertibleCount = flowItemTypes.size() - 1;

    TMaybe<THashSet<ui32>> replicateIndexes;
    if (input->ChildrenSize() == 2) {
        if (!EnsureTupleOfAtoms(*input->Child(1), ctx.Expr)) {
            return IGraphTransformer::TStatus::Error;
        }

        replicateIndexes.ConstructInPlace();
        for (const auto& atom : input->Child(1)->ChildrenList()) {
            ui32 idx;
            TStringBuilder problem;
            // Checks run in order; the first failing one produces the reported message.
            if (!TryFromString(atom->Content(), idx)) {
                problem << "Expecting integer as replicate index, got: " << atom->Content();
            } else if (idx >= convertibleCount) {
                problem << "Replicate index too big: " << idx << ", should be less than " << convertibleCount;
            } else if (!replicateIndexes->insert(idx).second) {
                problem << "Duplicate replicate index " << idx;
            } else if (flowItemTypes[idx]->GetKind() != ETypeAnnotationKind::Scalar) {
                problem << "Invalid replicate index " << idx << ": input item is not scalar";
            }

            if (!problem.empty()) {
                ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(atom->Pos()), problem));
                return IGraphTransformer::TStatus::Error;
            }
        }
    }

    bool hasScalarsToConvert = false;
    size_t inputScalarsCount = 0;
    for (size_t i = 0; i < convertibleCount; ++i) {
        auto& itemType = flowItemTypes[i];
        if (!itemType->IsScalar()) {
            continue;
        }

        ++inputScalarsCount;
        const bool selected = !replicateIndexes.Defined() || replicateIndexes->contains(i);
        if (selected) {
            hasScalarsToConvert = true;
            itemType = ctx.Expr.MakeType<TBlockExprType>(itemType->Cast<TScalarExprType>()->GetItemType());
        }
    }

    if (!hasScalarsToConvert) {
        // Nothing to replicate: the node is replaced by its input.
        output = input->HeadPtr();
        return IGraphTransformer::TStatus::Repeat;
    }

    const bool listCoversAllScalars = replicateIndexes.Defined() && replicateIndexes->size() == inputScalarsCount;
    if (listCoversAllScalars) {
        // The list names every scalar item, so it is dropped and only the flow child is kept.
        output = ctx.Expr.ChangeChildren(*input, { input->HeadPtr() });
        return IGraphTransformer::TStatus::Repeat;
    }

    input->SetTypeAnn(ctx.Expr.MakeType<TFlowExprType>(ctx.Expr.MakeType<TMultiExprType>(flowItemTypes)));
    return IGraphTransformer::TStatus::Ok;
}

IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
if (!EnsureArgsCount(*input, 2U, ctx.Expr)) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_blocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace NTypeAnnImpl {

IGraphTransformer::TStatus AsScalarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus ReplicateScalarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus ReplicateScalarsWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockCompressWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockExpandChunkedWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockCoalesceWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/type_ann/type_ann_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12202,6 +12202,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["WideSortBlocks"] = &WideSortBlocksWrapper;
Functions["BlockExtend"] = &BlockExtendWrapper;
Functions["BlockOrderedExtend"] = &BlockExtendWrapper;
Functions["ReplicateScalars"] = &ReplicateScalarsWrapper;

Functions["BlockCoalesce"] = &BlockCoalesceWrapper;
Functions["BlockAnd"] = &BlockLogicalWrapper;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/core/yql_expr_constraint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ class TCallableConstraintTransformer : public TCallableTransformerBase<TCallable
Functions["WideToBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["WideFromBlocks"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["BlockExpandChunked"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["ReplicateScalars"] = &TCallableConstraintTransformer::CopyAllFrom<0>;
Functions["BlockMergeFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
Functions["BlockMergeManyFinalizeHashed"] = &TCallableConstraintTransformer::AggregateWrap<true>;
}
Expand Down