Skip to content

Commit 4e46663

Browse files
authored
Merge 510f6a7 into 8c72ada
2 parents 8c72ada + 510f6a7 commit 4e46663

File tree

4 files changed

+45
-13
lines changed

4 files changed

+45
-13
lines changed

ydb/core/fq/libs/compute/ydb/resources_cleaner_actor.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
88
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
99
#include <ydb/core/fq/libs/ydb/ydb.h>
10+
#include <ydb/core/util/backoff.h>
1011
#include <ydb/library/services/services.pb.h>
1112

1213
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
@@ -65,14 +66,19 @@ class TResourcesCleanerActor : public TBaseComputeActor<TResourcesCleanerActor>
6566
, Connector(connector)
6667
, OperationId(operationId)
6768
, Counters(GetStepCountersSubgroup())
69+
, BackoffTimer(20, 1000)
6870
{}
6971

7072
static constexpr char ActorName[] = "FQ_RESOURCES_CLEANER_ACTOR";
7173

74+
void SendForgetOperation(const TDuration& delay = TDuration::Zero()) {
75+
Register(new TRetryActor<TEvYdbCompute::TEvForgetOperationRequest, TEvYdbCompute::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), delay, SelfId(), Connector, OperationId));
76+
}
77+
7278
void Start() {
7379
LOG_I("Start resources cleaner actor. Compute state: " << FederatedQuery::QueryMeta::ComputeStatus_Name(Params.Status));
7480
Become(&TResourcesCleanerActor::StateFunc);
75-
Register(new TRetryActor<TEvYdbCompute::TEvForgetOperationRequest, TEvYdbCompute::TEvForgetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_FORGET_OPERATION), SelfId(), Connector, OperationId));
81+
SendForgetOperation();
7682
}
7783

7884
STRICT_STFUNC(StateFunc,
@@ -81,6 +87,10 @@ class TResourcesCleanerActor : public TBaseComputeActor<TResourcesCleanerActor>
8187

8288
void Handle(const TEvYdbCompute::TEvForgetOperationResponse::TPtr& ev) {
8389
const auto& response = *ev.Get()->Get();
90+
if (response.Status == NYdb::EStatus::TIMEOUT) {
91+
SendForgetOperation(TDuration::MilliSeconds(BackoffTimer.NextBackoffMs()));
92+
return;
93+
}
8494
if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::NOT_FOUND) {
8595
LOG_E("Can't forget operation: " << ev->Get()->Issues.ToOneLineString());
8696
Send(Parent, new TEvYdbCompute::TEvResourcesCleanerResponse(ev->Get()->Issues, ev->Get()->Status));
@@ -98,6 +108,7 @@ class TResourcesCleanerActor : public TBaseComputeActor<TResourcesCleanerActor>
98108
TActorId Connector;
99109
NYdb::TOperation::TOperationId OperationId;
100110
TCounters Counters;
111+
NKikimr::TBackoffTimer BackoffTimer;
101112
};
102113

103114
std::unique_ptr<NActors::IActor> CreateResourcesCleanerActor(const TRunActorParams& params,

ydb/core/kqp/proxy_service/kqp_script_executions.cpp

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,8 @@ class TScriptLeaseUpdater : public TQueryBase {
478478

479479
class TScriptLeaseUpdateActor : public TActorBootstrapped<TScriptLeaseUpdateActor> {
480480
public:
481+
using TLeaseUpdateRetryActor = TQueryRetryActor<TScriptLeaseUpdater, TEvScriptLeaseUpdateResponse, TString, TString, TDuration>;
482+
481483
TScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters)
482484
: RunScriptActorId(runScriptActorId)
483485
, Database(database)
@@ -488,7 +490,11 @@ class TScriptLeaseUpdateActor : public TActorBootstrapped<TScriptLeaseUpdateActo
488490
{}
489491

490492
void Bootstrap() {
491-
Register(new TQueryRetryActor<TScriptLeaseUpdater, TEvScriptLeaseUpdateResponse, TString, TString, TDuration>(SelfId(), Database, ExecutionId, LeaseDuration, LeaseDuration / 2));
493+
Register(new TLeaseUpdateRetryActor(
494+
SelfId(),
495+
TLeaseUpdateRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TLeaseUpdateRetryActor::Retryable, TDuration::MilliSeconds(10), TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), LeaseDuration / 2),
496+
Database, ExecutionId, LeaseDuration
497+
));
492498
Become(&TScriptLeaseUpdateActor::StateFunc);
493499
}
494500

@@ -826,7 +832,6 @@ class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase {
826832

827833
class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
828834
static constexpr i64 MAX_NUMBER_ROWS_IN_BATCH = 100000;
829-
static constexpr TDuration MINIMAL_DEADLINE_TIME = TDuration::Seconds(1);
830835

831836
struct TResultSetDescription {
832837
i64 MaxRowId;
@@ -837,7 +842,7 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
837842
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
838843
: ExecutionId(executionId)
839844
, Database(database)
840-
, Deadline(operationDeadline - MINIMAL_DEADLINE_TIME)
845+
, Deadline(operationDeadline)
841846
{}
842847

843848
void OnRunQuery() override {
@@ -964,10 +969,14 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
964969
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
965970
}
966971

972+
static NYql::TIssues ForgetOperationTimeoutIssues() {
973+
return { NYql::TIssue("Forget script execution operation timeout") };
974+
}
975+
967976
private:
968977
bool CheckDeadline() {
969978
if (TInstant::Now() >= Deadline) {
970-
Finish(Ydb::StatusIds::TIMEOUT, "Forget script execution operation timeout");
979+
Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues());
971980
return false;
972981
}
973982
return true;
@@ -982,6 +991,8 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
982991

983992
class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetScriptExecutionOperationActor> {
984993
public:
994+
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString, TInstant>;
995+
985996
explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev)
986997
: Request(std::move(ev))
987998
{}
@@ -1017,7 +1028,18 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
10171028
}
10181029
}
10191030

1020-
Register(new TForgetScriptExecutionOperationQueryActor(ExecutionId, Request->Get()->Database, Request->Get()->Deadline));
1031+
TDuration minDelay = TDuration::MilliSeconds(10);
1032+
TDuration maxTime = Request->Get()->Deadline - TInstant::Now() - TDuration::Seconds(1);
1033+
if (maxTime <= minDelay) {
1034+
Reply(Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues());
1035+
return;
1036+
}
1037+
1038+
Register(new TForgetOperationRetryActor(
1039+
SelfId(),
1040+
TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), maxTime),
1041+
ExecutionId, Request->Get()->Database, TInstant::Now() + maxTime
1042+
));
10211043
}
10221044

