Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 41 additions & 20 deletions ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
else if (key.GetWalkFolderArgs()) {
return ctx.ChangeChild(readNode.Ref(), 2, InitializeWalkFolders(std::move(key), cluster, keyPos, ctx));
}
else if (key.GetWalkFolderImplArgs()) {
PendingWalkFoldersKeys_.insert(key.GetWalkFolderImplArgs()->StateKey);
}
else if (!key.IsAnonymous()) {
if (PendingCanonizations_.insert(std::make_pair(std::make_pair(cluster, key.GetPath()), paths.size())).second) {
paths.push_back(IYtGateway::TCanonizeReq()
Expand All @@ -209,13 +212,18 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
PendingCanonizations_.clear();
PendingFolders_.clear();
PendingRanges_.clear();
PendingWalkFolders_.clear();

for (const auto& key : PendingWalkFoldersKeys_) {
State_->WalkFoldersState.erase(key);
}
PendingWalkFoldersKeys_.clear();

YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
return status;
}

if (PendingRanges_.empty() && PendingFolders_.empty()
&& PendingCanonizations_.empty() && PendingWalkFolders_.empty()) {
&& PendingCanonizations_.empty() && PendingWalkFoldersKeys_.empty()) {
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
return status;
}
Expand Down Expand Up @@ -344,6 +352,11 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
CanonizeFuture_ = {};
CanonizationRangesFoldersFuture_ = {};

for (const auto& key : PendingWalkFoldersKeys_) {
State_->WalkFoldersState.erase(key);
}
PendingWalkFoldersKeys_.clear();

return TStatus::Error;
}
}
Expand Down Expand Up @@ -538,7 +551,7 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
CanonizeFuture_ = {};
CanonizationRangesFoldersFuture_ = {};

if (status == TStatus::Ok && !PendingWalkFolders_.empty()) {
if (status == TStatus::Ok && !PendingWalkFoldersKeys_.empty()) {
const auto walkFoldersStatus = RewriteWalkFoldersOnAsyncOrEvalChanges(output, ctx);
return walkFoldersStatus;
}
Expand All @@ -552,6 +565,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
PendingRanges_.clear();
PendingFolders_.clear();
PendingCanonizations_.clear();
PendingWalkFoldersKeys_.clear();

CanonizeFuture_ = {};
CanonizationRangesFoldersFuture_ = {};
}
Expand Down Expand Up @@ -669,7 +684,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
<< args.InitialFolder.Prefix << "`" << " with root attributes cnt: "
<< args.InitialFolder.Attributes.size();
const auto instanceKey = ctx.NextUniqueId;
PendingWalkFolders_.emplace(instanceKey, std::move(walkFolders));
State_->WalkFoldersState.emplace(instanceKey, std::move(walkFolders));
PendingWalkFoldersKeys_.insert(instanceKey);

auto walkFoldersImplNode = Build<TYtWalkFoldersImpl>(ctx, key.GetNode()->Pos())
.ProcessStateKey()
Expand All @@ -685,12 +701,9 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
}

