Skip to content

Commit ecb2de0

Browse files
authored
Merge d5755ab into 4f839b4
2 parents 4f839b4 + d5755ab commit ecb2de0

File tree

13 files changed

+446
-89
lines changed

13 files changed

+446
-89
lines changed

ydb/library/yql/providers/yt/common/yql_yt_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -484,6 +484,7 @@ TYtConfiguration::TYtConfiguration()
484484
});
485485
REGISTER_SETTING(*this, MinColumnGroupSize).Lower(2);
486486
REGISTER_SETTING(*this, MaxColumnGroups);
487+
REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
487488
}
488489

489490
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) {

ydb/library/yql/providers/yt/common/yql_yt_settings.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -280,6 +280,7 @@ struct TYtSettings {
280280
NCommon::TConfSetting<EColumnGroupMode, false> ColumnGroupMode;
281281
NCommon::TConfSetting<ui16, false> MinColumnGroupSize;
282282
NCommon::TConfSetting<ui16, false> MaxColumnGroups;
283+
NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
283284
};
284285

285286
EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings);

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp

Lines changed: 47 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -101,9 +101,9 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)
101101

102102
auto guard = Guard(Lock_);
103103
if (auto p = StatisticsCache.FindPtr(NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text))) {
104-
ui64 sum = p->LegacyChunksDataWeight;
104+
ui64 sum = p->ColumnarStat.LegacyChunksDataWeight;
105105
for (auto& column: columns) {
106-
if (auto c = p->ColumnDataWeight.FindPtr(column)) {
106+
if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) {
107107
sum += *c;
108108
} else {
109109
return Nothing();
@@ -114,30 +114,66 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)
114114
return Nothing();
115115
}
116116

117+
// Looks up cached "extended" columnar statistics for the columns requested in ytPath.
//
// ytPath is taken by value: its column list is moved out and cleared so that the cache
// key is the canonical text YSON of the path without the column selector.
//
// Returns Nothing() when the path has no cache entry, or when at least one requested
// column is not recorded in the entry's ExtendedStatColumns set. Otherwise returns a
// TTableColumnarStatistics in which only ColumnDataWeight and ColumnEstimatedUniqueCounts
// are filled, and only for the requested columns that have a cached value; the other
// fields of the result keep their default-constructed values.
TMaybe<NYT::TTableColumnarStatistics> TTransactionCache::TEntry::GetExtendedColumnarStat(NYT::TRichYPath ytPath) const {
118+
// NOTE(review): Columns_ is dereferenced here without a Defined() check, whereas
// UpdateColumnarStat(ytPath, size) guards the same access with YQL_ENSURE — confirm
// that every caller passes a path with an explicit column list.
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
119+
ytPath.Columns_.Clear();
120+
// The key is built before the lock is taken.
auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);
121+
122+
auto guard = Guard(Lock_);
123+
auto p = StatisticsCache.FindPtr(cacheKey);
124+
if (!p) {
125+
return Nothing();
126+
}
127+
128+
NYT::TTableColumnarStatistics res;
129+
for (auto& column: columns) {
130+
// A column that was never stored by an extended update makes the whole lookup a miss.
if (p->ExtendedStatColumns.count(column) == 0) {
131+
return Nothing();
132+
}
133+
// A column marked as extended may still lack a weight or a unique-count entry;
// in that case it is simply absent from the corresponding result map.
if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) {
134+
res.ColumnDataWeight[column] = *c;
135+
}
136+
if (auto c = p->ColumnarStat.ColumnEstimatedUniqueCounts.FindPtr(column)) {
137+
res.ColumnEstimatedUniqueCounts[column] = *c;
138+
}
139+
}
140+
return res;
141+
}
142+
117143
// Stores a whole-table size for ytPath in the statistics cache.
//
// This is a diff rendering: a statement preceded by an "N-" gutter line is the
// pre-change version, one preceded by "N+" is the post-change version, and one preceded
// by a doubled number is unchanged.
//
// Behaviour of the post-change version: requires ytPath to carry a column list, moves the
// column names out, clears the selector, and uses the canonical text YSON of the remaining
// path as the cache key. Under Lock_ it sets LegacyChunksDataWeight to `size`, resets every
// already cached per-column weight to 0, and makes sure each requested column has an entry
// with weight 0.
//
// NOTE(review): ExtendedStatColumns and ColumnEstimatedUniqueCounts of the entry are not
// touched here, so columns marked as extended by an earlier update stay marked while their
// weights become 0 — confirm this is intended for GetExtendedColumnarStat.
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size) {
118144
YQL_ENSURE(ytPath.Columns_.Defined());
119145
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
120146
ytPath.Columns_.Clear();
147+
// Post-change: the key is computed before the lock is taken.
auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);
121148

