Skip to content

YQ-2883 fix retries for recover point COMPLETING #2107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/fq/libs/compute/ydb/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ struct TActorFactory : public IActorFactory {
std::unique_ptr<NActors::IActor> CreateResultWriter(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
const NKikimr::NOperationId::TOperationId& operationId) const override {
return CreateResultWriterActor(Params, parent, connector, pinger, operationId, Counters);
const NKikimr::NOperationId::TOperationId& operationId,
bool operationEntryExpected) const override {
return CreateResultWriterActor(Params, parent, connector, pinger, operationId, operationEntryExpected, Counters);
}

std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const NActors::TActorId& parent,
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/compute/ydb/actors_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ struct IActorFactory : public TThrRefBase {
virtual std::unique_ptr<NActors::IActor> CreateResultWriter(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
const NKikimr::NOperationId::TOperationId& operationId) const = 0;
const NKikimr::NOperationId::TOperationId& operationId,
bool operationEntryExpected) const = 0;
virtual std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NYdb::TOperation::TOperationId& operationId) const = 0;
Expand Down
14 changes: 12 additions & 2 deletions ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,14 @@ class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {
}
};

TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NKikimr::NOperationId::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
TResultWriterActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NKikimr::NOperationId::TOperationId& operationId, bool operationEntryExpected, const ::NYql::NCommon::TServiceCounters& queryCounters)
: TBaseComputeActor(queryCounters, "ResultWriter")
, Params(params)
, Parent(parent)
, Connector(connector)
, Pinger(pinger)
, OperationId(operationId)
, OperationEntryExpected(operationEntryExpected)
, Counters(GetStepCountersSubgroup())
{}

Expand Down Expand Up @@ -246,6 +247,13 @@ class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {

void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) {
const auto& response = *ev.Get()->Get();
if (!OperationEntryExpected && response.Status == NYdb::EStatus::NOT_FOUND) {
LOG_I("Operation has been already removed");
Send(Parent, new TEvYdbCompute::TEvResultWriterResponse({}, NYdb::EStatus::SUCCESS));
CompleteAndPassAway();
return;
}

if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString());
Send(Parent, new TEvYdbCompute::TEvResultWriterResponse(ev->Get()->Issues, ev->Get()->Status));
Expand Down Expand Up @@ -314,6 +322,7 @@ class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {
TActorId Connector;
TActorId Pinger;
NKikimr::NOperationId::TOperationId OperationId;
const bool OperationEntryExpected;
TCounters Counters;
TInstant StartTime;
TString FetchToken;
Expand All @@ -325,8 +334,9 @@ std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams&
const TActorId& connector,
const TActorId& pinger,
const NKikimr::NOperationId::TOperationId& operationId,
bool operationEntryExpected,
const ::NYql::NCommon::TServiceCounters& queryCounters) {
return std::make_unique<TResultWriterActor>(params, parent, connector, pinger, operationId, queryCounters);
return std::make_unique<TResultWriterActor>(params, parent, connector, pinger, operationId, operationEntryExpected, queryCounters);
}

}
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/ydb/result_writer_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams&
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
const NKikimr::NOperationId::TOperationId& operationId,
bool operationEntryExpected,
const ::NYql::NCommon::TServiceCounters& queryCounters);

}
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
Params.Status = response.ComputeStatus;
LOG_I("StatusTrackerResponse (success) " << response.Status << " ExecStatus: " << static_cast<int>(response.ExecStatus) << " Issues: " << response.Issues.ToOneLineString());
if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId, true).release());
} else {
CreateResourcesCleaner();
}
Expand Down Expand Up @@ -192,7 +192,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped<TYdbRunActor> {
break;
case FederatedQuery::QueryMeta::COMPLETING:
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId, false).release());
} else {
CreateFinalizer(Params.Status);
}
Expand Down