Skip to content

Commit ddd7a49

Browse files
committed
HLL in YT statistics
1 parent a20c225 commit ddd7a49

14 files changed

+443
-98
lines changed

ydb/library/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ TYtConfiguration::TYtConfiguration()
484484
});
485485
REGISTER_SETTING(*this, MinColumnGroupSize).Lower(2);
486486
REGISTER_SETTING(*this, MaxColumnGroups);
487+
REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
487488
}
488489

489490
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) {

ydb/library/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ struct TYtSettings {
280280
NCommon::TConfSetting<EColumnGroupMode, false> ColumnGroupMode;
281281
NCommon::TConfSetting<ui16, false> MinColumnGroupSize;
282282
NCommon::TConfSetting<ui16, false> MaxColumnGroups;
283+
NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
283284
};
284285

285286
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings);

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,9 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)
101101

102102
auto guard = Guard(Lock_);
103103
if (auto p = StatisticsCache.FindPtr(NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text))) {
104-
ui64 sum = p->LegacyChunksDataWeight;
104+
ui64 sum = p->ColumnarStat.LegacyChunksDataWeight;
105105
for (auto& column: columns) {
106-
if (auto c = p->ColumnDataWeight.FindPtr(column)) {
106+
if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) {
107107
sum += *c;
108108
} else {
109109
return Nothing();
@@ -114,30 +114,64 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)
114114
return Nothing();
115115
}
116116

117+
TMaybe<NYT::TTableColumnarStatistics> TTransactionCache::TEntry::GetExtendedColumnarStat(NYT::TRichYPath ytPath) const {
118+
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
119+
ytPath.Columns_.Clear();
120+
auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);
121+
122+
auto guard = Guard(Lock_);
123+
auto p = StatisticsCache.FindPtr(cacheKey);
124+
if (!p) {
125+
return Nothing();
126+
}
127+
128+
NYT::TTableColumnarStatistics res;
129+
for (auto& column: columns) {
130+
if (p->ExtendedStatColumns.count(column) == 0) {
131+
return Nothing();
132+
}
133+
if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) {
134+
res.ColumnDataWeight[column] = *c;
135+
}
136+
if (auto c = p->ColumnarStat.ColumnEstimatedUniqueCounts.FindPtr(column)) {
137+
res.ColumnEstimatedUniqueCounts[column] = *c;
138+
}
139+
}
140+
return res;
141+
}
142+
117143
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size) {
118144
YQL_ENSURE(ytPath.Columns_.Defined());
119145
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
120146
ytPath.Columns_.Clear();
147+
auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);
121148

122149
auto guard = Guard(Lock_);
123-
NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
124-
cacheColumnStat.LegacyChunksDataWeight = size;
125-
for (auto& c: cacheColumnStat.ColumnDataWeight) {
150+
auto& cacheEntry = StatisticsCache[cacheKey];
151+
cacheEntry.ColumnarStat.LegacyChunksDataWeight = size;
152+
for (auto& c: cacheEntry.ColumnarStat.ColumnDataWeight) {
126153
c.second = 0;
127154
}
128155
for (auto& c: columns) {
129-
cacheColumnStat.ColumnDataWeight[c] = 0;
156+
cacheEntry.ColumnarStat.ColumnDataWeight[c] = 0;
130157
}
131158
}
132159