122149
auto guard = Guard(Lock_);
123-
NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
124-
cacheColumnStat.LegacyChunksDataWeight = size;
125-
for (auto& c: cacheColumnStat.ColumnDataWeight) {
150+
// operator[] creates an empty entry when the path has not been cached yet.
auto& cacheEntry = StatisticsCache[cacheKey];
151+
cacheEntry.ColumnarStat.LegacyChunksDataWeight = size;
152+
for (auto& c: cacheEntry.ColumnarStat.ColumnDataWeight) {
126153
c.second = 0;
127154
}
128155
for (auto& c: columns) {
129-
cacheColumnStat.ColumnDataWeight[c] = 0;
156+
cacheEntry.ColumnarStat.ColumnDataWeight[c] = 0;
130157
}
131158
}
132159

133-
// Stores fetched columnar statistics for ytPath in the statistics cache.
//
// This is a diff rendering: the first signature line below (after the "133-" gutter) is
// the removed two-argument version, the second (after "160+") is the new one with the
// `extended` flag. Statements after "N-" gutters are pre-change, after "N+" post-change.
//
// Behaviour of the post-change version: the column names are moved out of ytPath, the
// selector is cleared, and the canonical text YSON of the remaining path is the cache key.
// Under Lock_ the entry's LegacyChunksDataWeight, TimestampTotalWeight and the per-column
// weights present in columnStat are overwritten; cached weights for columns that are not
// in columnStat are left as they are. When `extended` is true, the requested column names
// are additionally inserted into ExtendedStatColumns and the estimated unique counts from
// columnStat are copied into the entry.
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat) {
160+
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended) {
161+
// NOTE(review): this added line dereferences Columns_ without a Defined() check; the
// pre-change body only called Columns_.Clear(), and the (ytPath, size) overload guards
// the same access with YQL_ENSURE — confirm all callers pass a column list.
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
134162
ytPath.Columns_.Clear();
135163
auto guard = Guard(Lock_);
136-
NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
137-
cacheColumnStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
138-
cacheColumnStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
164+
auto& cacheEntry = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
165+
if (extended) {
166+
// Remember which columns were requested by an extended fetch.
std::copy(columns.begin(), columns.end(), std::inserter(cacheEntry.ExtendedStatColumns, cacheEntry.ExtendedStatColumns.end()));
167+
}
168+
cacheEntry.ColumnarStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
169+
cacheEntry.ColumnarStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
139170
for (auto& c: columnStat.ColumnDataWeight) {
140-
cacheColumnStat.ColumnDataWeight[c.first] = c.second;
171+
cacheEntry.ColumnarStat.ColumnDataWeight[c.first] = c.second;
172+
}
173+
if (extended) {
174+
for (auto& c : columnStat.ColumnEstimatedUniqueCounts) {
175+
cacheEntry.ColumnarStat.ColumnEstimatedUniqueCounts[c.first] = c.second;
176+
}
141177
}
142178
}
143179

ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ class TTransactionCache {
4747
bool KeepTables = false;
4848
THashMap<std::pair<TString, ui32>, std::tuple<TString, NYT::TTransactionId, ui64>> Snapshots; // {tablepath, epoch} -> {table_id, transaction_id, revision}
4949
NYT::TNode TransactionSpec;
50-
THashMap<TString, NYT::TTableColumnarStatistics> StatisticsCache;
5150
THashMap<TString, TString> BinarySnapshots; // remote path -> snapshot path
5251
NYT::ITransactionPtr BinarySnapshotTx;
5352
THashMap<TString, NYT::ITransactionPtr> CheckpointTxs;
@@ -114,8 +113,10 @@ class TTransactionCache {
114113
void CompleteWriteTx(const NYT::TTransactionId& id, bool abort);
115114

116115
TMaybe<ui64> GetColumnarStat(NYT::TRichYPath ytPath) const;
116+
TMaybe<NYT::TTableColumnarStatistics> GetExtendedColumnarStat(NYT::TRichYPath ytPath) const;
117+
117118
void UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size);
118-
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat);
119+
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false);
119120

