Skip to content

YQ-4108 Add topic session thread num settings #14532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/pending_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
const TString& tenantName,
NActors::TMon* monitoring,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
NYql::IPqGateway::TPtr defaultPqGateway
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
Expand All @@ -181,7 +181,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
, Monitoring(monitoring)
, ComputeConfig(config.GetCompute())
, S3ActorsFactory(std::move(s3ActorsFactory))
, DefaultPqGateway(std::move(defaultPqGateway))
, PqGatewayFactory(std::move(pqGatewayFactory))
{
Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver));
}
Expand Down Expand Up @@ -475,7 +475,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
std::map<TString, Ydb::TypedValue>(task.parameters().begin(), task.parameters().end()),
S3ActorsFactory,
ComputeConfig.GetWorkloadManagerConfig(task.scope()),
DefaultPqGateway
PqGatewayFactory
);

auto runActorId =
Expand Down Expand Up @@ -551,7 +551,7 @@ class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
NActors::TMon* Monitoring;
TComputeConfig ComputeConfig;
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
NYql::IPqGateway::TPtr DefaultPqGateway;
NYql::IPqGatewayFactory::TPtr PqGatewayFactory;
};


Expand All @@ -572,7 +572,7 @@ NActors::IActor* CreatePendingFetcher(
const TString& tenantName,
NActors::TMon* monitoring,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
NYql::IPqGateway::TPtr defaultPqGateway)
NYql::IPqGatewayFactory::TPtr pqGatewayFactory)
{
return new TPendingFetcher(
yqSharedResources,
Expand All @@ -591,7 +591,7 @@ NActors::IActor* CreatePendingFetcher(
tenantName,
monitoring,
std::move(s3ActorsFactory),
defaultPqGateway);
std::move(pqGatewayFactory));
}

TActorId MakePendingFetcherId(ui32 nodeId) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/actors/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ NActors::IActor* CreatePendingFetcher(
const TString& tenantName,
NActors::TMon* monitoring,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
NYql::IPqGateway::TPtr defaultPqGateway
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
);

NActors::IActor* CreateRunActor(
Expand Down
12 changes: 4 additions & 8 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
SelfId(),
Params.QueryId,
Params.YqSharedResources->UserSpaceYdbDriver,
Params.PqGatewayFactory->CreatePqGateway(),
Params.Resources.topic_consumers(),
PrepareReadRuleCredentials()
)
Expand Down Expand Up @@ -1435,6 +1436,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
SelfId(),
Params.QueryId,
Params.YqSharedResources->UserSpaceYdbDriver,
Params.PqGatewayFactory->CreatePqGateway(),
Params.Resources.topic_consumers(),
PrepareReadRuleCredentials()
)
Expand Down Expand Up @@ -1970,14 +1972,8 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
}