TStatus RewriteWalkFoldersOnAsyncOrEvalChanges(TExprNode::TPtr& output, TExprContext& ctx) {
Y_ENSURE(!PendingWalkFolders_.empty());

const auto currImplKey = PendingWalkFolders_.begin()->first;
TStatus walkFoldersStatus = IGraphTransformer::TStatus::Ok;

auto status = VisitInputKeys(output, ctx, [this, &ctx, currImplKey, &walkFoldersStatus] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
auto status = VisitInputKeys(output, ctx, [this, &ctx, &walkFoldersStatus] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
if (keys.GetType() == TYtKey::EType::WalkFoldersImpl) {
YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - DoApplyAsyncChanges WalkFoldersImpl handling start";

Expand All @@ -699,15 +712,13 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
YQL_CLOG(INFO, ProviderYt) << "Failed to parse WalkFolderImpl args";
return {};
}

const auto instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey;
if (instanceKey != currImplKey) {
const ui64 instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey;
if (*PendingWalkFoldersKeys_.begin() != instanceKey) {
return readNode.Ptr();
}

auto walkFoldersInstanceIt = PendingWalkFolders_.find(currImplKey);
YQL_ENSURE(!walkFoldersInstanceIt.IsEnd(), "Failed to find walkFoldersInstance with key: " << instanceKey);

auto walkFoldersInstanceIt = this->State_->WalkFoldersState.find(instanceKey);
YQL_ENSURE(!walkFoldersInstanceIt.IsEnd());
auto& walkFoldersImpl = walkFoldersInstanceIt->second;

Y_ENSURE(walkFoldersImpl.GetAnyOpFuture().HasValue(),
Expand All @@ -722,7 +733,8 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {

if (walkFoldersImpl.IsFinished()) {
YQL_CLOG(INFO, ProviderYt) << "Building result expr for WalkFolders with key: " << instanceKey;
PendingWalkFolders_.erase(currImplKey);
this->State_->WalkFoldersState.erase(instanceKey);
PendingWalkFoldersKeys_.erase(instanceKey);

auto type = Build<TCoStructType>(ctx, readNode.Pos())
.Add<TExprList>()
Expand Down Expand Up @@ -854,12 +866,21 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
}
return res;
}


TWalkFoldersImpl& GetCurrentWalkFoldersInstance() const {
Y_ENSURE(!PendingWalkFoldersKeys_.empty());
const auto key = PendingWalkFoldersKeys_.begin();
auto stateIt = State_->WalkFoldersState.find(*key);
YQL_ENSURE(stateIt != State_->WalkFoldersState.end());
return stateIt->second;
}

TMaybe<NThreading::TFuture<void>> MaybeGetWalkFoldersFuture() const {
// inflight 1
return !PendingWalkFolders_.empty()
? MakeMaybe(PendingWalkFolders_.begin()->second.GetAnyOpFuture())
: Nothing();
if (!PendingWalkFoldersKeys_.empty()) {
return GetCurrentWalkFoldersInstance().GetAnyOpFuture();
}
return Nothing();
}

private:
Expand All @@ -868,7 +889,7 @@ class TYtIODiscoveryTransformer : public TGraphTransformerBase {
THashMap<std::pair<TString, TYtKey::TRange>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TTableRangeResult>>> PendingRanges_;
THashMap<std::pair<TString, TYtKey::TFolderList>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TFolderResult>>> PendingFolders_;
THashMap<std::pair<TString, TString>, size_t> PendingCanonizations_; // cluster, original table path -> positions in canon result
THashMap<ui64, TWalkFoldersImpl> PendingWalkFolders_;
TSet<ui64> PendingWalkFoldersKeys_;
NThreading::TFuture<IYtGateway::TCanonizePathsResult> CanonizeFuture_;
NThreading::TFuture<void> CanonizationRangesFoldersFuture_;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_key.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ void TYtState::Reset() {
AnonymousLabels.clear();
NodeHash.clear();
Checkpoints.clear();
WalkFoldersState.clear();
NextEpochId = 1;
}

Expand Down
3 changes: 3 additions & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "yql_yt_gateway.h"
#include "yql_yt_table_desc.h"
#include "yql_yt_table.h"
#include "yql_yt_io_discovery_walk_folders.h"

#include <ydb/library/yql/providers/yt/common/yql_yt_settings.h>
#include <ydb/library/yql/providers/yt/lib/row_spec/yql_row_spec.h>
Expand Down Expand Up @@ -108,6 +109,8 @@ struct TYtState : public TThrRefBase {
TDuration TimeSpentInHybrid;
NMonotonic::TMonotonic HybridStartTime;
std::unordered_set<ui32> HybridInFlightOprations;
THashMap<ui64, TWalkFoldersImpl> WalkFoldersState;

private:
std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_;
std::unordered_map<ui64, ui32> EpochEvalStates_;
Expand Down