120121
std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval);
121122

@@ -124,6 +125,13 @@ class TTransactionCache {
124125
using TPtr = TIntrusivePtr<TEntry>;
125126

126127
private:
128+
// Value type of StatisticsCache: the columnar statistics cached for one path.
struct TStatisticsCacheEntry {
129+
// Names of the columns stored by UpdateColumnarStat(..., extended = true);
// GetExtendedColumnarStat reports a miss for any column not in this set.
std::unordered_set<TString> ExtendedStatColumns;
130+
// Cached statistics (legacy chunk weight, per-column weights, estimated unique counts).
NYT::TTableColumnarStatistics ColumnarStat;
131+
};
132+
133+
THashMap<TString, TStatisticsCacheEntry> StatisticsCache;
134+
127135
void DeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
128136
bool CancelDeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
129137
void DoRemove(const TString& table);

ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,27 @@ inline TType OptionFromNode(const NYT::TNode& value) {
195195
}
196196
}
197197

198+
// Fills slot `index` of a path-stat result from extended columnar statistics.
//
// Adds every per-column data weight from extendedStat to out.DataSize[index] and
// overwrites out.Extended[index] with copies of the data-weight and estimated-unique-count
// maps. `index` is not range-checked here; both vectors are indexed directly.
// extendedStat is taken by non-const reference but is only read in this body.
//
// NOTE(review): in the extended branch of the fetch loop shown later on this page, this
// call is followed by `res.DataSize[pathMap[i]] += weight`, where `weight` already
// includes the same column weights — the column weights look double-counted on that path,
// unlike the cache-hit path; confirm.
void PopulatePathStatResult(IYtGateway::TPathStatResult& out, int index, NYT::TTableColumnarStatistics& extendedStat) {
199+
for (const auto& entry : extendedStat.ColumnDataWeight) {
200+
out.DataSize[index] += entry.second;
201+
}
202+
out.Extended[index] = IYtGateway::TPathStatResult::TExtendedResult{
203+
.DataWeight = extendedStat.ColumnDataWeight,
204+
.EstimatedUniqueCounts = extendedStat.ColumnEstimatedUniqueCounts
205+
};
206+
}
207+
208+
// Renders a rich path as canonical text YSON for log messages, bounding the output size.
//
// A path with no column selector, or with at most maxDebugColumns columns, is rendered in
// full. Otherwise only the first maxDebugColumns column names are kept, a literal "..."
// entry is appended in place of the rest, and the original column count is added as a
// " (N columns)" suffix. `path` is taken by value, so the truncation edits a local copy.
TString DebugPath(NYT::TRichYPath path) {
209+
constexpr int maxDebugColumns = 20;
210+
if (!path.Columns_ || std::ssize(path.Columns_->Parts_) <= maxDebugColumns) {
211+
return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text);
212+
}
213+
// Capture the full count before the list is truncated.
int numColumns = std::ssize(path.Columns_->Parts_);
214+
path.Columns_->Parts_.erase(path.Columns_->Parts_.begin() + maxDebugColumns, path.Columns_->Parts_.end());
215+
path.Columns_->Parts_.push_back("...");
216+
return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text) + " (" + std::to_string(numColumns) + " columns)";
217+
}
218+
198219
} // unnamed
199220

