Skip to content

Commit e13dea6

Browse files
authored
Reconnect session has been supported (#9862)
1 parent 09cea11 commit e13dea6

File tree

7 files changed

+35
-1
lines changed

7 files changed

+35
-1
lines changed

ydb/library/yql/providers/common/proto/gateways_config.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,8 @@ message TPqClusterConfig {
326326
optional bool AddBearerToToken = 11; // whether to use prefix "Bearer " in token
327327
optional string DatabaseId = 12;
328328
repeated TAttr Settings = 100;
329-
optional bool SharedReading = 101;
329+
optional bool SharedReading = 101;
330+
optional string ReconnectPeriod = 102; // disabled by default, example of a parameter: 5m
330331
}
331332

332333
message TPqGatewayConfig {

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ struct TEvPrivate {
7474
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
7575

7676
EvSourceDataReady = EvBegin,
77+
EvReconnectSession,
7778

7879
EvEnd
7980
};
@@ -83,6 +84,7 @@ struct TEvPrivate {
8384
// Events
8485

8586
struct TEvSourceDataReady : public TEventLocal<TEvSourceDataReady, EvSourceDataReady> {};
87+
struct TEvReconnectSession : public TEventLocal<TEvReconnectSession, EvReconnectSession> {};
8688
};
8789

8890
} // namespace
@@ -98,6 +100,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
98100
InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData");
99101
InFlySubscribe = task->GetCounter("InFlySubscribe");
100102
AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true);
103+
ReconnectRate = task->GetCounter("ReconnectRate", true);
101104
}
102105

103106
~TMetrics() {
@@ -110,6 +113,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
110113
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
111114
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
112115
::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
116+
::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate;
113117
};
114118

115119
public:
@@ -139,6 +143,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
139143
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
140144
, PqGateway(pqGateway)
141145
{
146+
Y_UNUSED(TDuration::TryParse(SourceParams.GetReconnectPeriod(), ReconnectPeriod));
142147
MetadataFields.reserve(SourceParams.MetadataFieldsSize());
143148
TPqMetaExtractor fieldsExtractor;
144149
for (const auto& fieldName : SourceParams.GetMetadataFields()) {
@@ -209,6 +214,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
209214
private:
210215
STRICT_STFUNC(StateFunc,
211216
hFunc(TEvPrivate::TEvSourceDataReady, Handle);
217+
hFunc(TEvPrivate::TEvReconnectSession, Handle);
212218
)
213219

214220
void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) {
@@ -222,6 +228,18 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
222228
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
223229
}
224230

231+
void Handle(TEvPrivate::TEvReconnectSession::TPtr&) {
232+
SRC_LOG_D("SessionId: " << GetSessionId() << ", Reconnect epoch: " << Metrics.ReconnectRate->Val());
233+
Metrics.ReconnectRate->Inc();
234+
if (ReadSession) {
235+
ReadSession->Close(TDuration::Zero());
236+
ReadSession.reset();
237+
ReadyBuffer = std::queue<TReadyBatch>{}; // clear read buffer
238+
}
239+
240+
Schedule(ReconnectPeriod, new TEvPrivate::TEvReconnectSession());
241+
}
242+
225243
// IActor & IDqComputeActorAsyncInput
226244
void PassAway() override { // Is called from Compute Actor
227245
std::queue<TReadyBatch> empty;
@@ -259,6 +277,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
259277
const auto now = TInstant::Now();
260278
MaybeScheduleNextIdleCheck(now);
261279

280+
if (!InflightReconnect && ReconnectPeriod != TDuration::Zero()) {
281+
Metrics.ReconnectRate->Inc();
282+
Schedule(ReconnectPeriod, new TEvPrivate::TEvSourceDataReady());
283+
InflightReconnect = true;
284+
}
285+
262286
i64 usedSpace = 0;
263287
if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
264288
return usedSpace;
@@ -565,6 +589,8 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
565589
};
566590

567591
private:
592+
bool InflightReconnect = false;
593+
TDuration ReconnectPeriod;
568594
TMetrics Metrics;
569595
const i64 BufferSize;
570596
const THolderFactory& HolderFactory;

ydb/library/yql/providers/pq/common/yql_names.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ constexpr TStringBuf WatermarksEnableSetting = "WatermarksEnable";
1515
constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs";
1616
constexpr TStringBuf WatermarksLateArrivalDelayUsSetting = "WatermarksLateArrivalDelayUs";
1717
constexpr TStringBuf WatermarksIdlePartitionsSetting = "WatermarksIdlePartitions";
18+
constexpr TStringBuf ReconnectPeriod = "ReconnectPeriod";
1819

1920
} // namespace NYql

ydb/library/yql/providers/pq/proto/dq_io.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ message TDqPqTopicSource {
3737
repeated string ColumnTypes = 13;
3838
string Predicate = 14;
3939
bool SharedReading = 15;
40+
string ReconnectPeriod = 16; // disabled by default, example of a parameter: 5m
4041
}
4142

4243
message TDqPqTopicSink {

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ class TPqDqIntegration: public TDqIntegrationBase {
226226
srcDesc.SetEndpoint(TString(Value(setting)));
227227
} else if (name == SharedReading) {
228228
sharedReading = FromString<bool>(Value(setting));
229+
} else if (name == ReconnectPeriod) {
230+
srcDesc.SetReconnectPeriod(TString(Value(setting)));
229231
} else if (name == Format) {
230232
format = TString(Value(setting));
231233
} else if (name == UseSslSetting) {
@@ -348,6 +350,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
348350

349351
Add(props, EndpointSetting, clusterConfiguration->Endpoint, pos, ctx);
350352
Add(props, SharedReading, ToString(clusterConfiguration->SharedReading), pos, ctx);
353+
Add(props, ReconnectPeriod, ToString(clusterConfiguration->ReconnectPeriod), pos, ctx);
351354
Add(props, Format, format, pos, ctx);
352355

353356

ydb/library/yql/providers/pq/provider/yql_pq_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ void TPqConfiguration::Init(
4343
clusterSettings.UseSsl = cluster.GetUseSsl();
4444
clusterSettings.AddBearerToToken = cluster.GetAddBearerToToken();
4545
clusterSettings.SharedReading = cluster.GetSharedReading();
46+
clusterSettings.ReconnectPeriod = cluster.GetReconnectPeriod();
4647

4748
const TString authToken = typeCtx->Credentials->FindCredentialContent("cluster:default_" + clusterSettings.ClusterName, "default_pq", cluster.GetToken());
4849
clusterSettings.AuthToken = authToken;

ydb/library/yql/providers/pq/provider/yql_pq_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ struct TPqClusterConfigurationSettings {
3030
TString AuthToken;
3131
bool AddBearerToToken = false;
3232
bool SharedReading = false;
33+
TString ReconnectPeriod;
3334
};
3435

3536
struct TPqConfiguration : public TPqSettings, public NCommon::TSettingDispatcher {

0 commit comments

Comments
 (0)