133-
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat) {
160+
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended) {
161+
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
134162
ytPath.Columns_.Clear();
135163
auto guard = Guard(Lock_);
136-
NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
137-
cacheColumnStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
138-
cacheColumnStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
164+
auto& cacheEntry = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
165+
if (extended) {
166+
std::copy(columns.begin(), columns.end(), std::inserter(cacheEntry.ExtendedStatColumns, cacheEntry.ExtendedStatColumns.end()));
167+
}
168+
cacheEntry.ColumnarStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
169+
cacheEntry.ColumnarStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
139170
for (auto& c: columnStat.ColumnDataWeight) {
140-
cacheColumnStat.ColumnDataWeight[c.first] = c.second;
171+
cacheEntry.ColumnarStat.ColumnDataWeight[c.first] = c.second;
172+
}
173+
for (auto& c : columnStat.ColumnEstimatedUniqueCounts) {
174+
cacheEntry.ColumnarStat.ColumnEstimatedUniqueCounts[c.first] = c.second;
141175
}
142176
}
143177

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ class TTransactionCache {
4747
bool KeepTables = false;
4848
THashMap<std::pair<TString, ui32>, std::tuple<TString, NYT::TTransactionId, ui64>> Snapshots; // {tablepath, epoch} -> {table_id, transaction_id, revision}
4949
NYT::TNode TransactionSpec;
50-
THashMap<TString, NYT::TTableColumnarStatistics> StatisticsCache;
5150
THashMap<TString, TString> BinarySnapshots; // remote path -> snapshot path
5251
NYT::ITransactionPtr BinarySnapshotTx;
5352
THashMap<TString, NYT::ITransactionPtr> CheckpointTxs;
@@ -114,8 +113,10 @@ class TTransactionCache {
114113
void CompleteWriteTx(const NYT::TTransactionId& id, bool abort);
115114

116115
TMaybe<ui64> GetColumnarStat(NYT::TRichYPath ytPath) const;
116+
TMaybe<NYT::TTableColumnarStatistics> GetExtendedColumnarStat(NYT::TRichYPath ytPath) const;
117+
117118
void UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size);
118-
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat);
119+
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false);
119120

120121
std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval);
121122

@@ -124,9 +125,17 @@ class TTransactionCache {
124125
using TPtr = TIntrusivePtr<TEntry>;
125126

126127
private:
128+
struct TStatisticsCacheEntry {
129+
std::unordered_set<TString> ExtendedStatColumns;
130+
NYT::TTableColumnarStatistics ColumnarStat;
131+
};
132+
133+
THashMap<TString, TStatisticsCacheEntry> StatisticsCache;
134+
127135
void DeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
128136
bool CancelDeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
129137
void DoRemove(const TString& table);
138+
void UpdateColumnarStatLocked(NYT::TTableColumnarStatistics& cacheColumnStat, const NYT::TTableColumnarStatistics& columnStat);
130139

131140
size_t ExternalTempTablesCount = 0;
132141
};

ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,27 @@ inline TType OptionFromNode(const NYT::TNode& value) {
195195
}
196196
}
197197

198+
void PopulatePathStatResult(IYtGateway::TPathStatResult& out, int index, NYT::TTableColumnarStatistics& extendedStat) {
199+
for (const auto& entry : extendedStat.ColumnDataWeight) {
200+
out.DataSize[index] += entry.second;
201+
}
202+
out.Extended[index] = IYtGateway::TPathStatResult::TExtendedResult{
203+
.DataWeight = extendedStat.ColumnDataWeight,
204+
.EstimatedUniqueCounts = extendedStat.ColumnEstimatedUniqueCounts
205+
};
206+
}
207+
208+
TString DebugPath(NYT::TRichYPath path) {
209+
constexpr int maxDebugColumns = 20;
210+
if (!path.Columns_ || std::ssize(path.Columns_->Parts_) <= maxDebugColumns) {
211+
return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text);
212+
}
213+
int numColumns = std::ssize(path.Columns_->Parts_);
214+
path.Columns_->Parts_.erase(path.Columns_->Parts_.begin() + maxDebugColumns, path.Columns_->Parts_.end());
215+
path.Columns_->Parts_.push_back("...");
216+
return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text) + " (" + std::to_string(numColumns) + " columns)";
217+
}
218+
198219
} // unnamed
199220