10231045
void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {

ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,14 +394,13 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
394394
i32 successCount = 0;
395395
for (auto& f : forgetFutures) {
396396
auto forgetStatus = f.ExtractValueSync();
397-
UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS || forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND ||
398-
forgetStatus.GetStatus() == NYdb::EStatus::ABORTED, forgetStatus.GetIssues().ToString());
397+
UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS || forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND, forgetStatus.GetIssues().ToString());
399398
if (forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS) {
400399
++successCount;
401400
}
402401
}
403402

404-
UNIT_ASSERT(successCount == 1);
403+
UNIT_ASSERT(successCount >= 1);
405404

406405
auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync();
407406
auto forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();

ydb/library/query_actor/query_actor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,11 @@ class TQueryRetryActor : public NActors::TActorBootstrapped<TQueryRetryActor<TQu
175175
using TBase = NActors::TActorBootstrapped<TQueryRetryActor<TQueryActor, TResponse, TArgs...>>;
176176
using IRetryPolicy = IRetryPolicy<Ydb::StatusIds::StatusCode>;
177177

178-
explicit TQueryRetryActor(const NActors::TActorId& replyActorId, const TArgs&... args, TDuration maxRetryTime = TDuration::Seconds(1))
178+
explicit TQueryRetryActor(const NActors::TActorId& replyActorId, const TArgs&... args)
179179
: ReplyActorId(replyActorId)
180180
, RetryPolicy(IRetryPolicy::GetExponentialBackoffPolicy(
181-
Retryable, TDuration::MilliSeconds(10),
182-
TDuration::MilliSeconds(200), TDuration::Seconds(1),
183-
std::numeric_limits<size_t>::max(), maxRetryTime
181+
Retryable, TDuration::MilliSeconds(10),
182+
TDuration::MilliSeconds(200), TDuration::Seconds(30), 5
184183
))
185184
, CreateQueryActor([=]() {
186185
return new TQueryActor(args...);
@@ -193,6 +192,7 @@ class TQueryRetryActor : public NActors::TActorBootstrapped<TQueryRetryActor<TQu
193192
, CreateQueryActor([=]() {
194193
return new TQueryActor(args...);
195194
})
195+
, RetryState(RetryPolicy->CreateRetryState())
196196
{}
197197

198198
void StartQueryActor() const {

0 commit comments

Comments
 (0)