Skip to content

Commit e7cb027

Browse files
authored
YQ-3912 Add read_group ( ~ connection_id) to pq cluster config (#12064)
1 parent 7af313d commit e7cb027

File tree

9 files changed

+21
-10
lines changed

9 files changed

+21
-10
lines changed

ydb/core/fq/libs/actors/clusters_from_connections.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ void FillClusterAuth(TClusterConfig& clusterCfg,
4141
void FillPqClusterConfig(NYql::TPqClusterConfig& clusterConfig,
4242
const TString& name, bool useBearerForYdb,
4343
const TString& authToken, const THashMap<TString, TString>& accountIdSignatures,
44-
const FederatedQuery::DataStreams& ds) {
44+
const FederatedQuery::DataStreams& ds,
45+
const TString& readGroup) {
4546
clusterConfig.SetName(name);
4647
if (ds.endpoint()) {
4748
clusterConfig.SetEndpoint(ds.endpoint());
@@ -52,6 +53,7 @@ void FillPqClusterConfig(NYql::TPqClusterConfig& clusterConfig,
5253
clusterConfig.SetAddBearerToToken(useBearerForYdb);
5354
clusterConfig.SetClusterType(TPqClusterConfig::CT_DATA_STREAMS);
5455
clusterConfig.SetSharedReading(ds.shared_reading());
56+
clusterConfig.SetReadGroup(readGroup);
5557
FillClusterAuth(clusterConfig, ds.auth(), authToken, accountIdSignatures);
5658
}
5759

@@ -177,13 +179,14 @@ void FillGenericClusterConfig<FederatedQuery::PostgreSQLCluster>(
177179

178180
NYql::TPqClusterConfig CreatePqClusterConfig(const TString& name,
179181
bool useBearerForYdb, const TString& authToken,
180-
const TString& accountSignature, const FederatedQuery::DataStreams& ds) {
182+
const TString& accountSignature, const FederatedQuery::DataStreams& ds,
183+
const TString& readGroup) {
181184
NYql::TPqClusterConfig cluster;
182185
THashMap<TString, TString> accountIdSignatures;
183186
if (ds.auth().has_service_account()) {
184187
accountIdSignatures[ds.auth().service_account().id()] = accountSignature;
185188
}
186-
FillPqClusterConfig(cluster, name, useBearerForYdb, authToken, accountIdSignatures, ds);
189+
FillPqClusterConfig(cluster, name, useBearerForYdb, authToken, accountIdSignatures, ds, readGroup);
187190
return cluster;
188191
}
189192

@@ -255,7 +258,7 @@ void AddClustersFromConnections(
255258
case FederatedQuery::ConnectionSetting::kDataStreams: {
256259
const auto& ds = conn.content().setting().data_streams();
257260
auto* clusterCfg = gatewaysConfig.MutablePq()->AddClusterMapping();
258-
FillPqClusterConfig(*clusterCfg, connectionName, common.GetUseBearerForYdb(), authToken, accountIdSignatures, ds);
261+
FillPqClusterConfig(*clusterCfg, connectionName, common.GetUseBearerForYdb(), authToken, accountIdSignatures, ds, conn.meta().id());
259262
clusters.emplace(connectionName, PqProviderName);
260263
break;
261264
}

ydb/core/fq/libs/actors/clusters_from_connections.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace NFq {
88

9-
NYql::TPqClusterConfig CreatePqClusterConfig(const TString& name, bool useBearerForYdb, const TString& authToken, const TString& accountSignature, const FederatedQuery::DataStreams& ds);
9+
NYql::TPqClusterConfig CreatePqClusterConfig(const TString& name, bool useBearerForYdb, const TString& authToken, const TString& accountSignature, const FederatedQuery::DataStreams& ds, const TString& readGroup);
1010

1111
NYql::TS3ClusterConfig CreateS3ClusterConfig(const TString& name, const TString& authToken, const TString& objectStorageEndpoint, const TString& accountSignature, const FederatedQuery::ObjectStorageConnection& s3);
1212

ydb/core/fq/libs/test_connection/test_data_streams.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ using namespace NActors;
7171

7272
class TTestDataStreamsConnectionActor : public NActors::TActorBootstrapped<TTestDataStreamsConnectionActor> {
7373
inline static const TString SessionName = "test_connection_data_streams";
74+
inline static const TString ReadGroup = "read_group";
7475

7576
NFq::NConfig::TCommonConfig CommonConfig;
7677
TActorId Sender;
@@ -117,7 +118,7 @@ class TTestDataStreamsConnectionActor : public NActors::TActorBootstrapped<TTest
117118
, CmConnections(cmConnections)
118119
, FunctionRegistry(functionRegistry)
119120
, DbResolver(dbResolver)
120-
, ClusterConfig(CreateClusterConfig(SessionName, CommonConfig, Token, signer, ds))
121+
, ClusterConfig(CreateClusterConfig(SessionName, CommonConfig, Token, signer, ds, ReadGroup))
121122
, StructuredToken(NYql::ComposeStructuredTokenJsonForServiceAccount(ClusterConfig.GetServiceAccountId(), ClusterConfig.GetServiceAccountIdSignature(), ClusterConfig.GetToken()))
122123
{
123124
Counters->InFly->Inc();
@@ -248,10 +249,10 @@ class TTestDataStreamsConnectionActor : public NActors::TActorBootstrapped<TTest
248249
DestroyActor();
249250
}
250251

251-
static NYql::TPqClusterConfig CreateClusterConfig(const TString& sessionName, const NFq::NConfig::TCommonConfig& commonConfig, const TString& token, const NFq::TSigner::TPtr& signer, const FederatedQuery::DataStreams& ds) {
252+
static NYql::TPqClusterConfig CreateClusterConfig(const TString& sessionName, const NFq::NConfig::TCommonConfig& commonConfig, const TString& token, const NFq::TSigner::TPtr& signer, const FederatedQuery::DataStreams& ds, const TString& readGroup) {
252253
const auto& auth = ds.auth();
253254
const TString signedAccountId = signer && auth.has_service_account() ? signer->SignAccountId(auth.service_account().id()) : TString{};
254-
return NFq::CreatePqClusterConfig(sessionName, commonConfig.GetUseBearerForYdb(), token, signedAccountId, ds);
255+
return NFq::CreatePqClusterConfig(sessionName, commonConfig.GetUseBearerForYdb(), token, signedAccountId, ds, readGroup);
255256
}
256257

257258
NYql::TPqGatewayServices CreateGatewayServices() {

ydb/library/yql/providers/pq/common/yql_names.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,6 @@ constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs";
1616
constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs";
1717
constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions";
1818
constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod";
19+
constexpr TStringBuf ReadGroup = "ReadGroup";
1920

2021
} // namespace NYql

ydb/library/yql/providers/pq/proto/dq_io.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ message TDqPqTopicSource {
3939
bool SharedReading = 15;
4040
string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m
4141
bool EnabledLLVM = 17;
42+
string ReadGroup = 18;
4243
}
4344

4445
message TDqPqTopicSink {

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ class TPqDqIntegration: public TDqIntegrationBase {
229229
sharedReading = FromString<bool>(Value(setting));
230230
} else if (name == ReconnectPeriod) {
231231
srcDesc.SetReconnectPeriod(TString(Value(setting)));
232+
} else if (name == ReadGroup) {
233+
srcDesc.SetReadGroup(TString(Value(setting)));
232234
} else if (name == Format) {
233235
format = TString(Value(setting));
234236
} else if (name == UseSslSetting) {
@@ -348,7 +350,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
348350
Add(props, SharedReading, ToString(clusterConfiguration->SharedReading), pos, ctx);
349351
Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx);
350352
Add(props, Format, format, pos, ctx);
351-
353+
Add(props, ReadGroup, clusterConfiguration->ReadGroup, pos, ctx);
352354

353355
if (clusterConfiguration->UseSsl) {
354356
Add(props, UseSslSetting, "1", pos, ctx);

ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ void TPqConfiguration::Init(
4343
clusterSettings.UseSsl = cluster.GetUseSsl();
4444
clusterSettings.AddBearerToToken = cluster.GetAddBearerToToken();
4545
clusterSettings.SharedReading = cluster.GetSharedReading();
46+
clusterSettings.ReadGroup = cluster.GetReadGroup();
4647

4748
const TString authToken = typeCtx->Credentials->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken());
4849
clusterSettings.AuthToken = authToken;

ydb/library/yql/providers/pq/provider/yql_pq_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ struct TPqClusterConfigurationSettings {
3131
bool AddBearerToToken = false;
3232
bool SharedReading = false;
3333
TString ReconnectPeriod;
34+
TString ReadGroup;
3435
};
3536

3637
struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher {

ydb/library/yql/tools/dqrun/examples/gateways.conf

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ Pq {
143143
Database: "local"
144144
ClusterType: CT_DATA_STREAMS
145145
UseSsl: True
146-
SharedReading:False
146+
SharedReading:True
147+
ReadGroup: "read_group"
147148
}
148149
}
149150

0 commit comments

Comments
 (0)