Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,11 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
mkqlDefaultLimit = 8_GB;
}

// This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight();
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight();
}
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = 200_MB;
}
Expand Down Expand Up @@ -1936,7 +1940,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {

{
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles()));
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
}

{
Expand Down
21 changes: 5 additions & 16 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ void Init(
if (protoConfig.GetPrivateApi().GetEnabled()) {
const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg;
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());

// These fillings were left for the backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
readActorFactoryCfg.RowsInBatch = rowsInBatch;
}
Expand All @@ -208,22 +210,9 @@ void Init(
if (const ui64 dataInflight = s3readConfig.GetDataInflight()) {
readActorFactoryCfg.DataInflight = dataInflight;
}
for (auto& formatSizeLimit: protoConfig.GetGateways().GetS3().GetFormatSizeLimit()) {
if (formatSizeLimit.GetName()) { // ignore unnamed limits
readActorFactoryCfg.FormatSizeLimits.emplace(
formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit());
}
}
if (protoConfig.GetGateways().GetS3().HasFileSizeLimit()) {
readActorFactoryCfg.FileSizeLimit =
protoConfig.GetGateways().GetS3().GetFileSizeLimit();
}
if (protoConfig.GetGateways().GetS3().HasBlockFileSizeLimit()) {
readActorFactoryCfg.BlockFileSizeLimit =
protoConfig.GetGateways().GetS3().GetBlockFileSizeLimit();
}

RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);

s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/grpc_services/ydb_over_fq/execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class ExecuteDataQueryRPC
)

void HandleResultSets(const TString& queryId, const TActorContext& ctx) {
if (ResultSetSizes_.empty()) {
SendReply(ctx);
return;
}

Become(&ExecuteDataQueryRPC::GatherResultSetsState);
QueryId_ = queryId;
MakeLocalCall(CreateResultSetRequest(queryId, 0, 0), ctx);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(

if (federatedQuerySetup) {
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
s3ActorsFactory->RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig);
s3ActorsFactory->RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);

if (federatedQuerySetup->ConnectorClient) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ namespace NKikimr::NKqp {

S3GatewayConfig = queryServiceConfig.GetS3();

S3ReadActorFactoryConfig = NYql::NDq::CreateReadActorFactoryConfig(S3GatewayConfig);

YtGatewayConfig = queryServiceConfig.GetYt();
YtGateway = MakeYtGateway(appData->FunctionRegistry, queryServiceConfig);

Expand Down Expand Up @@ -127,7 +129,8 @@ namespace NKikimr::NKqp {
GenericGatewaysConfig,
YtGatewayConfig,
YtGateway,
nullptr};
nullptr,
S3ReadActorFactoryConfig};

// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && MdbEndpointGenerator &&
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/s3/actors_factory/yql_s3_actors_factory.h>
#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>

namespace NKikimrConfig {
Expand All @@ -30,6 +31,7 @@ namespace NKikimr::NKqp {
NYql::TYtGatewayConfig YtGatewayConfig;
NYql::IYtGateway::TPtr YtGateway;
NMiniKQL::TComputationNodeFactory ComputationFactory;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

struct IKqpFederatedQuerySetupFactory {
Expand Down Expand Up @@ -65,6 +67,7 @@ namespace NKikimr::NKqp {
NYql::NConnector::IClient::TPtr ConnectorClient;
std::optional<NActors::TActorId> DatabaseResolverActorId;
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
Expand Down Expand Up @@ -94,7 +97,7 @@ namespace NKikimr::NKqp {

std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
return TKqpFederatedQuerySetup{
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, YtGatewayConfig, YtGateway, ComputationFactories};
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, YtGatewayConfig, YtGateway, ComputationFactories, S3ReadActorFactoryConfig};
}

private:
Expand All @@ -107,6 +110,7 @@ namespace NKikimr::NKqp {
NYql::TYtGatewayConfig YtGatewayConfig;
NYql::IYtGateway::TPtr YtGateway;
NMiniKQL::TComputationNodeFactory ComputationFactories;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings));
kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true);

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr});
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}, nullptr, nullptr, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, nullptr, nullptr, {}, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ message TS3GatewayConfig {
optional uint64 RegexpCacheSize = 14;
optional uint64 GeneratorPathsLimit = 15;
optional uint64 MaxListingResultSizePerPartition = 16;
optional uint64 RowsInBatch = 17; // Default = 1000
optional uint64 MaxInflight = 18; // Default = 20
optional uint64 DataInflight = 19; // Default = 200 MB
optional bool AllowLocalFiles = 20;

repeated TAttr DefaultSettings = 100;
}
Expand Down
45 changes: 41 additions & 4 deletions ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,30 @@ struct TEvPrivate {
} // namespace

class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
struct TMetrics {
TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters)
: TxId(std::visit([](auto arg) { return ToString(arg); }, txId))
, Counters(counters) {
SubGroup = Counters->GetSubgroup("sink", "PqRead");
auto sink = SubGroup->GetSubgroup("tx_id", TxId);
auto task = sink->GetSubgroup("task_id", ToString(taskId));
InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData");
InFlySubscribe = task->GetCounter("InFlySubscribe");
AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true);
}

~TMetrics() {
SubGroup->RemoveSubgroup("id", TxId);
}

TString TxId;
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounterPtr SubGroup;
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
};

