Skip to content

Commit 9a3409c

Browse files
authored
YQL-18247: Move WalkFolders state to YtState from transformer (#4410)
1 parent 04c06c9 commit 9a3409c

File tree

4 files changed: +47 / -20 lines changed

ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp

Lines changed: 41 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -191,6 +191,9 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
191191
else if (key.GetWalkFolderArgs()) {
192192
return ctx.ChangeChild(readNode.Ref(), 2, InitializeWalkFolders(std::move(key), cluster, keyPos, ctx));
193193
}
194+
else if (key.GetWalkFolderImplArgs()) {
195+
PendingWalkFoldersKeys_.insert(key.GetWalkFolderImplArgs()->StateKey);
196+
}
194197
else if (!key.IsAnonymous()) {
195198
if (PendingCanonizations_.insert(std::make_pair(std::make_pair(cluster, key.GetPath()), paths.size())).second) {
196199
paths.push_back(IYtGateway::TCanonizeReq()
@@ -209,13 +212,18 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
209212
PendingCanonizations_.clear();
210213
PendingFolders_.clear();
211214
PendingRanges_.clear();
212-
PendingWalkFolders_.clear();
215+
216+
for (const auto& key : PendingWalkFoldersKeys_) {
217+
State_->WalkFoldersState.erase(key);
218+
}
219+
PendingWalkFoldersKeys_.clear();
220+
213221
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
214222
return status;
215223
}
216224

217225
if (PendingRanges_.empty() && PendingFolders_.empty()
218-
&& PendingCanonizations_.empty() && PendingWalkFolders_.empty()) {
226+
&& PendingCanonizations_.empty() && PendingWalkFoldersKeys_.empty()) {
219227
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
220228
return status;
221229
}
@@ -344,6 +352,11 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
344352
CanonizeFuture_ = {};
345353
CanonizationRangesFoldersFuture_ = {};
346354

355+
for (const auto& key : PendingWalkFoldersKeys_) {
356+
State_->WalkFoldersState.erase(key);
357+
}
358+
PendingWalkFoldersKeys_.clear();
359+
347360
return TStatus::Error;
348361
}
349362
}
@@ -538,7 +551,7 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
538551
CanonizeFuture_ = {};
539552
CanonizationRangesFoldersFuture_ = {};
540553

541-
if (status == TStatus::Ok && !PendingWalkFolders_.empty()) {
554+
if (status == TStatus::Ok && !PendingWalkFoldersKeys_.empty()) {
542555
const auto walkFoldersStatus = RewriteWalkFoldersOnAsyncOrEvalChanges(output, ctx);
543556
return walkFoldersStatus;
544557
}
@@ -552,6 +565,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
552565
PendingRanges_.clear();
553566
PendingFolders_.clear();
554567
PendingCanonizations_.clear();
568+
PendingWalkFoldersKeys_.clear();
569+
555570
CanonizeFuture_ = {};
556571
CanonizationRangesFoldersFuture_ = {};
557572
}
@@ -669,7 +684,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
669684
<< args.InitialFolder.Prefix << "`" << " with root attributes cnt: "
670685
<< args.InitialFolder.Attributes.size();
671686
const auto instanceKey = ctx.NextUniqueId;
672-
PendingWalkFolders_.emplace(instanceKey, std::move(walkFolders));
687+
State_->WalkFoldersState.emplace(instanceKey, std::move(walkFolders));
688+
PendingWalkFoldersKeys_.insert(instanceKey);
673689

674690
auto walkFoldersImplNode = Build<TYtWalkFoldersImpl>(ctx, key.GetNode()->Pos())
675691
.ProcessStateKey()
@@ -685,12 +701,9 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
685701
}
686702

