@@ -104,14 +104,12 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
104104 }
105105
106106 TOperation::TPtr operation = new TOperation (txId);
107- Operations[txId] = operation; // record is erased at ApplyOnExecute if all parts are done at propose
108107
109108 for (const auto & transaction : record.GetTransaction ()) {
110109 auto quotaResult = operation->ConsumeQuota (transaction, context);
111110 if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
112111 response.Reset (new TProposeResponse (quotaResult.Status , ui64 (txId), ui64 (selfId)));
113112 response->SetError (quotaResult.Status , quotaResult.Reason );
114- Operations.erase (txId);
115113 return std::move (response);
116114 }
117115 }
@@ -131,7 +129,6 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
131129 if (splitResult.Status != NKikimrScheme::StatusSuccess) {
132130 response.Reset (new TProposeResponse (splitResult.Status , ui64 (txId), ui64 (selfId)));
133131 response->SetError (splitResult.Status , splitResult.Reason );
134- Operations.erase (txId);
135132 return std::move (response);
136133 }
137134
@@ -140,11 +137,15 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
140137
141138 const TString owner = record.HasOwner () ? record.GetOwner () : BUILTIN_ACL_ROOT;
142139
140+ bool prevProposeUndoSafe = true ;
141+
142+ Operations[txId] = operation; // record is erased at ApplyOnExecute if all parts are done at propose
143+
143144 for (const auto & transaction : transactions) {
144145 auto parts = operation->ConstructParts (transaction, context);
145146
146147 if (parts.size () > 1 ) {
147- // les't allow altering impl index tables as part of consistent operation
148+ // allow altering impl index tables as part of consistent operation
148149 context.IsAllowedPrivateTables = true ;
149150 }
150151
@@ -198,25 +199,77 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
198199 << " , with reason: " << response->Record .GetReason ()
199200 << " , tx message: " << SecureDebugString (record));
200201
201- context.OnComplete = {}; // recreate
202- context.DbChanges = {};
202+ AbortOperationPropose (txId, context);
203203
204- for (auto & toAbort : operation->Parts ) {
205- toAbort->AbortPropose (context);
206- }
204+ return std::move (response);
205+ }
207206
208- context. MemChanges . UnDo (context. SS );
209- context.OnComplete . ApplyOnExecute (context. SS , context. GetTxc (), context. Ctx );
210- Operations. erase (txId) ;
207+ // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
208+ if (prevProposeUndoSafe && ! context.IsUndoChangesSafe ()) {
209+ prevProposeUndoSafe = false ;
211210
212- return std::move (response);
211+ LOG_WARN_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD,
212+ " Operation part proposed ok, but propose itself is undo unsafe"
213+ << " , suboperation type: " << NKikimrSchemeOp::EOperationType_Name (part->GetTransaction ().GetOperationType ())
214+ << " , opId: " << part->GetOperationId ()
215+ << " , at schemeshard: " << selfId
216+ );
213217 }
214218 }
215219 }
216220
217221 return std::move (response);
218222}
219223
224+ void TSchemeShard::AbortOperationPropose (const TTxId txId, TOperationContext& context) {
225+ Y_ABORT_UNLESS (Operations.contains (txId));
226+ TOperation::TPtr operation = Operations.at (txId);
227+
228+ // Drop operation side effects, undo memory changes
229+ // (Local db changes were already applied)
230+ context.OnComplete = {};
231+ context.DbChanges = {};
232+
233+ for (auto & i : operation->Parts ) {
234+ i->AbortPropose (context);
235+ }
236+
237+ context.MemChanges .UnDo (context.SS );
238+
239+ // And remove aborted operation from existence
240+ Operations.erase (txId);
241+ }
242+
243+ void AbortOperation (TOperationContext& context, const TTxId txId, const TString& reason) {
244+ LOG_ERROR_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD, " TTxOperationPropose Execute"
245+ << " , txId: " << txId
246+ << " , operation is rejected and all changes reverted"
247+ << " , " << reason
248+ << " , at schemeshard: " << context.SS ->SelfTabletId ()
249+ );
250+
251+ context.GetTxc ().DB .RollbackChanges ();
252+ context.SS ->AbortOperationPropose (txId, context);
253+ }
254+
255+ bool IsCommitRedoSizeOverLimit (TString* reason, TOperationContext& context) {
256+ // MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor.
257+ // We subtract from MaxCommitRedoMB additional 1MB for anything extra
258+ // that executor/tablet may (or may not) add under the hood
259+ const ui64 limitBytes = (context.SS ->MaxCommitRedoMB - 1 ) << 20 ; // MB to bytes
260+ const ui64 commitRedoBytes = context.GetTxc ().DB .GetCommitRedoBytes ();
261+ if (commitRedoBytes >= limitBytes) {
262+ *reason = TStringBuilder ()
263+ << " local tx commit redo size generated by IgniteOperation() is more than allowed limit: "
264+ << " commit redo size " << commitRedoBytes
265+ << " , limit " << limitBytes
266+ << " , excess " << (commitRedoBytes - limitBytes)
267+ ;
268+ return true ;
269+ }
270+ return false ;
271+ }
272+
220273struct TSchemeShard ::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
221274 using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;
222275
@@ -236,6 +289,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
236289
237290 bool Execute (NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
238291 TTabletId selfId = Self->SelfTabletId ();
292+ auto txId = TTxId (Request->Get ()->Record .GetTxId ());
239293
240294 LOG_DEBUG_S (ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
241295 " TTxOperationPropose Execute"
@@ -246,7 +300,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
246300
247301 auto [userToken, tokenParseError] = ParseUserToken (Request->Get ()->Record .GetUserToken ());
248302 if (tokenParseError) {
249- auto txId = Request->Get ()->Record .GetTxId ();
250303 Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64 (txId), ui64 (selfId), " Failed to parse user token" );
251304 return true ;
252305 }
@@ -258,10 +311,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
258311 TStorageChanges dbChanges;
259312 TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move (userToken)};
260313
314+ // NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context.
315+ // Unsuccessful IgniteOperation will leave no operation and context will also be clean.
261316 Response = Self->IgniteOperation (*Request->Get (), context);
262317
263- OnComplete.ApplyOnExecute (Self, txc, ctx);
318+ // NOTE: Successfully created operation also must be checked for the size of this local tx.
319+ //
320+ // Limitation on a commit redo size of local transactions is imposed at the tablet executor level
321+ // (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()).
322+ // And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately.
323+ //
324+ // So even if operation was ignited successfully, its local tx size still must be checked
325+ // as a precautionary measure to avoid an infinite loop of schemeshard restarting, attempting to propose
326+ // persisted operation again, hitting commit redo size limit and restarting again.
327+ //
328+ // On unsuccessful check, local tx should be rolled back, operation should be rejected and
329+ // all accumulated changes dropped or reverted.
330+ //
331+
332+ // Actually build commit redo (dbChanges could be empty)
264333 dbChanges.Apply (Self, txc, ctx);
334+
335+ if (Self->Operations .contains (txId)) {
336+ Y_ABORT_UNLESS (Response->IsDone () || Response->IsAccepted () || Response->IsConditionalAccepted ());
337+
338+ // Check local tx commit redo size
339+ TString reason;
340+ if (IsCommitRedoSizeOverLimit (&reason, context)) {
341+ Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusSchemeError, ui64 (txId), ui64 (selfId), reason);
342+
343+ AbortOperation (context, txId, reason);
344+
345+ if (!context.IsUndoChangesSafe ()) {
346+ LOG_ERROR_S (context.Ctx , NKikimrServices::FLAT_TX_SCHEMESHARD, " TTxOperationPropose Execute"
347+ << " , opId: " << txId
348+ << " , operation should be rejected and all changes be reverted"
349+ << " , but context.IsUndoChangesSafe is false, which means some direct writes have been done"
350+ << " , message: " << SecureDebugString (Request->Get ()->Record )
351+ << " , at schemeshard: " << context.SS ->SelfTabletId ()
352+ );
353+ }
354+ }
355+ }
356+
357+ // Apply accumulated changes (changes could be empty)
358+ OnComplete.ApplyOnExecute (Self, txc, ctx);
359+
265360 return true ;
266361 }
267362
0 commit comments