Skip to content

Commit 3c9b479

Browse files
authored
Merge bce8674 into 86aed8c
2 parents 86aed8c + bce8674 commit 3c9b479

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+423
-136
lines changed

ydb/core/driver_lib/run/kikimr_services_initializers.cpp

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@
173173

174174
#include <ydb/core/fq/libs/init/init.h>
175175
#include <ydb/core/fq/libs/logs/log.h>
176+
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>
176177

177178
#include <ydb/library/folder_service/folder_service.h>
178179
#include <ydb/library/folder_service/proto/config.pb.h>
@@ -2155,10 +2156,12 @@ void TQuoterServiceInitializer::InitializeServices(NActors::TActorSystemSetup* s
21552156
TKqpServiceInitializer::TKqpServiceInitializer(
21562157
const TKikimrRunConfig& runConfig,
21572158
std::shared_ptr<TModuleFactories> factories,
2158-
IGlobalObjectStorage& globalObjects)
2159+
IGlobalObjectStorage& globalObjects,
2160+
NFq::IYqSharedResources::TPtr yqSharedResources)
21592161
: IKikimrServicesInitializer(runConfig)
21602162
, Factories(std::move(factories))
21612163
, GlobalObjects(globalObjects)
2164+
, YqSharedResources(yqSharedResources)
21622165
{}
21632166

21642167
void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
@@ -2212,6 +2215,38 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
22122215
setup->LocalServices.push_back(std::make_pair(
22132216
NKqp::MakeKqpFinalizeScriptServiceId(NodeId),
22142217
TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId)));
2218+
2219+
const auto& sharedReading = Config.GetQueryServiceConfig().GetSharedReading();
2220+
if (sharedReading.GetEnabled()) {
2221+
NFq::TYqSharedResources::TPtr yqSharedResources = NFq::TYqSharedResources::Cast(YqSharedResources);
2222+
2223+
NYql::TPqGatewayServices pqServices(
2224+
yqSharedResources->UserSpaceYdbDriver,
2225+
nullptr,
2226+
nullptr,
2227+
std::make_shared<NYql::TPqGatewayConfig>(),
2228+
nullptr,
2229+
nullptr
2230+
// commonTopicClientSettings
2231+
);
2232+
// FederatedQuerySetup = federatedQuerySetupFactory->Make(ctx.ActorSystem());
2233+
2234+
auto service = NFq::NewRowDispatcherService(
2235+
Config.GetQueryServiceConfig().GetSharedReading(),
2236+
NKikimr::CreateYdbCredentialsProviderFactory,
2237+
yqSharedResources,
2238+
nullptr, //credentialsFactory, todo
2239+
"tenant",
2240+
appData->Counters->GetSubgroup("subsystem", "row_dispatcher"),
2241+
CreatePqNativeGateway(pqServices),
2242+
NActors::TActorId{},
2243+
appData->Mon,
2244+
appData->Counters);
2245+
2246+
setup->LocalServices.push_back(std::make_pair(
2247+
NFq::RowDispatcherServiceActorId(),
2248+
TActorSetupCmd(service.release(), TMailboxType::HTSwap, appData->UserPoolId)));
2249+
}
22152250
}
22162251
}
22172252

ydb/core/driver_lib/run/kikimr_services_initializers.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,12 +389,13 @@ class TQuoterServiceInitializer : public IKikimrServicesInitializer {
389389
class TKqpServiceInitializer : public IKikimrServicesInitializer {
390390
public:
391391
TKqpServiceInitializer(const TKikimrRunConfig& runConfig, std::shared_ptr<TModuleFactories> factories,
392-
IGlobalObjectStorage& globalObjects);
392+
IGlobalObjectStorage& globalObjects, NFq::IYqSharedResources::TPtr yqSharedResources);
393393

394394
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
395395
private:
396396
std::shared_ptr<TModuleFactories> Factories;
397397
IGlobalObjectStorage& GlobalObjects;
398+
NFq::IYqSharedResources::TPtr YqSharedResources;
398399
};
399400

