Skip to content

Commit d655293

Browse files
authored
Merge 2083d4d into c692b6f
2 parents c692b6f + 2083d4d commit d655293

File tree

12 files changed

+55
-8
lines changed

12 files changed

+55
-8
lines changed

ydb/core/fq/libs/config/protos/common.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,6 @@ message TCommonConfig {
3232
bool ShowQueryTimeline = 16;
3333
uint64 MaxQueryTimelineSize = 17; // default: 200KB
3434
string PqReconnectPeriod = 18; // default: disabled
35+
uint32 TopicClientHandlersExecutorThreadsNum = 19; // default: 0 that means use default from TopicClientSettings (1)
36+
uint32 TopicClientCompressionExecutorThreadsNum = 20; // default: 0 that means use default from TopicClientSettings (2)
3537
}

ydb/core/fq/libs/init/init.cpp

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,17 @@ namespace NFq {
5656

5757
using namespace NKikimr;
5858

59+
NYdb::NTopic::TTopicClientSettings GetCommonTopicClientSettings(const NFq::NConfig::TCommonConfig& config) {
60+
NYdb::NTopic::TTopicClientSettings settings;
61+
if (config.GetTopicClientHandlersExecutorThreadsNum()) {
62+
settings.DefaultHandlersExecutor(NYdb::NTopic::CreateThreadPoolExecutor(config.GetTopicClientHandlersExecutorThreadsNum()));
63+
}
64+
if (config.GetTopicClientCompressionExecutorThreadsNum()) {
65+
settings.DefaultCompressionExecutor(NYdb::NTopic::CreateThreadPoolExecutor(config.GetTopicClientCompressionExecutorThreadsNum()));
66+
}
67+
return settings;
68+
}
69+
5970
void Init(
6071
const NFq::NConfig::TConfig& protoConfig,
6172
ui32 nodeId,
@@ -189,13 +200,18 @@ void Init(
189200
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize());
190201
}
191202

203+
TMaybe<NYdb::NTopic::TTopicClientSettings> commonTopicClientSettings;
204+
192205
if (protoConfig.GetRowDispatcher().GetEnabled()) {
206+
commonTopicClientSettings = GetCommonTopicClientSettings(protoConfig.GetCommon());
193207
NYql::TPqGatewayServices pqServices(
194208
yqSharedResources->UserSpaceYdbDriver,
195209
nullptr,
196210
nullptr,
197211
std::make_shared<NYql::TPqGatewayConfig>(),
198-
nullptr);
212+
nullptr,
213+
nullptr,
214+
*commonTopicClientSettings);
199215

200216
auto rowDispatcher = NFq::NewRowDispatcherService(
201217
protoConfig.GetRowDispatcher(),
@@ -223,7 +239,9 @@ void Init(
223239
pqCmConnections,
224240
credentialsFactory,
225241
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
226-
appData->FunctionRegistry
242+
appData->FunctionRegistry,
243+
nullptr,
244+
commonTopicClientSettings.GetOrElse(GetCommonTopicClientSettings(protoConfig.GetCommon()))
227245
);
228246
auto pqGateway = NYql::CreatePqNativeGateway(std::move(pqServices));
229247
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway,

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -411,12 +411,11 @@ void TTopicSession::SubscribeOnNextEvent() {
411411
}
412412

413413
NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) const {
414-
NYdb::NTopic::TTopicClientSettings opts;
415-
opts.Database(Database)
414+
return PqGateway->GetTopicClientSettings()
415+
.Database(Database)
416416
.DiscoveryEndpoint(Endpoint)
417417
.SslCredentials(NYdb::TSslCredentials(sourceParams.GetUseSsl()))
418418
.CredentialsProviderFactory(CredentialsProviderFactory);
419-
return opts;
420419
}
421420

422421
NYql::ITopicClient& TTopicSession::GetTopicClient(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) {

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
161161
}
162162

163163
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() const {
164-
NYdb::NTopic::TTopicClientSettings opts;
164+
NYdb::NTopic::TTopicClientSettings opts = PqGateway->GetTopicClientSettings();
165165
opts.Database(SourceParams.GetDatabase())
166166
.DiscoveryEndpoint(SourceParams.GetEndpoint())
167167
.SslCredentials(NYdb::TSslCredentials(SourceParams.GetUseSsl()))

ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
312312
}
313313

314314
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() {
315-
return NYdb::NTopic::TTopicClientSettings()
315+
return PqGateway->GetTopicClientSettings()
316316
.Database(SinkParams.GetDatabase())
317317
.DiscoveryEndpoint(SinkParams.GetEndpoint())
318318
.SslCredentials(NYdb::TSslCredentials(SinkParams.GetUseSsl()))

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,8 @@ void TDummyPqGateway::UpdateClusterConfigs(
8080
Y_UNUSED(secure);
8181
}
8282

83+
NYdb::NTopic::TTopicClientSettings TDummyPqGateway::GetTopicClientSettings() {
84+
return NYdb::NTopic::TTopicClientSettings();
85+
}
86+
8387
} // namespace NYql

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class TDummyPqGateway : public IPqGateway {
5858
bool secure) override;
5959

6060
ITopicClient::TPtr GetTopicClient(const NYdb::TDriver& driver, const NYdb::NTopic::TTopicClientSettings& settings) override;
61+
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() const override;
6162

6263
using TClusterNPath = std::pair<TString, TString>;
6364
private:

ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class TPqNativeGateway : public IPqGateway {
4141
bool secure) override;
4242

4343
ITopicClient::TPtr GetTopicClient(const NYdb::TDriver& driver, const NYdb::NTopic::TTopicClientSettings& settings) override;
44+
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() const override;
4445

4546
private:
4647
void InitClusterConfigs();
@@ -56,6 +57,7 @@ class TPqNativeGateway : public IPqGateway {
5657
NYdb::TDriver YdbDriver;
5758
TPqClusterConfigsMapPtr ClusterConfigs;
5859
THashMap<TString, TPqSession::TPtr> Sessions;
60+
TMaybe<NYdb::NTopic::TTopicClientSettings> CommonTopicClientSettings;
5961
};
6062

6163
TPqNativeGateway::TPqNativeGateway(const TPqGatewayServices& services)
@@ -65,6 +67,7 @@ TPqNativeGateway::TPqNativeGateway(const TPqGatewayServices& services)
6567
, CredentialsFactory(services.CredentialsFactory)
6668
, CmConnections(services.CmConnections)
6769
, YdbDriver(services.YdbDriver)
70+
, CommonTopicClientSettings(services.CommonTopicClientSettings)
6871
{
6972
Y_UNUSED(FunctionRegistry);
7073
InitClusterConfigs();
@@ -144,6 +147,10 @@ ITopicClient::TPtr TPqNativeGateway::GetTopicClient(const NYdb::TDriver& driver,
144147
return MakeIntrusive<TNativeTopicClient>(driver, settings);
145148
}
146149

150+
NYdb::NTopic::TTopicClientSettings TPqNativeGateway::GetTopicClientSettings() {
151+
return CommonTopicClientSettings.GetOrElse(NYdb::NTopic::TTopicClientSettings());
152+
}
153+
147154
TPqNativeGateway::~TPqNativeGateway() {
148155
Sessions.clear();
149156
}

ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,23 @@ struct TPqGatewayServices {
2626
ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
2727
::NPq::NConfigurationManager::IConnections::TPtr CmConnections;
2828
NYdb::TDriver YdbDriver;
29+
TMaybe<NYdb::NTopic::TTopicClientSettings> CommonTopicClientSettings;
2930

3031
TPqGatewayServices(
3132
NYdb::TDriver driver,
3233
::NPq::NConfigurationManager::IConnections::TPtr cmConnections,
3334
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
3435
TPqGatewayConfigPtr config,
3536
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
36-
IMetricsRegistryPtr metrics = nullptr)
37+
IMetricsRegistryPtr metrics = nullptr,
38+
TMaybe<NYdb::NTopic::TTopicClientSettings> commonTopicClientSettings = Nothing())
3739
: FunctionRegistry(functionRegistry)
3840
, Config(std::move(config))
3941
, Metrics(std::move(metrics))
4042
, CredentialsFactory(std::move(credentialsFactory))
4143
, CmConnections(std::move(cmConnections))
4244
, YdbDriver(std::move(driver))
45+
, CommonTopicClientSettings(commonTopicClientSettings)
4346
{
4447
}
4548
};

ydb/library/yql/providers/pq/provider/yql_pq_gateway.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ struct IPqGateway : public TThrRefBase {
3636
const TString& endpoint,
3737
const TString& database,
3838
bool secure) = 0;
39+
40+
virtual NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() const = 0;
3941
};
4042

4143
} // namespace NYql

0 commit comments

Comments
 (0)