Skip to content

Commit f0ce60b

Browse files
authored
Merge b50b536 into f49e354
2 parents f49e354 + b50b536 commit f0ce60b

25 files changed

+121
-123
lines changed

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,12 +1509,11 @@ class TKqpHost : public IKqpHost {
15091509
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
15101510
state->Configuration->WriteThroughDqIntegration = true;
15111511
state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
1512-
state->MaxTasksPerStage = SessionCtx->ConfigPtr()->MaxTasksPerStage.Get();
1513-
15141512
state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
1513+
state->Gateway = FederatedQuerySetup->HttpGateway;
15151514

1516-
auto dataSource = NYql::CreateS3DataSource(state, FederatedQuerySetup->HttpGateway);
1517-
auto dataSink = NYql::CreateS3DataSink(state, FederatedQuerySetup->HttpGateway);
1515+
auto dataSource = NYql::CreateS3DataSource(state);
1516+
auto dataSink = NYql::CreateS3DataSink(state);
15181517

15191518
TypesCtx->AddDataSource(NYql::S3ProviderName, std::move(dataSource));
15201519
TypesCtx->AddDataSink(NYql::S3ProviderName, std::move(dataSink));

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -946,11 +946,6 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
946946
NYql::IDqIntegration* dqIntegration = provider->second->GetDqIntegration();
947947
YQL_ENSURE(dqIntegration, "Unsupported dq source for provider: \"" << dataSourceCategory << "\"");
948948
auto& externalSource = *protoSource->MutableExternalSource();
949-
google::protobuf::Any& settings = *externalSource.MutableSettings();
950-
TString& sourceType = *externalSource.MutableType();
951-
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType);
952-
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
953-
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");
954949

955950
// Partitioning
956951
TVector<TString> partitionParams;
@@ -975,6 +970,12 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
975970
externalSource.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
976971
CreateStructuredTokenParser(token).ListReferences(SecretNames);
977972
}
973+
974+
google::protobuf::Any& settings = *externalSource.MutableSettings();
975+
TString& sourceType = *externalSource.MutableType();
976+
dqIntegration->FillSourceSettings(source.Ref(), settings, sourceType, maxTasksPerStage);
977+
YQL_ENSURE(!settings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
978+
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");
978979
}
979980
}
980981

ydb/library/yql/dq/actors/compute/retry_queue.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ void TRetryEventsQueue::HandleNodeConnected(ui32 nodeId) {
4747
}
4848
}
4949

50+
bool TRetryEventsQueue::HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev) {
51+
if (ev->Sender == RecipientId && ev->Get()->Reason == NActors::TEvents::TEvUndelivered::Disconnected) {
52+
Connected = false;
53+
ScheduleRetry();
54+
return true;
55+
}
56+
57+
return false;
58+
}
59+
5060
void TRetryEventsQueue::Retry() {
5161
RetryScheduled = false;
5262
if (!Connected) {

ydb/library/yql/dq/actors/compute/retry_queue.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ class TRetryEventsQueue {
129129
void OnNewRecipientId(const NActors::TActorId& recipientId, bool unsubscribe = true);
130130
void HandleNodeConnected(ui32 nodeId);
131131
void HandleNodeDisconnected(ui32 nodeId);
132+
bool HandleUndelivered(NActors::TEvents::TEvUndelivered::TPtr& ev);
132133
void Retry();
133134
void Unsubscribe();
134135

@@ -165,7 +166,7 @@ class TRetryEventsQueue {
165166
THolder<T> ev = MakeHolder<T>();
166167
ev->Record = Event->Record;
167168
ev->Record.MutableTransportMeta()->SetConfirmedSeqNo(confirmedSeqNo);
168-
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), 0, Cookie);
169+
return MakeHolder<NActors::IEventHandle>(Recipient, Sender, ev.Release(), NActors::IEventHandle::FlagTrackDelivery, Cookie);
169170
}
170171

171172
private:

ydb/library/yql/dq/integration/yql_dq_integration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class IDqIntegration {
6262
virtual bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) = 0;
6363
virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
6464
virtual bool CanFallback() = 0;
65-
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
65+
virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t maxPartitions) = 0;
6666
virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0;
6767
virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0;
6868
virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0;

ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class TClickHouseDqIntegration: public TDqIntegrationBase {
7575
return 0ULL;
7676
}
7777