400401
class TCompDiskLimiterInitializer: public IKikimrServicesInitializer {

ydb/core/driver_lib/run/run.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1727,7 +1727,7 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
17271727
sil->AddServiceInitializer(new TMemoryControllerInitializer(runConfig, ProcessMemoryInfoProvider));
17281728

17291729
if (serviceMask.EnableKqp) {
1730-
sil->AddServiceInitializer(new TKqpServiceInitializer(runConfig, ModuleFactories, *this));
1730+
sil->AddServiceInitializer(new TKqpServiceInitializer(runConfig, ModuleFactories, *this, YqSharedResources));
17311731
}
17321732

17331733
if (serviceMask.EnableMetadataProvider) {

ydb/core/external_sources/external_source_factory.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
129129
},
130130
{
131131
ToString(NYql::EDatabaseType::Ydb),
132-
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"NONE", "BASIC", "SERVICE_ACCOUNT", "TOKEN"}, {"database_name", "use_tls", "database_id"}, hostnamePatternsRegEx)
132+
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"NONE", "BASIC", "SERVICE_ACCOUNT", "TOKEN"}, {"database_name", "use_tls", "database_id", "shared_reading"}, hostnamePatternsRegEx)
133133
},
134134
{
135135
ToString(NYql::EDatabaseType::YT),
@@ -177,7 +177,7 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
177177
},
178178
{
179179
ToString(NYql::EDatabaseType::YdbTopics),
180-
CreateExternalDataSource(TString{NYql::PqProviderName}, {"NONE", "BASIC", "TOKEN"}, {"database_name", "use_tls"}, hostnamePatternsRegEx)
180+
CreateExternalDataSource(TString{NYql::PqProviderName}, {"NONE", "BASIC", "TOKEN"}, {"database_name", "use_tls", "shared_reading"}, hostnamePatternsRegEx)
181181
}
182182
},
183183
allExternalDataSourcesAreAvailable,

ydb/core/fq/libs/init/init.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -233,18 +233,18 @@ void Init(
233233
nullptr,
234234
commonTopicClientSettings
235235
);
236-
auto rowDispatcher = NFq::NewRowDispatcherService(
237-
protoConfig.GetRowDispatcher(),
238-
NKikimr::CreateYdbCredentialsProviderFactory,
239-
yqSharedResources,
240-
credentialsFactory,
241-
tenant,
242-
yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
243-
pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : CreatePqNativeGateway(pqServices),
244-
MakeNodesManagerId(),
245-
appData->Mon,
246-
appData->Counters);
247-
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
236+
// auto rowDispatcher = NFq::NewRowDispatcherService(
237+
// protoConfig.GetRowDispatcher(),
238+
// NKikimr::CreateYdbCredentialsProviderFactory,
239+
// yqSharedResources,
240+
// credentialsFactory,
241+
// tenant,
242+
// yqCounters->GetSubgroup("subsystem", "row_dispatcher"),
243+
// pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : CreatePqNativeGateway(pqServices),
244+
// MakeNodesManagerId(),
245+
// appData->Mon,
246+
// appData->Counters);
247+
// actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
248248
}
249249

250250
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();