{
NYql::TPqGatewayServices pqServices(
Params.YqSharedResources->UserSpaceYdbDriver,
Params.PqCmConnections,
Params.CredentialsFactory,
std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()),
Params.FunctionRegistry
);
const auto pqGateway = Params.DefaultPqGateway ? Params.DefaultPqGateway : NYql::CreatePqNativeGateway(pqServices);
auto pqGateway = Params.PqGatewayFactory->CreatePqGateway();
pqGateway->UpdateClusterConfigs(std::make_shared<NYql::TPqGatewayConfig>(gatewaysConfig.GetPq()));
dataProvidersInit.push_back(GetPqDataProviderInitializer(pqGateway, false, dbResolver));
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/compute/common/run_actor_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ TRunActorParams::TRunActorParams(
std::map<TString, Ydb::TypedValue>&& queryParameters,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
NYql::IPqGateway::TPtr defaultPqGateway
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
)
: YqSharedResources(yqSharedResources)
, CredentialsProviderFactory(credentialsProviderFactory)
Expand Down Expand Up @@ -118,7 +118,7 @@ TRunActorParams::TRunActorParams(
, QueryParameters(std::move(queryParameters))
, S3ActorsFactory(std::move(s3ActorsFactory))
, WorkloadManager(workloadManager)
, DefaultPqGateway(defaultPqGateway)
, PqGatewayFactory(std::move(pqGatewayFactory))
{
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/compute/common/run_actor_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct TRunActorParams { // TODO2 : Change name
std::map<TString, Ydb::TypedValue>&& queryParameters,
std::shared_ptr<NYql::NDq::IS3ActorsFactory> s3ActorsFactory,
const ::NFq::NConfig::TWorkloadManagerConfig& workloadManager,
NYql::IPqGateway::TPtr defaultPqGateway
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
);

TRunActorParams(const TRunActorParams& params) = default;
Expand Down Expand Up @@ -147,7 +147,7 @@ struct TRunActorParams { // TODO2 : Change name
std::map<TString, Ydb::TypedValue> QueryParameters;
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
::NFq::NConfig::TWorkloadManagerConfig WorkloadManager;
NYql::IPqGateway::TPtr DefaultPqGateway;
NYql::IPqGatewayFactory::TPtr PqGatewayFactory;
};

} /* NFq */
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/config/protos/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ message TCommonConfig {
bool ShowQueryTimeline = 16;
uint64 MaxQueryTimelineSize = 17; // default: 200KB
string PqReconnectPeriod = 18; // default: disabled
uint32 TopicClientHandlersExecutorThreadsNum = 19; // default: 0 that means use default from TopicClientSettings (1)
uint32 TopicClientCompressionExecutorThreadsNum = 20; // default: 0 that means use default from TopicClientSettings (2)
}
40 changes: 33 additions & 7 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ namespace NFq {

using namespace NKikimr;

NYdb::NTopic::TTopicClientSettings GetCommonTopicClientSettings(const NFq::NConfig::TCommonConfig& config) {
NYdb::NTopic::TTopicClientSettings settings;
if (config.GetTopicClientHandlersExecutorThreadsNum()) {
settings.DefaultHandlersExecutor(NYdb::NTopic::CreateThreadPoolExecutor(config.GetTopicClientHandlersExecutorThreadsNum()));
}
if (config.GetTopicClientCompressionExecutorThreadsNum()) {
settings.DefaultCompressionExecutor(NYdb::NTopic::CreateThreadPoolExecutor(config.GetTopicClientCompressionExecutorThreadsNum()));
}
return settings;
}

void Init(
const NFq::NConfig::TConfig& protoConfig,
ui32 nodeId,
Expand All @@ -67,7 +78,7 @@ void Init(
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
ui32 icPort,
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
NYql::IPqGateway::TPtr defaultPqGateway
NYql::IPqGatewayFactory::TPtr pqGatewayFactory
)
{
Y_ABORT_UNLESS(iyqSharedResources, "No YQ shared resources created");
Expand Down Expand Up @@ -190,22 +201,26 @@ void Init(
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize());
}

auto commonTopicClientSettings = GetCommonTopicClientSettings(protoConfig.GetCommon());

if (protoConfig.GetRowDispatcher().GetEnabled()) {
NYql::TPqGatewayServices pqServices(
yqSharedResources->UserSpaceYdbDriver,
nullptr,
nullptr,
std::make_shared<NYql::TPqGatewayConfig>(),
nullptr);

nullptr,
nullptr,
commonTopicClientSettings
);
auto rowDispatcher = NFq::NewRowDispatcherService(
protoConfig.GetRowDispatcher(),
NKikimr::CreateYdbCredentialsProviderFactory,
yqSharedResources,
credentialsFactory,
tenant,
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
defaultPqGateway ? defaultPqGateway : CreatePqNativeGateway(pqServices),
pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : CreatePqNativeGateway(pqServices),
appData->Mon,
appData->Counters);
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
Expand All @@ -224,9 +239,11 @@ void Init(
pqCmConnections,
credentialsFactory,
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
appData->FunctionRegistry
appData->FunctionRegistry,
nullptr,
commonTopicClientSettings
);
auto pqGateway = defaultPqGateway ? defaultPqGateway : NYql::CreatePqNativeGateway(std::move(pqServices));
auto pqGateway = pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : NYql::CreatePqNativeGateway(std::move(pqServices));
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway,
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), protoConfig.GetCommon().GetPqReconnectPeriod());

Expand Down Expand Up @@ -330,6 +347,15 @@ void Init(
}

if (protoConfig.GetPendingFetcher().GetEnabled()) {
NYql::TPqGatewayServices pqServices(
yqSharedResources->UserSpaceYdbDriver,
pqCmConnections,
credentialsFactory,
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
appData->FunctionRegistry,
nullptr,
commonTopicClientSettings
);
auto fetcher = CreatePendingFetcher(
yqSharedResources,
NKikimr::CreateYdbCredentialsProviderFactory,
Expand All @@ -347,7 +373,7 @@ void Init(
tenant,
appData->Mon,
s3ActorsFactory,
defaultPqGateway
pqGatewayFactory ? pqGatewayFactory : NYql::CreatePqNativeGatewayFactory(pqServices)
);

actorRegistrator(MakePendingFetcherId(nodeId), fetcher);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/init/init.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void Init(
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
ui32 icPort,
const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
NYql::IPqGateway::TPtr defaultPqGateway = nullptr
NYql::IPqGatewayFactory::TPtr pqGatewayFactory = nullptr
);

} // NFq
12 changes: 10 additions & 2 deletions ydb/core/fq/libs/read_rule/read_rule_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
const NYql::IPqGateway::TPtr& pqGateway,
const Fq::Private::TopicConsumer& topicConsumer,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider,
ui64 index
Expand All @@ -80,6 +81,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>
, QueryId(std::move(queryId))
, TopicConsumer(topicConsumer)
, YdbDriver(std::move(ydbDriver))
, PqGateway(pqGateway)
, TopicClient(YdbDriver, GetTopicClientSettings(std::move(credentialsProvider)))
, Index(index)
{
Expand Down Expand Up @@ -184,7 +186,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>

private:
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider) {
return NYdb::NTopic::TTopicClientSettings()
return PqGateway->GetTopicClientSettings()
.Database(TopicConsumer.database())
.DiscoveryEndpoint(TopicConsumer.cluster_endpoint())
.CredentialsProviderFactory(std::move(credentialsProvider))
Expand All @@ -197,6 +199,7 @@ class TSingleReadRuleCreator : public TActorBootstrapped<TSingleReadRuleCreator>
const TString QueryId;
const Fq::Private::TopicConsumer TopicConsumer;
NYdb::TDriver YdbDriver;
NYql::IPqGateway::TPtr PqGateway;
NYdb::NTopic::TTopicClient TopicClient;
ui64 Index = 0;
NYdb::NTopic::IRetryPolicy::IRetryState::TPtr RetryState;
Expand All @@ -211,12 +214,14 @@ class TReadRuleCreator : public TActorBootstrapped<TReadRuleCreator> {
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
const NYql::IPqGateway::TPtr& pqGateway,
const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials
)
: Owner(owner)
, QueryId(std::move(queryId))
, YdbDriver(std::move(ydbDriver))
, PqGateway(pqGateway)
, TopicConsumers(VectorFromProto(topicConsumers))
, Credentials(std::move(credentials))
{
Expand All @@ -233,7 +238,7 @@ class TReadRuleCreator : public TActorBootstrapped<TReadRuleCreator> {
Results.reserve(TopicConsumers.size());
for (size_t i = 0; i < TopicConsumers.size(); ++i) {
LOG_D("Create read rule creation actor for `" << TopicConsumers[i].topic_path() << "` [" << i << "]");
Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, TopicConsumers[i], Credentials[i], i)));
Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, PqGateway, TopicConsumers[i], Credentials[i], i)));
}
}

Expand Down Expand Up @@ -282,6 +287,7 @@ class TReadRuleCreator : public TActorBootstrapped<TReadRuleCreator> {
const NActors::TActorId Owner;
const TString QueryId;
NYdb::TDriver YdbDriver;
NYql::IPqGateway::TPtr PqGateway;
const TVector<Fq::Private::TopicConsumer> TopicConsumers;
const TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> Credentials;
size_t ResultsGot = 0;
Expand All @@ -296,6 +302,7 @@ NActors::IActor* MakeReadRuleCreatorActor(
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
const NYql::IPqGateway::TPtr& pqGateway,
const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials
)
Expand All @@ -304,6 +311,7 @@ NActors::IActor* MakeReadRuleCreatorActor(
owner,
std::move(queryId),
std::move(ydbDriver),
pqGateway,
topicConsumers,
std::move(credentials)
);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/fq/libs/read_rule/read_rule_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
#include <ydb-cpp-sdk/client/driver/driver.h>

#include <ydb/library/actors/core/actor.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>

namespace NFq {

NActors::IActor* MakeReadRuleCreatorActor(
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
const NYql::IPqGateway::TPtr& pqGateway,
const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials // For each topic
);
Expand Down
Loading
Loading