200221
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -4505,13 +4526,15 @@ class TYtNativeGateway : public IYtGateway {
45054526
try {
45064527
TPathStatResult res;
45074528
res.DataSize.resize(execCtx->Options_.Paths().size(), 0);
4529+
res.Extended.resize(execCtx->Options_.Paths().size());
45084530

45094531
auto entry = execCtx->GetOrCreateEntry();
45104532
auto tx = entry->Tx;
45114533
const TString tmpFolder = GetTablesTmpFolder(*execCtx->Options_.Config());
45124534
const NYT::EOptimizeForAttr tmpOptimizeFor = execCtx->Options_.Config()->OptimizeFor.Get(execCtx->Cluster_).GetOrElse(NYT::EOptimizeForAttr::OF_LOOKUP_ATTR);
45134535
TVector<NYT::TRichYPath> ytPaths(Reserve(execCtx->Options_.Paths().size()));
45144536
TVector<size_t> pathMap;
4537+
bool extended = execCtx->Options_.Extended();
45154538

45164539
auto extractSysColumns = [] (NYT::TRichYPath& ytPath) -> TVector<TString> {
45174540
TVector<TString> res;
@@ -4555,16 +4578,19 @@ class TYtNativeGateway : public IYtGateway {
45554578
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
45564579
}
45574580
}
4558-
if (auto val = entry->GetColumnarStat(ytPath)) {
4559-
res.DataSize[i] += *val;
4560-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (from cache)";
4581+
TMaybe<ui64> cachedStat;
4582+
TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
4583+
if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
4584+
res.DataSize[i] += *cachedStat;
4585+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (from cache, extended: false)";
4586+
} else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
4587+
PopulatePathStatResult(res, i, *cachedExtendedStat);
4588+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
45614589
} else if (onlyCached) {
4562-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " is missing in cache - sync path stat failed";
4590+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " is missing in cache - sync path stat failed (extended: " << extended << ")";
45634591
return res;
4564-
} else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR == tmpOptimizeFor) {
4565-
pathMap.push_back(i);
4566-
ytPaths.push_back(ytPath);
4567-
} else {
4592+
} else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR != tmpOptimizeFor && !extended) {
4593+
45684594
// Use entire table size for lookup tables (YQL-7257)
45694595
if (attrs.IsUndefined()) {
45704596
attrs = tx->Get(ytPath.Path_ + "/@", NYT::TGetOptions().AttributeFilter(
@@ -4576,7 +4602,10 @@ class TYtNativeGateway : public IYtGateway {
45764602
auto size = CalcDataSize(ytPath, attrs);
45774603
res.DataSize[i] += size;
45784604
entry->UpdateColumnarStat(ytPath, size);
4579-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
4605+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup, extended: false)";
4606+
} else {
4607+
ytPaths.push_back(ytPath);
4608+
pathMap.push_back(i);
45804609
}
45814610
} else {
45824611
auto p = entry->Snapshots.FindPtr(std::make_pair(tablePath, req.Epoch()));
@@ -4607,11 +4636,19 @@ class TYtNativeGateway : public IYtGateway {
46074636
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
46084637
}
46094638
}
4610-
if (auto val = entry->GetColumnarStat(ytPath)) {
4611-
res.DataSize[i] += *val;
4612-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache)";
4639+
TMaybe<ui64> cachedStat;
4640+
TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
4641+
if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
4642+
res.DataSize[i] += *cachedStat;
4643+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache, extended: false)";
4644+
} else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
4645+
PopulatePathStatResult(res, i, *cachedExtendedStat);
4646+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
46134647
} else if (onlyCached) {
4614-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") is missing in cache - sync path stat failed";
4648+
YQL_CLOG(INFO, ProviderYt)
4649+
<< "Stat for " << DebugPath(req.Path())
4650+
<< " (epoch=" << req.Epoch() << ", extended: " << extended
4651+
<< ") is missing in cache - sync path stat failed";
46154652
return res;
46164653
} else {
46174654
if (attrs.IsUndefined()) {
@@ -4623,40 +4660,46 @@ class TYtNativeGateway : public IYtGateway {
46234660
.AddAttribute(TString("schema"))
46244661
));
46254662
}
4626-
if (attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
4627-
AllPathColumnsAreInSchema(req.Path(), attrs))
4663+
if (extended ||
4664+
(attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
4665+
AllPathColumnsAreInSchema(req.Path(), attrs)))
46284666
{
46294667
pathMap.push_back(i);
46304668
ytPaths.push_back(ytPath);
4631-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_;
4669+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_ << " (extended: " << extended << ")";
46324670
} else {
46334671
// Use entire table size for lookup tables (YQL-7257)
46344672
auto size = CalcDataSize(ytPath, attrs);
46354673
res.DataSize[i] += size;
46364674
entry->UpdateColumnarStat(ytPath, size);
4637-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
4675+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
46384676
}
46394677
}
46404678
}
46414679
}
46424680

