Skip to content

Commit 8878688

Browse files
committed
AS has been moved to consturctor of s3 lister
1 parent 4ae928f commit 8878688

File tree

11 files changed

+37
-27
lines changed

11 files changed

+37
-27
lines changed

ydb/core/external_sources/object_storage.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,6 @@ struct TObjectStorageExternalSource : public IExternalSource {
277277
};
278278

279279
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
280-
Y_UNUSED(ActorSystem);
281280
auto format = meta->Attributes.FindPtr("format");
282281
if (!format || !meta->Attributes.contains("withinfer")) {
283282
return NThreading::MakeFuture(std::move(meta));
@@ -320,7 +319,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
320319
.Url = meta->DataSourceLocation,
321320
.Credentials = credentials,
322321
.Pattern = effectiveFilePattern,
323-
}, Nothing(), false);
322+
}, Nothing(), false, ActorSystem);
324323
auto afterListing = s3Lister->Next().Apply([path = effectiveFilePattern](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
325324
auto& listRes = listResFut.GetValue();
326325
if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {

ydb/core/fq/libs/actors/run_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1940,7 +1940,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
19401940

19411941
{
19421942
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
1943-
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
1943+
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
19441944
}
19451945

19461946
{

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1698,6 +1698,7 @@ class TKqpHost : public IKqpHost {
16981698
state->Gateway = FederatedQuerySetup->HttpGateway;
16991699
state->GatewayRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
17001700
state->ExecutorPoolId = AppData()->UserPoolId;
1701+
state->ActorSystem = ActorSystem;
17011702

17021703
auto dataSource = NYql::CreateS3DataSource(state);
17031704
auto dataSink = NYql::CreateS3DataSink(state);

ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,8 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
500500
PatternType,
501501
object.GetPath()},
502502
Nothing(),
503-
false);
503+
false,
504+
NActors::TActivationContext::ActorSystem());
504505
Fetch();
505506
return true;
506507
}

ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ class TS3Lister : public IS3Lister {
239239
const TMaybe<TString> ContinuationToken;
240240
const ui64 MaxKeys;
241241
const std::pair<TString, TString> CurrentLogContextPath;
242+
const NActors::TActorSystem* ActorSystem;
242243
};
243244

244245
TS3Lister(
@@ -247,7 +248,8 @@ class TS3Lister : public IS3Lister {
247248
const TListingRequest& listingRequest,
248249
const TMaybe<TString>& delimiter,
249250
size_t maxFilesPerQuery,
250-
TSharedListingContextPtr sharedCtx)
251+
TSharedListingContextPtr sharedCtx,
252+
NActors::TActorSystem* actorSystem)
251253
: MaxFilesPerQuery(maxFilesPerQuery) {
252254
Y_ENSURE(
253255
listingRequest.Url.substr(0, 7) != "file://",
@@ -272,7 +274,8 @@ class TS3Lister : public IS3Lister {
272274
delimiter,
273275
Nothing(),
274276
MaxFilesPerQuery,
275-
NLog::CurrentLogContextPath()};
277+
NLog::CurrentLogContextPath(),
278+
actorSystem};
276279

277280
YQL_CLOG(TRACE, ProviderS3)
278281
<< "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url
@@ -338,14 +341,10 @@ class TS3Lister : public IS3Lister {
338341
retryPolicy);
339342
}
340343

341-
static NActors::TActorSystem* GetActorSystem() {
342-
return NActors::TlsActivationContext ? NActors::TActivationContext::ActorSystem() : nullptr;
343-
}
344-
345344
static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) {
346-
return [c = std::move(listingContext), actorSystem = GetActorSystem()](IHTTPGateway::TResult&& result) {
347-
if (actorSystem) {
348-
NDq::TYqlLogScope logScope(actorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
345+
return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) {
346+
if (c.ActorSystem) {
347+
NDq::TYqlLogScope logScope(c.ActorSystem, NKikimrServices::KQP_YQL, c.CurrentLogContextPath.first, c.CurrentLogContextPath.second);
349348
OnDiscovery(c, std::move(result));
350349
} else {
351350
/*
@@ -468,9 +467,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
468467
using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;
469468

470469
explicit TS3ParallelLimitedListerFactory(
471-
size_t maxParallelOps, TSharedListingContextPtr sharedCtx)
470+
size_t maxParallelOps, TSharedListingContextPtr sharedCtx, NActors::TActorSystem* actorSystem)
472471
: SharedCtx(std::move(sharedCtx))
473-
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
472+
, Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps)))
473+
, ActorSystem(actorSystem) { }
474474

475475
TFuture<NS3Lister::IS3Lister::TPtr> Make(
476476
const IHTTPGateway::TPtr& httpGateway,
@@ -480,10 +480,10 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
480480
bool allowLocalFiles) override {
481481
auto acquired = Semaphore->AcquireAsync();
482482
return acquired.Apply(
483-
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles](const auto& f) {
483+
[ctx = SharedCtx, httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem = ActorSystem](const auto& f) {
484484
return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{
485485
NS3Lister::MakeS3Lister(
486-
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, ctx),
486+
httpGateway, retryPolicy, listingRequest, delimiter, allowLocalFiles, actorSystem, ctx),
487487
std::make_unique<TAsyncSemaphore::TAutoRelease>(
488488
f.GetValue()->MakeAutoRelease())});
489489
});
@@ -519,6 +519,7 @@ class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
519519
private:
520520
TSharedListingContextPtr SharedCtx;
521521
const TAsyncSemaphore::TPtr Semaphore;
522+
NActors::TActorSystem* ActorSystem;
522523
};
523524

524525
} // namespace
@@ -529,10 +530,11 @@ IS3Lister::TPtr MakeS3Lister(
529530
const TListingRequest& listingRequest,
530531
const TMaybe<TString>& delimiter,
531532
bool allowLocalFiles,
533+
NActors::TActorSystem* actorSystem,
532534
TSharedListingContextPtr sharedCtx) {
533535
if (listingRequest.Url.substr(0, 7) != "file://") {
534536
return std::make_shared<TS3Lister>(
535-
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx));
537+
httpGateway, retryPolicy, listingRequest, delimiter, 1000, std::move(sharedCtx), actorSystem);
536538
}
537539

538540
if (!allowLocalFiles) {
@@ -546,13 +548,14 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
546548
size_t maxParallelOps,
547549
size_t callbackThreadCount,
548550
size_t callbackPerThreadQueueSize,
549-
size_t regexpCacheSize) {
551+
size_t regexpCacheSize,
552+
NActors::TActorSystem* actorSystem) {
550553
std::shared_ptr<TSharedListingContext> sharedCtx = nullptr;
551554
if (callbackThreadCount != 0 || regexpCacheSize != 0) {
552555
sharedCtx = std::make_shared<TSharedListingContext>(
553556
callbackThreadCount, callbackPerThreadQueueSize, regexpCacheSize);
554557
}
555-
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx);
558+
return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps, sharedCtx, actorSystem);
556559
}
557560

558561
} // namespace NYql::NS3Lister

ydb/library/yql/providers/s3/object_listers/yql_s3_list.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <library/cpp/cache/cache.h>
44
#include <library/cpp/threading/future/future.h>
55
#include <util/thread/pool.h>
6+
#include <ydb/library/actors/core/actorsystem.h>
67
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
78
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
89

@@ -169,6 +170,7 @@ IS3Lister::TPtr MakeS3Lister(
169170
const TListingRequest& listingRequest,
170171
const TMaybe<TString>& delimiter,
171172
bool allowLocalFiles,
173+
NActors::TActorSystem* actorSystem,
172174
TSharedListingContextPtr sharedCtx = nullptr);
173175

174176
class IS3ListerFactory {
@@ -189,7 +191,8 @@ IS3ListerFactory::TPtr MakeS3ListerFactory(
189191
size_t maxParallelOps,
190192
size_t callbackThreadCount,
191193
size_t callbackPerThreadQueueSize,
192-
size_t regexpCacheSize);
194+
size_t regexpCacheSize,
195+
NActors::TActorSystem* actorSystem);
193196

194197
} // namespace NS3Lister
195198
} // namespace NYql

ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
8383
State_->Configuration->MaxInflightListsPerQuery,
8484
State_->Configuration->ListingCallbackThreadCount,
8585
State_->Configuration->ListingCallbackPerThreadQueueSize,
86-
State_->Configuration->RegexpCacheSize))
86+
State_->Configuration->RegexpCacheSize,
87+
State_->ActorSystem))
8788
, ListingStrategy_(MakeS3ListingStrategy(
8889
State_->Gateway,
8990
State_->GatewayRetryPolicy,

ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
namespace NYql {
66

7-
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles) {
8-
return [gateway, credentialsFactory, allowLocalFiles] (
7+
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) {
8+
return [gateway, credentialsFactory, allowLocalFiles, actorSystem] (
99
const TString& userName,
1010
const TString& sessionId,
1111
const TGatewaysConfig* gatewaysConfig,
@@ -31,6 +31,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway
3131
state->Types = typeCtx.Get();
3232
state->FunctionRegistry = functionRegistry;
3333
state->CredentialsFactory = credentialsFactory;
34+
state->ActorSystem = actorSystem;
3435
if (gatewaysConfig) {
3536
state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx);
3637
}

ydb/library/yql/providers/s3/provider/yql_s3_provider.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ struct TS3State : public TThrRefBase
3232
IHTTPGateway::TRetryPolicy::TPtr GatewayRetryPolicy = GetHTTPDefaultRetryPolicy();
3333
ui32 ExecutorPoolId = 0;
3434
std::list<TVector<TString>> PrimaryKeys;
35+
NActors::TActorSystem* ActorSystem = nullptr;
3536
};
3637

37-
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false);
38+
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr);
3839

3940
TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state);
4041
TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state);

ydb/library/yql/tools/dqrun/dqrun.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -926,7 +926,7 @@ int RunMain(int argc, const char* argv[])
926926
if (!httpGateway) {
927927
httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr);
928928
}
929-
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true));
929+
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, actorSystemManager->GetActorSystem()));
930930
}
931931

932932
if (gatewaysConfig.HasPq()) {

ydb/tests/fq/s3/test_s3_1.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ def test_huge_source(self, kikimr, s3, client, runtime_listing, unique_prefix):
482482
# 1024 x 1024 x 10 = 10 MB of raw data + little overhead for header, eols etc
483483
assert sum(kikimr.control_plane.get_metering(1)) == 21
484484

485-
# it looks like the runtime_listing for v1 doesn't work in case of
485+
# it looks like the runtime_listing for v1 doesn't work in case of
486486
# restart of query because the v1 keeps the compiled query in the cache
487487
@yq_all
488488
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)

0 commit comments

Comments
 (0)