687703
TStatus RewriteWalkFoldersOnAsyncOrEvalChanges(TExprNode::TPtr& output, TExprContext& ctx) {
688-
Y_ENSURE(!PendingWalkFolders_.empty());
689-
690-
const auto currImplKey = PendingWalkFolders_.begin()->first;
691704
TStatus walkFoldersStatus = IGraphTransformer::TStatus::Ok;
692705

693-
auto status = VisitInputKeys(output, ctx, [this, &ctx, currImplKey, &walkFoldersStatus] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
706+
auto status = VisitInputKeys(output, ctx, [this, &ctx, &walkFoldersStatus] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
694707
if (keys.GetType() == TYtKey::EType::WalkFoldersImpl) {
695708
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - DoApplyAsyncChanges WalkFoldersImpl handling start";
696709

@@ -699,15 +712,13 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
699712
YQL_CLOG(INFO, ProviderYt) << "Failed to parse WalkFolderImpl args";
700713
return {};
701714
}
702-
703-
const auto instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey;
704-
if (instanceKey != currImplKey) {
715+
const ui64 instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey;
716+
if (*PendingWalkFoldersKeys_.begin() != instanceKey) {
705717
return readNode.Ptr();
706718
}
707719

708-
auto walkFoldersInstanceIt = PendingWalkFolders_.find(currImplKey);
709-
YQL_ENSURE(!walkFoldersInstanceIt.IsEnd(), "Failed to find walkFoldersInstance with key: " << instanceKey);
710-
720+
auto walkFoldersInstanceIt = this->State_->WalkFoldersState.find(instanceKey);
721+
YQL_ENSURE(!walkFoldersInstanceIt.IsEnd());
711722
auto& walkFoldersImpl = walkFoldersInstanceIt->second;
712723

713724
Y_ENSURE(walkFoldersImpl.GetAnyOpFuture().HasValue(),
@@ -722,7 +733,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
722733

723734
if (walkFoldersImpl.IsFinished()) {
724735
YQL_CLOG(INFO, ProviderYt) << "Building result expr for WalkFolders with key: " << instanceKey;
725-
PendingWalkFolders_.erase(currImplKey);
736+
this->State_->WalkFoldersState.erase(instanceKey);
737+
PendingWalkFoldersKeys_.erase(instanceKey);
726738

727739
auto type = Build<TCoStructType>(ctx, readNode.Pos())
728740
.Add<TExprList>()
@@ -854,12 +866,21 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
854866
}
855867
return res;
856868
}
857-
869+
870+
TWalkFoldersImpl& GetCurrentWalkFoldersInstance() const {
871+
Y_ENSURE(!PendingWalkFoldersKeys_.empty());
872+
const auto key = PendingWalkFoldersKeys_.begin();
873+
auto stateIt = State_->WalkFoldersState.find(*key);
874+
YQL_ENSURE(stateIt != State_->WalkFoldersState.end());
875+
return stateIt->second;
876+
}
877+
858878
TMaybe<NThreading::TFuture<void>> MaybeGetWalkFoldersFuture() const {
859879
// inflight 1
860-
return !PendingWalkFolders_.empty()
861-
? MakeMaybe(PendingWalkFolders_.begin()->second.GetAnyOpFuture())
862-
: Nothing();
880+
if (!PendingWalkFoldersKeys_.empty()) {
881+
return GetCurrentWalkFoldersInstance().GetAnyOpFuture();
882+
}
883+
return Nothing();
863884
}
864885

865886
private:
@@ -868,7 +889,7 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
868889
THashMap<std::pair<TString, TYtKey::TRange>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TTableRangeResult>>> PendingRanges_;
869890
THashMap<std::pair<TString, TYtKey::TFolderList>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TFolderResult>>> PendingFolders_;
870891
THashMap<std::pair<TString, TString>, size_t> PendingCanonizations_; // cluster, original table path -> positions in canon result
871-
THashMap<ui64, TWalkFoldersImpl> PendingWalkFolders_;
892+
TSet<ui64> PendingWalkFoldersKeys_;
872893
NThreading::TFuture<IYtGateway::TCanonizePathsResult> CanonizeFuture_;
873894
NThreading::TFuture<void> CanonizationRangesFoldersFuture_;
874895

ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
24
#include <ydb/library/yql/providers/yt/provider/yql_yt_key.h>
35

ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -307,6 +307,7 @@ void TYtState::Reset() {
307307
AnonymousLabels.clear();
308308
NodeHash.clear();
309309
Checkpoints.clear();
310+
WalkFoldersState.clear();
310311
NextEpochId = 1;
311312
}
312313

ydb/library/yql/providers/yt/provider/yql_yt_provider.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
#include "yql_yt_gateway.h"
44
#include "yql_yt_table_desc.h"
55
#include "yql_yt_table.h"
6+
#include "yql_yt_io_discovery_walk_folders.h"
67

78
#include <ydb/library/yql/providers/yt/common/yql_yt_settings.h>
89
#include <ydb/library/yql/providers/yt/lib/row_spec/yql_row_spec.h>
@@ -108,6 +109,8 @@ struct TYtState : public TThrRefBase {
108109
TDuration TimeSpentInHybrid;
109110
NMonotonic::TMonotonic HybridStartTime;
110111
std::unordered_set<ui32> HybridInFlightOprations;
112+
THashMap<ui64, TWalkFoldersImpl> WalkFoldersState;
113+
111114
private:
112115
std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_;
113116
std::unordered_map<ui64, ui32> EpochEvalStates_;

0 commit comments

Comments
 (0)