ydb/core/fq/libs/row_dispatcher/actors_factory.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ struct TActorFactory : public IActorFactory {
1313
const TString& topicPath,
1414
const TString& endpoint,
1515
const TString& database,
16-
const NConfig::TRowDispatcherConfig& config,
16+
const NKikimrConfig::TSharedReadingConfig& config,
1717
NActors::TActorId rowDispatcherActorId,
1818
NActors::TActorId compileServiceActorId,
1919
ui32 partitionId,

ydb/core/fq/libs/row_dispatcher/actors_factory.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
#pragma once
22

3-
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
43
#include <util/generic/ptr.h>
54
#include <ydb/library/actors/core/actor.h>
65
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h>
76
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
87

8+
namespace NKikimrConfig {
9+
class TSharedReadingConfig;
10+
} // namespace NKikimrConfig
11+
912
namespace NFq::NRowDispatcher {
1013

1114
struct IActorFactory : public TThrRefBase {
@@ -16,7 +19,7 @@ struct IActorFactory : public TThrRefBase {
1619
const TString& topicPath,
1720
const TString& endpoint,
1821
const TString& database,
19-
const NConfig::TRowDispatcherConfig& config,
22+
const NKikimrConfig::TSharedReadingConfig& config,
2023
NActors::TActorId rowDispatcherActorId,
2124
NActors::TActorId compileServiceActorId,
2225
ui32 partitionId,

ydb/core/fq/libs/row_dispatcher/coordinator.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
172172
TTopicMetrics Metrics;
173173
};
174174

175-
NConfig::TRowDispatcherCoordinatorConfig Config;
175+
NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig Config;
176176
TYqSharedResources::TPtr YqSharedResources;
177177
TActorId LocalRowDispatcherId;
178178
NActors::TActorId NodesManagerId;
@@ -189,7 +189,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
189189
public:
190190
TActorCoordinator(
191191
NActors::TActorId localRowDispatcherId,
192-
const NConfig::TRowDispatcherCoordinatorConfig& config,
192+
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
193193
const TYqSharedResources::TPtr& yqSharedResources,
194194
const TString& tenant,
195195
const ::NMonitoring::TDynamicCounterPtr& counters,
@@ -236,7 +236,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
236236

237237
TActorCoordinator::TActorCoordinator(
238238
NActors::TActorId localRowDispatcherId,
239-
const NConfig::TRowDispatcherCoordinatorConfig& config,
239+
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
240240
const TYqSharedResources::TPtr& yqSharedResources,
241241
const TString& tenant,
242242
const ::NMonitoring::TDynamicCounterPtr& counters,
@@ -544,7 +544,7 @@ void TActorCoordinator::SendError(TActorId readActorId, const TCoordinatorReques
544544

545545
std::unique_ptr<NActors::IActor> NewCoordinator(
546546
NActors::TActorId rowDispatcherId,
547-
const NConfig::TRowDispatcherCoordinatorConfig& config,
547+
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
548548
const TYqSharedResources::TPtr& yqSharedResources,
549549
const TString& tenant,
550550
const ::NMonitoring::TDynamicCounterPtr& counters,

ydb/core/fq/libs/row_dispatcher/coordinator.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
#pragma once
22

3-
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
4-
53
#include <ydb/library/actors/core/actor.h>
64
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
75

6+
#include <ydb/core/protos/config.pb.h>
7+
88
namespace NFq {
99

1010
////////////////////////////////////////////////////////////////////////////////
1111

1212
std::unique_ptr<NActors::IActor> NewCoordinator(
1313
NActors::TActorId rowDispatcherId,
14-
const NConfig::TRowDispatcherCoordinatorConfig& config,
14+
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
1515
const TYqSharedResources::TPtr& yqSharedResources,
1616
const TString& tenant,
1717
const ::NMonitoring::TDynamicCounterPtr& counters,

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
1212

13+
#include <ydb/core/protos/config.pb.h>
14+
1315
namespace NFq::NRowDispatcher {
1416

1517
namespace {
@@ -606,7 +608,7 @@ ITopicFormatHandler::TPtr CreateTopicFormatHandler(const NActors::TActorContext&
606608
return ITopicFormatHandler::TPtr(handler);
607609
}
608610

609-
TFormatHandlerConfig CreateFormatHandlerConfig(const NConfig::TRowDispatcherConfig& rowDispatcherConfig, NActors::TActorId compileServiceId) {
611+
TFormatHandlerConfig CreateFormatHandlerConfig(const NKikimrConfig::TSharedReadingConfig& rowDispatcherConfig, NActors::TActorId compileServiceId) {
610612
return {
611613
.JsonParserConfig = CreateJsonParserConfig(rowDispatcherConfig.GetJsonParser()),
612614
.FiltersConfig = {

0 commit comments

Comments
 (0)