Skip to content

Commit ad7fc6e

Browse files
authored
Merge 8f5b59b into 34021cb
2 parents 34021cb + 8f5b59b commit ad7fc6e

File tree

4 files changed

+42
-20
lines changed

4 files changed

+42
-20
lines changed

ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
191191
else if (key.GetWalkFolderArgs()) {
192192
return ctx.ChangeChild(readNode.Ref(), 2, InitializeWalkFolders(std::move(key), cluster, keyPos, ctx));
193193
}
194+
else if (key.GetWalkFolderImplArgs()) {
195+
PendingWalkFoldersKeys_.insert(key.GetWalkFolderImplArgs()->StateKey);
196+
}
194197
else if (!key.IsAnonymous()) {
195198
if (PendingCanonizations_.insert(std::make_pair(std::make_pair(cluster, key.GetPath()), paths.size())).second) {
196199
paths.push_back(IYtGateway::TCanonizeReq()
@@ -209,13 +212,13 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
209212
PendingCanonizations_.clear();
210213
PendingFolders_.clear();
211214
PendingRanges_.clear();
212-
PendingWalkFolders_.clear();
215+
PendingWalkFoldersKeys_.clear();
213216
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
214217
return status;
215218
}
216219

217220
if (PendingRanges_.empty() && PendingFolders_.empty()
218-
&& PendingCanonizations_.empty() && PendingWalkFolders_.empty()) {
221+
&& PendingCanonizations_.empty() && PendingWalkFoldersKeys_.empty()) {
219222
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
220223
return status;
221224
}
@@ -344,6 +347,11 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
344347
CanonizeFuture_ = {};
345348
CanonizationRangesFoldersFuture_ = {};
346349

350+
PendingWalkFoldersKeys_.clear();
351+
for (const auto& key : PendingWalkFoldersKeys_) {
352+
State_->WalkFoldersState.erase(key);
353+
}
354+
347355
return TStatus::Error;
348356
}
349357
}
@@ -538,7 +546,7 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
538546
CanonizeFuture_ = {};
539547
CanonizationRangesFoldersFuture_ = {};
540548

541-
if (status == TStatus::Ok && !PendingWalkFolders_.empty()) {
549+
if (status == TStatus::Ok && !PendingWalkFoldersKeys_.empty()) {
542550
const auto walkFoldersStatus = RewriteWalkFoldersOnAsyncOrEvalChanges(output, ctx);
543551
return walkFoldersStatus;
544552
}
@@ -552,6 +560,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
552560
PendingRanges_.clear();
553561
PendingFolders_.clear();
554562
PendingCanonizations_.clear();
563+
PendingWalkFoldersKeys_.clear();
564+
555565
CanonizeFuture_ = {};
556566
CanonizationRangesFoldersFuture_ = {};
557567
}
@@ -669,7 +679,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
669679
<< args.InitialFolder.Prefix << "`" << " with root attributes cnt: "
670680
<< args.InitialFolder.Attributes.size();
671681
const auto instanceKey = ctx.NextUniqueId;
672-
PendingWalkFolders_.emplace(instanceKey, std::move(walkFolders));
682+
State_->WalkFoldersState.emplace(instanceKey, std::move(walkFolders));
683+
PendingWalkFoldersKeys_.insert(instanceKey);
673684

674685
auto walkFoldersImplNode = Build<TYtWalkFoldersImpl>(ctx, key.GetNode()->Pos())
675686
.ProcessStateKey()
@@ -685,12 +696,9 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
685696
}
686697