46434681
if (ytPaths) {
46444682
YQL_ENSURE(!onlyCached);
4645-
auto fetchMode = execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
4683+
auto fetchMode = extended ?
4684+
NYT::EColumnarStatisticsFetcherMode::FromNodes :
4685+
execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
46464686
auto columnStats = tx->GetTableColumnarStatistics(ytPaths, NYT::TGetTableColumnarStatisticsOptions().FetcherMode(fetchMode));
46474687
YQL_ENSURE(pathMap.size() == columnStats.size());
4648-
for (size_t i: xrange(columnStats.size())) {
4688+
for (size_t i: xrange(columnStats.size())) {
46494689
auto& columnStat = columnStats[i];
46504690
const ui64 weight = columnStat.LegacyChunksDataWeight +
46514691
Accumulate(columnStat.ColumnDataWeight.begin(), columnStat.ColumnDataWeight.end(), 0ull,
46524692
[](ui64 sum, decltype(*columnStat.ColumnDataWeight.begin())& v) { return sum + v.second; });
46534693

4694+
if (extended) {
4695+
PopulatePathStatResult(res, pathMap[i], columnStat);
4696+
}
4697+
46544698
res.DataSize[pathMap[i]] += weight;
4655-
entry->UpdateColumnarStat(ytPaths[i], columnStat);
4699+
entry->UpdateColumnarStat(ytPaths[i], columnStat, extended);
46564700
YQL_CLOG(INFO, ProviderYt) << "Stat for " << execCtx->Options_.Paths()[pathMap[i]].Path().Path_ << ": " << weight << " (fetched)";
46574701
}
46584702
}
4659-
46604703
res.SetSuccess();
46614704
return res;
46624705
} catch (...) {

ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#include "yql_yt_transform.h"
2+
#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>
23

34
#include <ydb/library/yql/providers/yt/lib/skiff/yql_skiff_schema.h>
45
#include <ydb/library/yql/providers/yt/common/yql_names.h>
56
#include <ydb/library/yql/providers/yt/common/yql_configuration.h>
67
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
78
#include <ydb/library/yql/providers/yt/codec/yt_codec.h>
8-
#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>
99
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
1010
#include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h>
1111

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include <ydb/library/yql/core/yql_type_helpers.h>
99
#include <ydb/library/yql/core/yql_opt_utils.h>
10+
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
1011

1112
#include <ydb/library/yql/utils/log/log.h>
1213

@@ -321,6 +322,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas
321322

322323
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const {
323324
auto equiJoin = node.Cast<TYtEquiJoin>();
325+
auto cluster = equiJoin.DataSink().Cluster().StringValue();
324326

325327
const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable
326328
&& equiJoin.Input().Size() > 2
@@ -338,10 +340,19 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
338340
}
339341
}
340342
}
341-
342343
const auto tree = ImportYtEquiJoin(equiJoin, ctx);
344+
345+
const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get();
346+
347+
if (tryReorder && waitAllInputs && maxChunkCountExtendedStats) {
348+
YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
349+
auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx);
350+
if (collectStatus == TStatus::Repeat) {
351+
return ExportYtEquiJoin(equiJoin, *tree, ctx, State_);
352+
}
353+
}
343354
if (tryReorder) {
344-
const auto optimizedTree = OrderJoins(tree, State_, ctx);
355+
const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx);
345356
if (optimizedTree != tree) {
346357
return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_);
347358
}

0 commit comments

Comments
 (0)