1010
1111namespace NKikimr ::NMetadata::NModifications {
1212
13+ class TModificationStage {
14+ private:
15+ YDB_ACCESSOR_DEF (Ydb::Table::ExecuteDataQueryRequest, Request);
16+
17+ public:
18+ using TPtr = std::shared_ptr<TModificationStage>;
19+
20+ virtual TConclusionStatus HandleResult (const Ydb::Table::ExecuteQueryResult& /* result*/ ) const {
21+ return TConclusionStatus::Success ();
22+ }
23+
24+ virtual TConclusionStatus HandleError (const NRequest::TEvRequestFailed& ev) const {
25+ return TConclusionStatus::Fail (ev.GetErrorMessage ());
26+ }
27+
28+ void SetCommit () {
29+ Request.mutable_tx_control ()->set_commit_tx (true );
30+ }
31+
32+ TModificationStage (Ydb::Table::ExecuteDataQueryRequest request)
33+ : Request(std::move(request)) {
34+ }
35+
36+ virtual ~TModificationStage () = default ;
37+ };
38+
1339template <class TObject >
1440class TModifyObjectsActor : public NActors ::TActorBootstrapped<TModifyObjectsActor<TObject>> {
1541private:
@@ -19,17 +45,44 @@ class TModifyObjectsActor: public NActors::TActorBootstrapped<TModifyObjectsActo
1945 const TString TransactionId;
2046 const NACLib::TUserToken SystemUserToken;
2147 const std::optional<NACLib::TUserToken> UserToken;
48+
49+ void FillRequestSettings (Ydb::Table::ExecuteDataQueryRequest& request) {
50+ request.set_session_id (SessionId);
51+ request.mutable_tx_control ()->set_tx_id (TransactionId);
52+ }
53+
54+ void AdvanceStage () {
55+ AFL_VERIFY (!Stages.empty ());
56+ Stages.pop_front ();
57+ if (Stages.size ()) {
58+ TBase::Register (
59+ new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>(Stages.front ()->GetRequest (), SystemUserToken, TBase::SelfId ()));
60+ } else {
61+ Controller->OnModificationFinished ();
62+ TBase::PassAway ();
63+ }
64+ }
65+
2266protected:
23- std::deque<NRequest::TDialogYQLRequest::TRequest> Requests ;
67+ std::deque<TModificationStage::TPtr> Stages ;
2468 NInternal::TTableRecords Objects;
2569 virtual Ydb::Table::ExecuteDataQueryRequest BuildModifyQuery () const = 0;
2670 virtual TString GetModifyType () const = 0;
71+ virtual TModificationStage::TPtr DoBuildRequestDirect (Ydb::Table::ExecuteDataQueryRequest query) const {
72+ return std::make_shared<TModificationStage>(std::move (query));
73+ }
74+
75+ void BuildPreconditionStages (const std::vector<TModificationStage::TPtr>& stages) {
76+ for (auto && stage : stages) {
77+ FillRequestSettings (stage->MutableRequest ());
78+ Stages.emplace_back (std::move (stage));
79+ }
80+ }
2781
2882 void BuildRequestDirect () {
2983 Ydb::Table::ExecuteDataQueryRequest request = BuildModifyQuery ();
30- request.set_session_id (SessionId);
31- request.mutable_tx_control ()->set_tx_id (TransactionId);
32- Requests.emplace_back (std::move (request));
84+ FillRequestSettings (request);
85+ Stages.emplace_back (DoBuildRequestDirect (request));
3386 }
3487
3588 void BuildRequestHistory () {
@@ -42,31 +95,39 @@ class TModifyObjectsActor: public NActors::TActorBootstrapped<TModifyObjectsActo
4295 Objects.AddColumn (NInternal::TYDBColumn::UInt64 (" historyInstant" ), NInternal::TYDBValue::UInt64 (TActivationContext::Now ().MicroSeconds ()));
4396 Objects.AddColumn (NInternal::TYDBColumn::Utf8 (" historyAction" ), NInternal::TYDBValue::Utf8 (GetModifyType ()));
4497 Ydb::Table::ExecuteDataQueryRequest request = Objects.BuildInsertQuery (TObject::GetBehaviour ()->GetStorageHistoryTablePath ());
45- request.set_session_id (SessionId);
46- request.mutable_tx_control ()->set_tx_id (TransactionId);
47- Requests.emplace_back (std::move (request));
98+ FillRequestSettings (request);
99+ Stages.emplace_back (std::make_shared<TModificationStage>(std::move (request)));
48100 }
49101
50- void Handle (NRequest::TEvRequestResult<NRequest::TDialogYQLRequest>::TPtr& /* ev*/ ) {
51- if (Requests.size ()) {
52- TBase::Register (new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>(
53- Requests.front (), SystemUserToken, TBase::SelfId ()));
54- Requests.pop_front ();
55- } else {
56- Controller->OnModificationFinished ();
102+ void Handle (NRequest::TEvRequestResult<NRequest::TDialogYQLRequest>::TPtr& ev) {
103+ const auto & operation = ev->Get ()->GetResult ().operation ();
104+ AFL_VERIFY (operation.ready ());
105+
106+ Ydb::Table::ExecuteQueryResult result;
107+ operation.result ().UnpackTo (&result);
108+
109+ if (auto status = Stages.front ()->HandleResult (result); status.IsFail ()) {
110+ Controller->OnModificationProblem (status.GetErrorMessage ());
57111 TBase::PassAway ();
112+ return ;
58113 }
114+
115+ AdvanceStage ();
59116 }
60117
61- virtual void Handle (NRequest::TEvRequestFailed::TPtr& ev) {
118+ void Handle (NRequest::TEvRequestFailed::TPtr& ev) {
62119 auto g = TBase::PassAwayGuard ();
63- Controller->OnModificationProblem (" cannot execute yql request for " + GetModifyType () +
64- " objects: " + ev->Get ()->GetErrorMessage ());
120+ if (auto status = Stages.front ()->HandleError (*ev->Get ()); status.IsFail ()) {
121+ Controller->OnModificationProblem (status.GetErrorMessage ());
122+ } else {
123+ Controller->OnModificationFinished ();
124+ }
65125 }
66126
67127public:
68- TModifyObjectsActor (NInternal::TTableRecords&& objects, const NACLib::TUserToken& systemUserToken, IModificationObjectsController::TPtr controller, const TString& sessionId,
69- const TString& transactionId, const std::optional<NACLib::TUserToken>& userToken)
128+ TModifyObjectsActor (NInternal::TTableRecords&& objects, const NACLib::TUserToken& systemUserToken,
129+ IModificationObjectsController::TPtr controller, const TString& sessionId, const TString& transactionId,
130+ const std::optional<NACLib::TUserToken>& userToken, const std::vector<TModificationStage::TPtr>& preconditions)
70131 : Controller(controller)
71132 , SessionId(sessionId)
72133 , TransactionId(transactionId)
@@ -75,6 +136,7 @@ class TModifyObjectsActor: public NActors::TActorBootstrapped<TModifyObjectsActo
75136 , Objects(std::move(objects))
76137
77138 {
139+ BuildPreconditionStages (preconditions);
78140 Y_ABORT_UNLESS (SessionId);
79141 }
80142
@@ -91,12 +153,10 @@ class TModifyObjectsActor: public NActors::TActorBootstrapped<TModifyObjectsActo
91153 TBase::Become (&TModifyObjectsActor::StateMain);
92154 BuildRequestDirect ();
93155 BuildRequestHistory ();
94- Y_ABORT_UNLESS (Requests .size ());
95- Requests .back (). mutable_tx_control ()-> set_commit_tx ( true );
156+ Y_ABORT_UNLESS (Stages .size ());
157+ Stages .back ()-> SetCommit ( );
96158
97- TBase::Register (new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>(
98- Requests.front (), SystemUserToken, TBase::SelfId ()));
99- Requests.pop_front ();
159+ TBase::Register (new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>(Stages.front ()->GetRequest (), SystemUserToken, TBase::SelfId ()));
100160 }
101161};
102162
@@ -148,6 +208,23 @@ class TDeleteObjectsActor: public TModifyObjectsActor<TObject> {
148208 using TBase::TBase;
149209};
150210
211+ class TStageInsertObjects : public NModifications ::TModificationStage {
212+ private:
213+ const bool ExistingOk;
214+
215+ public:
216+ TConclusionStatus HandleError (const NRequest::TEvRequestFailed& ev) const override {
217+ if (ExistingOk && ev.GetStatus () == Ydb::StatusIds::PRECONDITION_FAILED) {
218+ return TConclusionStatus::Success ();
219+ }
220+ return TConclusionStatus::Fail (ev.GetErrorMessage ());
221+ }
222+
223+ TStageInsertObjects (Ydb::Table::ExecuteDataQueryRequest request, const bool existingOk)
224+ : TModificationStage(std::move(request)), ExistingOk(existingOk) {
225+ }
226+ };
227+
151228template <class TObject >
152229class TInsertObjectsActor : public TModifyObjectsActor <TObject> {
153230private:
@@ -161,19 +238,13 @@ class TInsertObjectsActor: public TModifyObjectsActor<TObject> {
161238 return " insert" ;
162239 }
163240
164- void Handle (NRequest::TEvRequestFailed::TPtr& ev) override {
165- if (ev->Get ()->GetStatus () == Ydb::StatusIds::PRECONDITION_FAILED && ExistingOk) {
166- NRequest::TDialogYQLRequest::TResponse resp;
167- this ->Send (this ->SelfId (), new NRequest::TEvRequestResult<NRequest::TDialogYQLRequest>(std::move (resp)));
168- this ->Requests .clear (); // Remove history request
169- return ;
170- }
171- TBase::Handle (ev);
241+ TModificationStage::TPtr DoBuildRequestDirect (Ydb::Table::ExecuteDataQueryRequest query) const override {
242+ return std::make_shared<TStageInsertObjects>(std::move (query), ExistingOk);
172243 }
173244public:
174245 TInsertObjectsActor (NInternal::TTableRecords&& objects, const NACLib::TUserToken& systemUserToken, IModificationObjectsController::TPtr controller, const TString& sessionId,
175- const TString& transactionId, const std::optional<NACLib::TUserToken>& userToken, bool existingOk)
176- : TBase(std::move(objects), systemUserToken, std::move(controller), sessionId, transactionId, userToken)
246+ const TString& transactionId, const std::optional<NACLib::TUserToken>& userToken, const std::vector<TModificationStage::TPtr>& preconditions, bool existingOk)
247+ : TBase(std::move(objects), systemUserToken, std::move(controller), sessionId, transactionId, userToken, preconditions )
177248 , ExistingOk(existingOk)
178249 {
179250 }
0 commit comments