687698
TStatus RewriteWalkFoldersOnAsyncOrEvalChanges(TExprNode::TPtr& output, TExprContext& ctx) {
688-
Y_ENSURE(!PendingWalkFolders_.empty());
689-
690-
const auto currImplKey = PendingWalkFolders_.begin()->first;
691699
TStatus walkFoldersStatus = IGraphTransformer::TStatus::Ok;
692700

693-
auto status = VisitInputKeys(output, ctx, [this, &ctx, currImplKey, &walkFoldersStatus] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
701+
auto status = VisitInputKeys(output, ctx, [this, &ctx, &walkFoldersStatus] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
694702
if (keys.GetType() == TYtKey::EType::WalkFoldersImpl) {
695703
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - DoApplyAsyncChanges WalkFoldersImpl handling start";
696704

@@ -699,15 +707,13 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
699707
YQL_CLOG(INFO, ProviderYt) << "Failed to parse WalkFolderImpl args";
700708
return {};
701709
}
702-
703-
const auto instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey;
704-
if (instanceKey != currImplKey) {
710+
const ui64 instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey;
711+
if (*PendingWalkFoldersKeys_.begin() != instanceKey) {
705712
return readNode.Ptr();
706713
}
707714

708-
auto walkFoldersInstanceIt = PendingWalkFolders_.find(currImplKey);
709-
YQL_ENSURE(!walkFoldersInstanceIt.IsEnd(), "Failed to find walkFoldersInstance with key: " << instanceKey);
710-
715+
auto walkFoldersInstanceIt = this->State_->WalkFoldersState.find(instanceKey);
716+
YQL_ENSURE(!walkFoldersInstanceIt.IsEnd());
711717
auto& walkFoldersImpl = walkFoldersInstanceIt->second;
712718

713719
Y_ENSURE(walkFoldersImpl.GetAnyOpFuture().HasValue(),
@@ -722,7 +728,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
722728

723729
if (walkFoldersImpl.IsFinished()) {
724730
YQL_CLOG(INFO, ProviderYt) << "Building result expr for WalkFolders with key: " << instanceKey;
725-
PendingWalkFolders_.erase(currImplKey);
731+
this->State_->WalkFoldersState.erase(instanceKey);
732+
PendingWalkFoldersKeys_.erase(instanceKey);
726733

727734
auto type = Build<TCoStructType>(ctx, readNode.Pos())
728735
.Add<TExprList>()
@@ -854,12 +861,21 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
854861
}
855862
return res;
856863
}
857-
864+
865+
TWalkFoldersImpl& GetCurrentWalkFoldersInstance() const {
866+
Y_ENSURE(!PendingWalkFoldersKeys_.empty());
867+
const auto key = PendingWalkFoldersKeys_.begin();
868+
auto stateIt = State_->WalkFoldersState.find(*key);
869+
YQL_ENSURE(stateIt != State_->WalkFoldersState.end());
870+
return stateIt->second;
871+
}
872+
858873
TMaybe<NThreading::TFuture<void>> MaybeGetWalkFoldersFuture() const {
859874
// inflight 1
860-
return !PendingWalkFolders_.empty()
861-
? MakeMaybe(PendingWalkFolders_.begin()->second.GetAnyOpFuture())
862-
: Nothing();
875+
if (!PendingWalkFoldersKeys_.empty()) {
876+
return GetCurrentWalkFoldersInstance().GetAnyOpFuture();
877+
}
878+
return Nothing();
863879
}
864880

865881
private:
@@ -868,7 +884,7 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
868884
THashMap<std::pair<TString, TYtKey::TRange>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TTableRangeResult>>> PendingRanges_;
869885
THashMap<std::pair<TString, TYtKey::TFolderList>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TFolderResult>>> PendingFolders_;
870886
THashMap<std::pair<TString, TString>, size_t> PendingCanonizations_; // cluster, original table path -> positions in canon result
871-
THashMap<ui64, TWalkFoldersImpl> PendingWalkFolders_;
887+
TSet<ui64> PendingWalkFoldersKeys_;
872888
NThreading::TFuture<IYtGateway::TCanonizePathsResult> CanonizeFuture_;
873889
NThreading::TFuture<void> CanonizationRangesFoldersFuture_;
874890

ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#pragma once
2+
13
#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
24
#include <ydb/library/yql/providers/yt/provider/yql_yt_key.h>
35

ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ void TYtState::Reset() {
307307
AnonymousLabels.clear();
308308
NodeHash.clear();
309309
Checkpoints.clear();
310+
WalkFoldersState.clear();
310311
NextEpochId = 1;
311312
}
312313

ydb/library/yql/providers/yt/provider/yql_yt_provider.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "yql_yt_gateway.h"
44
#include "yql_yt_table_desc.h"
55
#include "yql_yt_table.h"
6+
#include "yql_yt_io_discovery_walk_folders.h"
67

78
#include <ydb/library/yql/providers/yt/common/yql_yt_settings.h>
89
#include <ydb/library/yql/providers/yt/lib/row_spec/yql_row_spec.h>
@@ -108,6 +109,8 @@ struct TYtState : public TThrRefBase {
108109
TDuration TimeSpentInHybrid;
109110
NMonotonic::TMonotonic HybridStartTime;
110111
std::unordered_set<ui32> HybridInFlightOprations;
112+
THashMap<ui64, TWalkFoldersImpl> WalkFoldersState;
113+
111114
private:
112115
std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_;
113116
std::unordered_map<ui64, ui32> EpochEvalStates_;

0 commit comments

Comments
 (0)