78-
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
78+
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
7979
const TDqSource source(&node);
8080
if (const auto maySettings = source.Settings().Maybe<TClSourceSettings>()) {
8181
const auto settings = maySettings.Cast();

ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ bool TDqIntegrationBase::CanFallback() {
5151
return false;
5252
}
5353

54-
void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
54+
void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t) {
5555
}
5656

5757
void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {

ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class TDqIntegrationBase: public IDqIntegration {
1818
bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override;
1919
TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
2020
bool CanFallback() override;
21-
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override;
21+
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t) override;
2222
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override;
2323
void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override;
2424
void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override;

ydb/library/yql/providers/dq/planner/execution_planner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ namespace NYql::NDqs {
558558
TString sourceType;
559559
if (dqSource) {
560560
sourceSettings.ConstructInPlace();
561-
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType);
561+
dqIntegration->FillSourceSettings(*read, *sourceSettings, sourceType, maxPartitions);
562562
YQL_ENSURE(!sourceSettings->type_url().empty(), "Data source provider \"" << dataSourceName << "\" did't fill dq source settings for its dq source node");
563563
YQL_ENSURE(sourceType, "Data source provider \"" << dataSourceName << "\" did't fill dq source settings type for its dq source node");
564564
}

ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
180180
.Ptr();
181181
::google::protobuf::Any settings;
182182
TString sourceType;
183-
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType);
183+
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1);
184184
UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric");
185185
UNIT_ASSERT(settings.Is<Generic::TSource>());
186186
settings.UnpackTo(DqSourceSettings_);

ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ namespace NYql {
9393
}
9494

9595
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings,
96-
TString& sourceType) override {
96+
TString& sourceType, size_t) override {
9797
const TDqSource source(&node);
9898
if (const auto maybeSettings = source.Settings().Maybe<TGenSourceSettings>()) {
9999
const auto settings = maybeSettings.Cast();

ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
179179
}
180180
}
181181