public:
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.
using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;
Expand All @@ -100,10 +124,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const NActors::TActorId& computeActorId,
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 bufferSize)
: TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
, InputIndex(inputIndex)
, TxId(txId)
, Metrics(txId, taskId, counters)
, BufferSize(bufferSize)
, HolderFactory(holderFactory)
, LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << taskId << ". PQ source. ")
Expand Down Expand Up @@ -245,9 +271,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
hFunc(TEvPrivate::TEvSourceDataReady, Handle);
)

void Handle(TEvPrivate::TEvSourceDataReady::TPtr&) {
void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) {
SRC_LOG_T("SessionId: " << GetSessionId() << " Source data ready");
SubscribedOnEvent = false;
if (ev.Get()->Cookie) {
Metrics.InFlySubscribe->Dec();
}
Metrics.InFlyAsyncInputData->Set(1);
Metrics.AsyncInputDataRate->Inc();
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}

Expand Down Expand Up @@ -282,6 +313,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
}

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override {
Metrics.InFlyAsyncInputData->Set(0);
SRC_LOG_T("SessionId: " << GetSessionId() << " GetAsyncInputData freeSpace = " << freeSpace);

const auto now = TInstant::Now();
Expand Down Expand Up @@ -387,9 +419,10 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
void SubscribeOnNextEvent() {
if (!SubscribedOnEvent) {
SubscribedOnEvent = true;
Metrics.InFlySubscribe->Inc();
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
EventFuture = GetReadSession().WaitEvent().Subscribe([actorSystem, selfId = SelfId()](const auto&){
actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady());
actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady(), 0, 1);
});
}
}
Expand Down Expand Up @@ -595,6 +628,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
const ui64 InputIndex;
TDqAsyncStats IngressStats;
const TTxId TxId;
TMetrics Metrics;
const i64 BufferSize;
const THolderFactory& HolderFactory;
const TString LogPrefix;
Expand Down Expand Up @@ -629,6 +663,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 bufferSize
)
{
Expand All @@ -653,15 +688,16 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
std::move(driver),
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
computeActorId,
counters,
bufferSize
);

return {actor, actor};
}

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters) {
factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory)](
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters](
NPq::NProto::TDqPqTopicSource&& settings,
IDqAsyncIoFactory::TSourceArguments&& args)
{
Expand All @@ -678,6 +714,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
credentialsFactory,
args.ComputeActorId,
args.HolderFactory,
counters,
PQReadDefaultFreeSpace);
});

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 bufferSize = PQReadDefaultFreeSpace
);

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
private:
bool nextImpl() final {
while (!Coro->InputFinished || !Coro->Requests.empty()) {
Coro->CpuTime += Coro->GetCpuTimeDelta();
Coro->ProcessOneEvent();
Coro->StartCycleCount = GetCycleCountFast();
if (Coro->InputBuffer) {
RawDataBuffer.swap(Coro->InputBuffer);
Coro->InputBuffer.clear();
Expand Down Expand Up @@ -65,6 +67,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
}

void Run() final {
StartCycleCount = GetCycleCountFast();

try {
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
NDB::ReadBuffer* buffer = coroBuffer.get();
Expand All @@ -74,15 +78,15 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
decompressorBuffer->nextIfAtEnd();
TString data{decompressorBuffer->available(), ' '};
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data), TakeCpuTimeDelta()));
}
} catch (const TDtorException&) {
// Stop any activity instantly
return;
} catch (...) {
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception(), TakeCpuTimeDelta()));
}
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish(TakeCpuTimeDelta()));
}

void ProcessOneEvent() {
Expand All @@ -99,7 +103,19 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
InputBuffer = std::move(event.Data);
}

TDuration GetCpuTimeDelta() {
return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
}

TDuration TakeCpuTimeDelta() {
auto currentCpuTime = CpuTime;
CpuTime = TDuration::Zero();
return currentCpuTime;
}

private:
TDuration CpuTime;
ui64 StartCycleCount = 0;
TString InputBuffer;
TString Compression;
TActorId Parent;
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}

void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
CpuTime += ev->Get()->CpuTime;
DeferredDecompressedDataParts.push(std::move(ev->Release()));

}

void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) {
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) {
CpuTime += ev->Get()->CpuTime;
DecompressedInputFinished = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,30 @@ namespace NYql::NDq {
std::shared_ptr<IS3ActorsFactory> CreateDefaultS3ActorsFactory() {
return std::make_shared<TDefaultS3ActorsFactory>();
}

TS3ReadActorFactoryConfig CreateReadActorFactoryConfig(const ::NYql::TS3GatewayConfig& s3Config) {
TS3ReadActorFactoryConfig s3ReadActoryConfig;
if (const ui64 rowsInBatch = s3Config.GetRowsInBatch()) {
s3ReadActoryConfig.RowsInBatch = rowsInBatch;
}
if (const ui64 maxInflight = s3Config.GetMaxInflight()) {
s3ReadActoryConfig.MaxInflight = maxInflight;
}
if (const ui64 dataInflight = s3Config.GetDataInflight()) {
s3ReadActoryConfig.DataInflight = dataInflight;
}
for (auto& formatSizeLimit: s3Config.GetFormatSizeLimit()) {
if (formatSizeLimit.GetName()) { // ignore unnamed limits
s3ReadActoryConfig.FormatSizeLimits.emplace(
formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit());
}
}
if (s3Config.HasFileSizeLimit()) {
s3ReadActoryConfig.FileSizeLimit = s3Config.GetFileSizeLimit();
}
if (s3Config.HasBlockFileSizeLimit()) {
s3ReadActoryConfig.BlockFileSizeLimit = s3Config.GetBlockFileSizeLimit();
}
return s3ReadActoryConfig;
}
}
Loading