Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -779,11 +779,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
mkqlDefaultLimit = 8_GB;
}

// This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight();
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight();
}
auto s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight();
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = 200_MB;
}
Expand Down Expand Up @@ -1970,8 +1966,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
}

{
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory, NActors::TActivationContext::ActorSystem()));
}

{
Expand Down
10 changes: 1 addition & 9 deletions ydb/core/fq/libs/config/protos/read_actors_factory.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,11 @@ import "ydb/library/yql/providers/s3/proto/retry_config.proto";

////////////////////////////////////////////////////////////

message TS3ReadActorFactoryConfig {
NYql.NS3.TRetryConfig RetryConfig = 1;
uint64 RowsInBatch = 2; // Default = 1000
uint64 MaxInflight = 3; // Default = 20
uint64 DataInflight = 4; // Default = 200 MB
bool AllowLocalFiles = 5;
}

message TPqReadActorFactoryConfig {
bool CookieCommitMode = 1; // Turn off RangesMode setting in PQ read session.
}

message TReadActorsFactoryConfig {
TS3ReadActorFactoryConfig S3ReadActorFactoryConfig = 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Предлагаю сначала выпилить из всех конфигов, затем раскатать их на прод, потом уже удалять из прото

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, так и хочу сделать. Подготовил отделное ревью для удаления конфигов https://a.yandex-team.ru/review/6585395/details. А это будет висеть пока в проде не обновим.

reserved 1;
TPqReadActorFactoryConfig PqReadActorFactoryConfig = 2;
}
12 changes: 0 additions & 12 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,9 @@ void Init(
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();

if (protoConfig.GetPrivateApi().GetEnabled()) {
const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());

// These fillings were left for the backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
readActorFactoryCfg.RowsInBatch = rowsInBatch;
}
if (const ui64 maxInflight = s3readConfig.GetMaxInflight()) {
readActorFactoryCfg.MaxInflight = maxInflight;
}
if (const ui64 dataInflight = s3readConfig.GetDataInflight()) {
readActorFactoryCfg.DataInflight = dataInflight;
}

RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
Expand Down
5 changes: 2 additions & 3 deletions ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace NYql {

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) {
return [gateway, credentialsFactory, allowLocalFiles, actorSystem] (
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorSystem* actorSystem) {
return [gateway, credentialsFactory, actorSystem] (
const TString& userName,
const TString& sessionId,
const TGatewaysConfig* gatewaysConfig,
Expand Down Expand Up @@ -35,7 +35,6 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway
if (gatewaysConfig) {
state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx);
}
state->Configuration->AllowLocalFiles = allowLocalFiles;
state->Gateway = gateway;

TDataProviderInfo info;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/s3/provider/yql_s3_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct TS3State : public TThrRefBase
NActors::TActorSystem* ActorSystem = nullptr;
};

TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr);
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, NActors::TActorSystem* actorSystem = nullptr);

TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state);
TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state);
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/tools/dqrun/dqrun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,13 +949,14 @@ int RunMain(int argc, const char* argv[])
}

if (gatewaysConfig.HasS3()) {
gatewaysConfig.MutableS3()->SetAllowLocalFiles(true);
for (auto& cluster: gatewaysConfig.GetS3().GetClusterMapping()) {
clusters.emplace(to_lower(cluster.GetName()), TString{S3ProviderName});
}
if (!httpGateway) {
httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr);
}
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, nullptr));
dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr));
}

if (gatewaysConfig.HasPq()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/common/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
@pytest.fixture
def kikimr(request: pytest.FixtureRequest, yq_version: str):
kikimr_extensions = [
AddFormatSizeLimitExtension(),
AddInflightExtension(),
AddDataInflightExtension(),
AddFormatSizeLimitExtension(),
DefaultConfigExtension(''),
YQv2Extension(yq_version),
ComputeExtension(),
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/plans/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ def is_s3_ready():
@pytest.fixture
def kikimr(request: pytest.FixtureRequest, s3: S3, yq_version: str, stats_mode: str):
kikimr_extensions = [
AddFormatSizeLimitExtension(),
AddInflightExtension(),
AddDataInflightExtension(),
AddFormatSizeLimitExtension(),
DefaultConfigExtension(s3.s3_url),
YQv2Extension(yq_version),
ComputeExtension(),
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/restarts/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ def stats_mode():
@pytest.fixture
def kikimr(request: pytest.FixtureRequest, s3: S3, yq_version: str, stats_mode: str):
kikimr_extensions = [
AddFormatSizeLimitExtension(),
AddInflightExtension(),
AddDataInflightExtension(),
AddFormatSizeLimitExtension(),
DefaultConfigExtension(s3.s3_url),
YQv2Extension(yq_version),
ComputeExtension(),
Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/fq/s3/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ def kikimr_params(request: pytest.FixtureRequest):

def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external_ydb_endpoint):
return [
AddFormatSizeLimitExtension(),
AddInflightExtension(),
AddDataInflightExtension(),
AddFormatSizeLimitExtension(),
DefaultConfigExtension(s3.s3_url),
YQv2Extension(yq_version, kikimr_settings.get("is_replace_if_exists", False)),
ComputeExtension(),
Expand Down
8 changes: 0 additions & 8 deletions ydb/tests/tools/fq_runner/kikimr_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,14 +520,6 @@ def fill_config(self, control_plane):
fq_config['quotas_manager'] = {'enabled': True}
self.fill_rate_limiter_config(fq_config['rate_limiter'], "RateLimiter_" + self.uuid)

fq_config['read_actors_factory_config'] = {
's3_read_actor_factory_config': {
'retry_config': {
'max_retry_time_ms': 3000
}
}
}


class TenantType(Enum):
YQ = 1
Expand Down
6 changes: 2 additions & 4 deletions ydb/tests/tools/fq_runner/kikimr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ def is_applicable(self, request):

def apply_to_kikimr(self, request, kikimr):
kikimr.inflight = request.param["inflight"]
kikimr.compute_plane.fq_config['read_actors_factory_config']['s3_read_actor_factory_config'][
'max_inflight'] = kikimr.inflight
kikimr.compute_plane.fq_config['gateways']['s3']['max_inflight'] = kikimr.inflight
del request.param["inflight"]


Expand All @@ -59,8 +58,7 @@ def is_applicable(self, request):

def apply_to_kikimr(self, request, kikimr):
kikimr.data_inflight = request.param["data_inflight"]
kikimr.compute_plane.fq_config['read_actors_factory_config']['s3_read_actor_factory_config'][
'data_inflight'] = kikimr.data_inflight
kikimr.compute_plane.fq_config['gateways']['s3']['data_inflight'] = kikimr.data_inflight
del request.param["data_inflight"]


Expand Down