182-
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType) override {
182+
void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sourceType, size_t) override {
183183
if (auto maybeDqSource = TMaybeNode<TDqSource>(&node)) {
184184
auto settings = maybeDqSource.Cast().Settings();
185185
if (auto maybeTopicSource = TMaybeNode<TDqPqTopicSource>(settings.Raw())) {

ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

Lines changed: 63 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -163,19 +163,15 @@ struct TEvS3FileQueue {
163163
struct TEvUpdateConsumersCount :
164164
public TEventPB<TEvUpdateConsumersCount, NS3::FileQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {
165165

166-
TEvUpdateConsumersCount() {
167-
Record.SetConsumersCountDelta(0);
168-
}
169-
170-
explicit TEvUpdateConsumersCount(ui64 consumersCountDelta) {
166+
explicit TEvUpdateConsumersCount(ui64 consumersCountDelta = 0) {
171167
Record.SetConsumersCountDelta(consumersCountDelta);
172168
}
173169
};
174170

175171
struct TEvAck :
176172
public TEventPB<TEvAck, NS3::FileQueue::TEvAck, EvAck> {
177173

178-
TEvAck() {}
174+
TEvAck() = default;
179175

180176
explicit TEvAck(const TMessageTransportMeta& transportMeta) {
181177
Record.MutableTransportMeta()->CopyFrom(transportMeta);
@@ -205,7 +201,7 @@ struct TEvS3FileQueue {
205201
struct TEvObjectPathReadError :
206202
public NActors::TEventPB<TEvObjectPathReadError, NS3::FileQueue::TEvObjectPathReadError, EvObjectPathReadError> {
207203

208-
TEvObjectPathReadError() {}
204+
TEvObjectPathReadError() = default;
209205

210206
TEvObjectPathReadError(TIssues issues, const TMessageTransportMeta& transportMeta) {
211207
IssuesToMessage(issues, Record.MutableIssues());
@@ -441,7 +437,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
441437
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatch);
442438
hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived);
443439
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
444-
cFunc(TEvents::TSystem::Poison, PassAway);
440+
cFunc(TEvents::TSystem::Poison, HandlePoison);
445441
default:
446442
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
447443
TransitToErrorState();
@@ -541,7 +537,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
541537
hFunc(TEvS3FileQueue::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
542538
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatchForEmptyState);
543539
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
544-
cFunc(TEvents::TSystem::Poison, PassAway);
540+
cFunc(TEvents::TSystem::Poison, HandlePoison);
545541
default:
546542
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
547543
TransitToErrorState();
@@ -566,7 +562,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
566562
hFunc(TEvS3FileQueue::TEvUpdateConsumersCount, HandleUpdateConsumersCount);
567563
hFunc(TEvS3FileQueue::TEvGetNextBatch, HandleGetNextBatchForErrorState);
568564
cFunc(TEvPrivatePrivate::EvRoundRobinStageTimeout, HandleRoundRobinStageTimeout);
569-
cFunc(TEvents::TSystem::Poison, PassAway);
565+
cFunc(TEvents::TSystem::Poison, HandlePoison);
570566
default:
571567
MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
572568
break;
@@ -603,12 +599,14 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
603599
}
604600
}
605601

602+
void HandlePoison() {
603+
AnswerPendingRequests();
604+
PassAway();
605+
}
606+
606607
void PassAway() override {
608+
PrintBackTrace();
607609
LOG_D("TS3FileQueueActor", "PassAway");
608-
609-
AnswerPendingRequests();
610-
Objects.clear();
611-
Directories.clear();
612610
TBase::PassAway();
613611
}
614612

@@ -1004,7 +1002,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
10041002
return FilesRemained && (*FilesRemained == 0);
10051003
}
10061004

1007-
STRICT_STFUNC(StateFunc,
1005+
STRICT_STFUNC_EXC(StateFunc,
10081006
hFunc(TEvPrivate::TEvReadResult, Handle);
10091007
hFunc(TEvPrivate::TEvReadError, Handle);
10101008
hFunc(TEvS3FileQueue::TEvObjectPathBatch, HandleObjectPathBatch);
@@ -1013,11 +1011,19 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
10131011
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
10141012
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
10151013
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
1014+
hFunc(NActors::TEvents::TEvUndelivered, Handle);
1015+
, catch (const std::exception& e) {
1016+
TIssues issues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
1017+
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
1018+
}
10161019
)
10171020

10181021
void HandleObjectPathBatch(TEvS3FileQueue::TEvObjectPathBatch::TPtr& objectPathBatch) {
1022+
if (!FileQueueEvents.OnEventReceived(objectPathBatch)) {
1023+
return;
1024+
}
1025+
10191026
Y_ENSURE(IsWaitingFileQueueResponse);
1020-
FileQueueEvents.OnEventReceived(objectPathBatch);
10211027
IsWaitingFileQueueResponse = false;
10221028
auto& objectBatch = objectPathBatch->Get()->Record;
10231029
ListedFiles += objectBatch.GetObjectPaths().size();
@@ -1038,7 +1044,10 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
10381044
}
10391045
}
10401046
void HandleObjectPathReadError(TEvS3FileQueue::TEvObjectPathReadError::TPtr& result) {
1041-
FileQueueEvents.OnEventReceived(result);
1047+
if (!FileQueueEvents.OnEventReceived(result)) {
1048+
return;
1049+
}
1050+
10421051
IsFileQueueEmpty = true;
10431052
if (!IsConfirmedFileQueueFinish) {
10441053
SendPathBatchRequest();
@@ -1177,10 +1186,18 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
11771186
}
11781187

11791188
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev) {
1180-
LOG_T("TS3ReadActor","Handle connected FileQueue " << ev->Get()->NodeId);
1189+
LOG_T("TS3ReadActor", "Handle connected FileQueue " << ev->Get()->NodeId);
11811190
FileQueueEvents.HandleNodeConnected(ev->Get()->NodeId);
11821191
}
11831192

1193+
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
1194+
LOG_T("TS3ReadActor", "Handle undelivered FileQueue ");
1195+
if (!FileQueueEvents.HandleUndelivered(ev)) {
1196+
TIssues issues{TIssue{TStringBuilder() << "FileQueue was lost"}};
1197+
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
1198+
}
1199+
}
1200+
11841201
// IActor & IDqComputeActorAsyncInput
11851202
void PassAway() override { // Is called from Compute Actor
11861203
LOG_D("TS3ReadActor", "PassAway");
@@ -2117,13 +2134,17 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
21172134
LOG_CORO_D("RunCoroBlockArrowParserOverFile - FINISHED");
21182135
}
21192136

2120-
STRICT_STFUNC(StateFunc,
2137+
STRICT_STFUNC_EXC(StateFunc,
21212138
hFunc(TEvPrivate::TEvReadStarted, Handle);
21222139
hFunc(TEvPrivate::TEvDataPart, Handle);
21232140
hFunc(TEvPrivate::TEvReadFinished, Handle);
21242141
hFunc(TEvPrivate::TEvContinue, Handle);
21252142
hFunc(TEvPrivate::TEvReadResult2, Handle);
21262143
hFunc(NActors::TEvents::TEvPoison, Handle);
2144+
, catch (const std::exception& e) {
2145+
TIssues issues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
2146+
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
2147+
}
21272148
)
21282149

21292150
void ProcessOneEvent() {
@@ -2836,7 +2857,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
28362857
TActorBootstrapped<TS3StreamReadActor>::PassAway();
28372858
}
28382859

2839-
STRICT_STFUNC(StateFunc,
2860+
STRICT_STFUNC_EXC(StateFunc,
28402861
hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry);
28412862
hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock);
28422863
hFunc(TEvPrivate::TEvNextRecordBatch, HandleNextRecordBatch);
@@ -2847,11 +2868,19 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
28472868
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvRetry, Handle);
28482869
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
28492870
hFunc(NActors::TEvInterconnect::TEvNodeConnected, Handle);
2871+
hFunc(NActors::TEvents::TEvUndelivered, Handle);
2872+
, catch (const std::exception& e) {
2873+
TIssues issues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
2874+
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
2875+
}
28502876
)
28512877

28522878
void HandleObjectPathBatch(TEvS3FileQueue::TEvObjectPathBatch::TPtr& objectPathBatch) {
2879+
if (!FileQueueEvents.OnEventReceived(objectPathBatch)) {
2880+
return;
2881+
}
2882+
28532883
Y_ENSURE(IsWaitingFileQueueResponse);
2854-
FileQueueEvents.OnEventReceived(objectPathBatch);
28552884
IsWaitingFileQueueResponse = false;
28562885
auto& objectBatch = objectPathBatch->Get()->Record;
28572886
ListedFiles += objectBatch.GetObjectPaths().size();
@@ -2877,7 +2906,10 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
28772906
}
28782907

28792908
void HandleObjectPathReadError(TEvS3FileQueue::TEvObjectPathReadError::TPtr& result) {
2880-
FileQueueEvents.OnEventReceived(result);
2909+
if (!FileQueueEvents.OnEventReceived(result)) {
2910+
return;
2911+
}
2912+
28812913
IsFileQueueEmpty = true;
28822914
if (!IsConfirmedFileQueueFinish) {
28832915
LOG_T("TS3StreamReadActor", "Sending finish confirmation to FileQueue");
@@ -2981,8 +3013,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
29813013
FileQueueEvents.OnEventReceived(ev);
29823014
}
29833015

2984-
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev) {
2985-
Y_UNUSED(ev);
3016+
void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
29863017
FileQueueEvents.Retry();
29873018
}
29883019

@@ -2992,10 +3023,17 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
29923023
}
29933024

29943025
void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev) {
2995-
LOG_T("TS3StreamReadActor","Handle connected FileQueue " << ev->Get()->NodeId);
3026+
LOG_T("TS3StreamReadActor", "Handle connected FileQueue " << ev->Get()->NodeId);
29963027
FileQueueEvents.HandleNodeConnected(ev->Get()->NodeId);
29973028
}
29983029

3030+
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
3031+
LOG_T("TS3StreamReadActor", "Handle undelivered FileQueue ");
3032+
if (!FileQueueEvents.HandleUndelivered(ev)) {
3033+
TIssues issues{TIssue{TStringBuilder() << "FileQueue was lost"}};
3034+
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::INTERNAL_ERROR));
3035+
}
3036+
}
29993037
bool LastFileWasProcessed() const {
30003038
return Blocks.empty() && (ListedFiles == CompletedFiles) && IsFileQueueEmpty;
30013039
}

0 commit comments

Comments
 (0)