200221
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -4495,13 +4516,15 @@ class TYtNativeGateway : public IYtGateway {
44954516
try {
44964517
TPathStatResult res;
44974518
res.DataSize.resize(execCtx->Options_.Paths().size(), 0);
4519+
res.Extended.resize(execCtx->Options_.Paths().size());
44984520

44994521
auto entry = execCtx->GetOrCreateEntry();
45004522
auto tx = entry->Tx;
45014523
const TString tmpFolder = GetTablesTmpFolder(*execCtx->Options_.Config());
45024524
const NYT::EOptimizeForAttr tmpOptimizeFor = execCtx->Options_.Config()->OptimizeFor.Get(execCtx->Cluster_).GetOrElse(NYT::EOptimizeForAttr::OF_LOOKUP_ATTR);
45034525
TVector<NYT::TRichYPath> ytPaths(Reserve(execCtx->Options_.Paths().size()));
45044526
TVector<size_t> pathMap;
4527+
bool extended = execCtx->Options_.Extended();
45054528

45064529
auto extractSysColumns = [] (NYT::TRichYPath& ytPath) -> TVector<TString> {
45074530
TVector<TString> res;
@@ -4545,16 +4568,19 @@ class TYtNativeGateway : public IYtGateway {
45454568
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
45464569
}
45474570
}
4548-
if (auto val = entry->GetColumnarStat(ytPath)) {
4549-
res.DataSize[i] += *val;
4550-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (from cache)";
4571+
TMaybe<ui64> cachedStat;
4572+
TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
4573+
if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
4574+
res.DataSize[i] += *cachedStat;
4575+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (from cache, extended: false)";
4576+
} else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
4577+
PopulatePathStatResult(res, i, *cachedExtendedStat);
4578+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
45514579
} else if (onlyCached) {
4552-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " is missing in cache - sync path stat failed";
4580+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " is missing in cache - sync path stat failed (extended: " << extended << ")";
45534581
return res;
4554-
} else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR == tmpOptimizeFor) {
4555-
pathMap.push_back(i);
4556-
ytPaths.push_back(ytPath);
4557-
} else {
4582+
} else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR != tmpOptimizeFor && !extended) {
4583+
45584584
// Use entire table size for lookup tables (YQL-7257)
45594585
if (attrs.IsUndefined()) {
45604586
attrs = tx->Get(ytPath.Path_ + "/@", NYT::TGetOptions().AttributeFilter(
@@ -4566,7 +4592,10 @@ class TYtNativeGateway : public IYtGateway {
45664592
auto size = CalcDataSize(ytPath, attrs);
45674593
res.DataSize[i] += size;
45684594
entry->UpdateColumnarStat(ytPath, size);
4569-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
4595+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup, extended: false)";
4596+
} else {
4597+
ytPaths.push_back(ytPath);
4598+
pathMap.push_back(i);
45704599
}
45714600
} else {
45724601
auto p = entry->Snapshots.FindPtr(std::make_pair(tablePath, req.Epoch()));
@@ -4597,11 +4626,19 @@ class TYtNativeGateway : public IYtGateway {
45974626
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
45984627
}
45994628
}
4600-
if (auto val = entry->GetColumnarStat(ytPath)) {
4601-
res.DataSize[i] += *val;
4602-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache)";
4629+
TMaybe<ui64> cachedStat;
4630+
TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
4631+
if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
4632+
res.DataSize[i] += *cachedStat;
4633+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache, extended: false)";
4634+
} else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
4635+
PopulatePathStatResult(res, i, *cachedExtendedStat);
4636+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
46034637
} else if (onlyCached) {
4604-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") is missing in cache - sync path stat failed";
4638+
YQL_CLOG(INFO, ProviderYt)
4639+
<< "Stat for " << DebugPath(req.Path())
4640+
<< " (epoch=" << req.Epoch() << ", extended: " << extended
4641+
<< ") is missing in cache - sync path stat failed";
46054642
return res;
46064643
} else {
46074644
if (attrs.IsUndefined()) {
@@ -4613,36 +4650,43 @@ class TYtNativeGateway : public IYtGateway {
46134650
.AddAttribute(TString("schema"))
46144651
));
46154652
}
4616-
if (attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
4617-
AllPathColumnsAreInSchema(req.Path(), attrs))
4653+
if (extended ||
4654+
(attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
4655+
AllPathColumnsAreInSchema(req.Path(), attrs)))
46184656
{
46194657
pathMap.push_back(i);
46204658
ytPaths.push_back(ytPath);
4621-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_;
4659+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_ << " (extended: " << extended << ")";
46224660
} else {
46234661
// Use entire table size for lookup tables (YQL-7257)
46244662
auto size = CalcDataSize(ytPath, attrs);
46254663
res.DataSize[i] += size;
46264664
entry->UpdateColumnarStat(ytPath, size);
4627-
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
4665+
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
46284666
}
46294667
}
46304668
}
46314669
}
46324670

