Skip to content

Commit 1703c4b

Browse files
authored
Merge 48f82f8 into ded1634
2 parents ded1634 + 48f82f8 commit 1703c4b

19 files changed

+8200
-7573
lines changed
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
2+
#include <ydb/library/yql/providers/yt/provider/yql_yt_transformer.h>
3+
#include <ydb/library/yql/providers/yt/provider/yql_yt_transformer_helper.h>
4+
5+
#include <ydb/library/yql/core/yql_type_helpers.h>
6+
#include <ydb/library/yql/dq/opt/dq_opt.h>
7+
#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
8+
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
9+
#include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h>
10+
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
11+
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
12+
#include <ydb/library/yql/providers/yt/lib/expr_traits/yql_expr_traits.h>
13+
#include <ydb/library/yql/providers/yt/opt/yql_yt_key_selector.h>
14+
#include <ydb/library/yql/utils/log/log.h>
15+
16+
#include <util/generic/xrange.h>
17+
#include <util/string/type.h>
18+
19+
namespace NYql {
20+
21+
using namespace NPrivate;
22+
23+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::TableContentWithSettings(TExprBase node, TExprContext& ctx) const {
24+
auto op = node.Cast<TYtOutputOpBase>();
25+
26+
TExprNode::TPtr res = op.Ptr();
27+
28+
TNodeSet nodesToOptimize;
29+
TProcessedNodesSet processedNodes;
30+
processedNodes.insert(res->Head().UniqueId());
31+
VisitExpr(res, [&nodesToOptimize, &processedNodes](const TExprNode::TPtr& input) {
32+
if (processedNodes.contains(input->UniqueId())) {
33+
return false;
34+
}
35+
36+
if (auto read = TMaybeNode<TYtLength>(input).Input().Maybe<TYtReadTable>()) {
37+
nodesToOptimize.insert(read.Cast().Raw());
38+
return false;
39+
}
40+
41+
if (auto read = TMaybeNode<TYtTableContent>(input).Input().Maybe<TYtReadTable>()) {
42+
nodesToOptimize.insert(read.Cast().Raw());
43+
return false;
44+
}
45+
if (TYtOutput::Match(input.Get())) {
46+
processedNodes.insert(input->UniqueId());
47+
return false;
48+
}
49+
return true;
50+
});
51+
52+
if (nodesToOptimize.empty()) {
53+
return node;
54+
}
55+
56+
TSyncMap syncList;
57+
TOptimizeExprSettings settings(State_->Types);
58+
settings.ProcessedNodes = &processedNodes; // Prevent optimizer to go deeper than current operation
59+
auto status = OptimizeExpr(res, res, [&syncList, &nodesToOptimize, state = State_](const TExprNode::TPtr& input, TExprContext& ctx) -> TExprNode::TPtr {
60+
if (nodesToOptimize.find(input.Get()) != nodesToOptimize.end()) {
61+
return OptimizeReadWithSettings(input, false, true, syncList, state, ctx);
62+
}
63+
return input;
64+
}, ctx, settings);
65+
66+
if (status.Level == IGraphTransformer::TStatus::Error) {
67+
return {};
68+
}
69+
70+
if (status.Level == IGraphTransformer::TStatus::Ok) {
71+
return node;
72+
}
73+
74+
if (!syncList.empty()) {
75+
using TPair = std::pair<TExprNode::TPtr, ui64>;
76+
TVector<TPair> sortedList(syncList.cbegin(), syncList.cend());
77+
TExprNode::TListType syncChildren;
78+
syncChildren.push_back(res->ChildPtr(TYtOutputOpBase::idx_World));
79+
::Sort(sortedList, [](const TPair& x, const TPair& y) { return x.second < y.second; });
80+
for (auto& x: sortedList) {
81+
auto world = ctx.NewCallable(node.Pos(), TCoLeft::CallableName(), { x.first });
82+
syncChildren.push_back(world);
83+
}
84+
85+
res = ctx.ChangeChild(*res, TYtOutputOpBase::idx_World,
86+
ctx.NewCallable(node.Pos(), TCoSync::CallableName(), std::move(syncChildren)));
87+
}
88+
89+
return TExprBase(res);
90+
}
91+
92+
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::NonOptimalTableContent(TExprBase node, TExprContext& ctx) const {
93+
auto op = node.Cast<TYtOutputOpBase>();
94+
95+
TExprNode::TPtr res = op.Ptr();
96+
97+
TNodeSet nodesToOptimize;
98+
TProcessedNodesSet processedNodes;
99+
processedNodes.insert(res->Head().UniqueId());
100+
VisitExpr(res, [&nodesToOptimize, &processedNodes](const TExprNode::TPtr& input) {
101+
if (processedNodes.contains(input->UniqueId())) {
102+
return false;
103+
}
104+
105+
if (TYtTableContent::Match(input.Get())) {
106+
nodesToOptimize.insert(input.Get());
107+
return false;
108+
}
109+
if (TYtOutput::Match(input.Get())) {
110+
processedNodes.insert(input->UniqueId());
111+
return false;
112+
}
113+
return true;
114+
});
115+
116+
if (nodesToOptimize.empty()) {
117+
return node;
118+
}
119+
120+
TSyncMap syncList;
121+
const auto maxTables = State_->Configuration->TableContentMaxInputTables.Get().GetOrElse(1000);
122+
const auto minChunkSize = State_->Configuration->TableContentMinAvgChunkSize.Get().GetOrElse(1_GB);
123+
const auto maxChunks = State_->Configuration->TableContentMaxChunksForNativeDelivery.Get().GetOrElse(1000ul);
124+
auto state = State_;
125+
auto world = res->ChildPtr(TYtOutputOpBase::idx_World);
126+
TOptimizeExprSettings settings(State_->Types);
127+
settings.ProcessedNodes = &processedNodes; // Prevent optimizer to go deeper than current operation
128+
auto status = OptimizeExpr(res, res, [&syncList, &nodesToOptimize, maxTables, minChunkSize, maxChunks, state, world](const TExprNode::TPtr& input, TExprContext& ctx) -> TExprNode::TPtr {
129+
if (nodesToOptimize.find(input.Get()) != nodesToOptimize.end()) {
130+
if (auto read = TYtTableContent(input).Input().Maybe<TYtReadTable>()) {
131+
bool materialize = false;
132+
const bool singleSection = 1 == read.Cast().Input().Size();
133+
TVector<TYtSection> newSections;
134+
for (auto section: read.Cast().Input()) {
135+
if (NYql::HasAnySetting(section.Settings().Ref(), EYtSettingType::Sample | EYtSettingType::SysColumns)) {
136+
materialize = true;
137+
}
138+
else if (section.Paths().Size() > maxTables) {
139+
materialize = true;
140+
}
141+
else {
142+
TMaybeNode<TYtMerge> oldOp;
143+
if (section.Paths().Size() == 1) {
144+
oldOp = section.Paths().Item(0).Table().Maybe<TYtOutput>().Operation().Maybe<TYtMerge>();
145+
}
146+
if (!oldOp.IsValid() || !NYql::HasSetting(oldOp.Cast().Settings().Ref(), EYtSettingType::CombineChunks)) {
147+
for (auto path: section.Paths()) {
148+
TYtTableBaseInfo::TPtr tableInfo = TYtTableBaseInfo::Parse(path.Table());
149+
if (auto tableStat = tableInfo->Stat) {
150+
if (tableStat->ChunkCount > maxChunks || (tableStat->ChunkCount > 1 && tableStat->DataSize / tableStat->ChunkCount < minChunkSize)) {
151+
materialize = true;
152+
break;
153+
}
154+
}
155+
if (!tableInfo->IsTemp && tableInfo->Meta) {
156+
auto p = tableInfo->Meta->Attrs.FindPtr("erasure_codec");
157+
if (p && *p != "none") {
158+
materialize = true;
159+
break;
160+
}
161+
else if (tableInfo->Meta->IsDynamic) {
162+
materialize = true;
163+
break;
164+
}
165+
}
166+
}
167+
}
168+
}
169+
if (materialize) {
170+
auto path = CopyOrTrivialMap(section.Pos(),
171+
TExprBase(world),
172+
TYtDSink(ctx.RenameNode(read.DataSource().Ref(), "DataSink")),
173+
*section.Ref().GetTypeAnn()->Cast<TListExprType>()->GetItemType(),
174+
Build<TYtSection>(ctx, section.Pos())
175+
.Paths(section.Paths())
176+
.Settings(NYql::RemoveSettings(section.Settings().Ref(), EYtSettingType::Unordered | EYtSettingType::NonUnique, ctx))
177+
.Done(),
178+
{}, ctx, state,
179+
TCopyOrTrivialMapOpts()
180+
.SetTryKeepSortness(!NYql::HasSetting(section.Settings().Ref(), EYtSettingType::Unordered))
181+
.SetSectionUniq(section.Ref().GetConstraint<TDistinctConstraintNode>())
182+
.SetConstraints(section.Ref().GetConstraintSet())
183+
.SetCombineChunks(true)
184+
);
185+
186+
syncList[path.Table().Cast<TYtOutput>().Operation().Ptr()] = syncList.size();
187+
188+
if (singleSection) {
189+
return ctx.ChangeChild(*input, TYtTableContent::idx_Input, path.Table().Ptr());
190+
} else {
191+
newSections.push_back(Build<TYtSection>(ctx, section.Pos())
192+
.Paths()
193+
.Add(path)
194+
.Build()
195+
.Settings().Build()
196+
.Done());
197+
}
198+
199+
} else {
200+
newSections.push_back(section);
201+
}
202+
203+
}
204+
if (materialize) {
205+
auto newRead = Build<TYtReadTable>(ctx, read.Cast().Pos())
206+
.InitFrom(read.Cast())
207+
.Input()
208+
.Add(newSections)
209+
.Build()
210+
.Done();
211+
212+
return ctx.ChangeChild(*input, TYtTableContent::idx_Input, newRead.Ptr());
213+
}
214+
}
215+
else if (auto out = TYtTableContent(input).Input().Maybe<TYtOutput>()) {
216+
auto oldOp = GetOutputOp(out.Cast());
217+
if (!oldOp.Maybe<TYtMerge>() || !NYql::HasSetting(oldOp.Cast<TYtMerge>().Settings().Ref(), EYtSettingType::CombineChunks)) {
218+
auto outTable = GetOutTable(out.Cast());
219+
TYtOutTableInfo tableInfo(outTable);
220+
if (auto tableStat = tableInfo.Stat) {
221+
if (tableStat->ChunkCount > maxChunks || (tableStat->ChunkCount > 1 && tableStat->DataSize / tableStat->ChunkCount < minChunkSize)) {
222+
auto newOp = Build<TYtMerge>(ctx, input->Pos())
223+
.World(world)
224+
.DataSink(oldOp.DataSink())
225+
.Output()
226+
.Add()
227+
.InitFrom(outTable.Cast<TYtOutTable>())
228+
.Name().Value("").Build()
229+
.Stat<TCoVoid>().Build()
230+
.Build()
231+
.Build()
232+
.Input()
233+
.Add()
234+
.Paths()
235+
.Add()
236+
.Table(out.Cast())
237+
.Columns<TCoVoid>().Build()
238+
.Ranges<TCoVoid>().Build()
239+
.Stat<TCoVoid>().Build()
240+
.Build()
241+
.Build()
242+
.Settings<TCoNameValueTupleList>()
243+
.Build()
244+
.Build()
245+
.Build()
246+
.Settings()
247+
.Add()
248+
.Name().Value(ToString(EYtSettingType::CombineChunks)).Build()
249+
.Build()
250+
.Add()
251+
.Name().Value(ToString(EYtSettingType::ForceTransform)).Build()
252+
.Build()
253+
.Build()
254+
.Done();
255+
256+
syncList[newOp.Ptr()] = syncList.size();
257+
258+
auto newOutput = Build<TYtOutput>(ctx, input->Pos())
259+
.Operation(newOp)
260+
.OutIndex().Value(0U).Build()
261+
.Done().Ptr();
262+
263+
return ctx.ChangeChild(*input, TYtTableContent::idx_Input, std::move(newOutput));
264+
}
265+
}
266+
}
267+
}
268+
}
269+
return input;
270+
}, ctx, settings);
271+
272+
if (status.Level == IGraphTransformer::TStatus::Error) {
273+
return {};
274+
}
275+
276+
if (status.Level == IGraphTransformer::TStatus::Ok) {
277+
return node;
278+
}
279+
280+
if (!syncList.empty()) {
281+
using TPair = std::pair<TExprNode::TPtr, ui64>;
282+
TVector<TPair> sortedList(syncList.cbegin(), syncList.cend());
283+
TExprNode::TListType syncChildren;
284+
syncChildren.push_back(res->ChildPtr(TYtOutputOpBase::idx_World));
285+
::Sort(sortedList, [](const TPair& x, const TPair& y) { return x.second < y.second; });
286+
for (auto& x: sortedList) {
287+
auto world = ctx.NewCallable(node.Pos(), TCoLeft::CallableName(), { x.first });
288+
syncChildren.push_back(world);
289+
}
290+
291+
res = ctx.ChangeChild(*res, TYtOutputOpBase::idx_World,
292+
ctx.NewCallable(node.Pos(), TCoSync::CallableName(), std::move(syncChildren)));
293+
}
294+
295+
return TExprBase(res);
296+
}
297+
298+
} // namespace NYql

0 commit comments

Comments
 (0)