46334671
if (ytPaths) {
46344672
YQL_ENSURE(!onlyCached);
4635-
auto fetchMode = execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
4673+
auto fetchMode = extended ?
4674+
NYT::EColumnarStatisticsFetcherMode::FromNodes :
4675+
execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
46364676
auto columnStats = tx->GetTableColumnarStatistics(ytPaths, NYT::TGetTableColumnarStatisticsOptions().FetcherMode(fetchMode));
46374677
YQL_ENSURE(pathMap.size() == columnStats.size());
4638-
for (size_t i: xrange(columnStats.size())) {
4678+
for (size_t i: xrange(columnStats.size())) {
46394679
auto& columnStat = columnStats[i];
46404680
const ui64 weight = columnStat.LegacyChunksDataWeight +
46414681
Accumulate(columnStat.ColumnDataWeight.begin(), columnStat.ColumnDataWeight.end(), 0ull,
46424682
[](ui64 sum, decltype(*columnStat.ColumnDataWeight.begin())& v) { return sum + v.second; });
46434683

4684+
if (extended) {
4685+
PopulatePathStatResult(res, pathMap[i], columnStat);
4686+
}
4687+
46444688
res.DataSize[pathMap[i]] += weight;
4645-
entry->UpdateColumnarStat(ytPaths[i], columnStat);
4689+
entry->UpdateColumnarStat(ytPaths[i], columnStat, extended);
46464690
YQL_CLOG(INFO, ProviderYt) << "Stat for " << execCtx->Options_.Paths()[pathMap[i]].Path().Path_ << ": " << weight << " (fetched)";
46474691
}
46484692
}

ydb/library/yql/providers/yt/provider/phy_opt/yql_yt_phy_opt_join.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include <ydb/library/yql/core/yql_type_helpers.h>
99
#include <ydb/library/yql/core/yql_opt_utils.h>
10+
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
1011

1112
#include <ydb/library/yql/utils/log/log.h>
1213

@@ -321,6 +322,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas
321322

322323
TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const {
323324
auto equiJoin = node.Cast<TYtEquiJoin>();
325+
auto cluster = equiJoin.DataSink().Cluster().StringValue();
324326

325327
const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable
326328
&& equiJoin.Input().Size() > 2
@@ -338,10 +340,19 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
338340
}
339341
}
340342
}
341-
342343
const auto tree = ImportYtEquiJoin(equiJoin, ctx);
344+
345+
const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get();
346+
347+
if (tryReorder && waitAllInputs && maxChunkCountExtendedStats) {
348+
YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
349+
auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx);
350+
if (collectStatus == TStatus::Repeat) {
351+
return ExportYtEquiJoin(equiJoin, *tree, ctx, State_);
352+
}
353+
}
343354
if (tryReorder) {
344-
const auto optimizedTree = OrderJoins(tree, State_, ctx);
355+
const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx);
345356
if (optimizedTree != tree) {
346357
return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_);
347358
}

0 commit comments

Comments
 (0)