@@ -855,6 +855,26 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
855855 NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind ()),
856856 NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody ()),
857857 NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource ()));
858+
859+ auto res = ChangesQueue.emplace (record.GetOrder (), record);
860+ Y_VERIFY_S (res.second , " Duplicate change record: " << record.GetOrder ());
861+
862+ if (res.first ->second .SchemaVersion ) {
863+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
864+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
865+ }
866+
867+ db.GetDatabase ().OnRollback ([this , order = record.GetOrder ()] {
868+ auto it = ChangesQueue.find (order);
869+ Y_VERIFY_S (it != ChangesQueue.end (), " Cannot find change record: " << order);
870+
871+ if (it->second .SchemaSnapshotAcquired ) {
872+ SchemaSnapshotManager.ReleaseReference (
873+ TSchemaSnapshotKey (it->second .TableId , it->second .SchemaVersion ));
874+ }
875+
876+ ChangesQueue.erase (it);
877+ });
858878 } else {
859879 auto & state = LockChangeRecords[lockId];
860880 Y_ABORT_UNLESS (state.Changes .empty () || state.Changes .back ().LockOffset < record.GetLockOffset (),
@@ -934,6 +954,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
934954 committed.Step = rowVersion.Step ;
935955 committed.TxId = rowVersion.TxId ;
936956 collected.push_back (committed);
957+
958+ auto res = ChangesQueue.emplace (committed.Order , committed);
959+ Y_VERIFY_S (res.second , " Duplicate change record: " << committed.Order );
960+
961+ if (res.first ->second .SchemaVersion ) {
962+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
963+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
964+ }
937965 }
938966
939967 Y_VERIFY_S (!CommittedLockChangeRecords.contains (lockId), " Cannot commit lock " << lockId << " more than once" );
@@ -960,7 +988,24 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
960988 LockChangeRecords.erase (it);
961989 });
962990 db.GetDatabase ().OnRollback ([this , lockId]() {
963- CommittedLockChangeRecords.erase (lockId);
991+ auto it = CommittedLockChangeRecords.find (lockId);
992+ Y_VERIFY_S (it != CommittedLockChangeRecords.end (), " Unexpected failure to find lockId# " << lockId);
993+
994+ for (size_t i = 0 ; i < it->second .Count ; ++i) {
995+ const ui64 order = it->second .Order + i;
996+
997+ auto cIt = ChangesQueue.find (order);
998+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
999+
1000+ if (cIt->second .SchemaSnapshotAcquired ) {
1001+ SchemaSnapshotManager.ReleaseReference (
1002+ TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion ));
1003+ }
1004+
1005+ ChangesQueue.erase (cIt);
1006+ }
1007+
1008+ CommittedLockChangeRecords.erase (it);
9641009 });
9651010}
9661011
@@ -1081,22 +1126,15 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
10811126 for (const auto & record : records) {
10821127 forward.emplace_back (record.Order , record.PathId , record.BodySize );
10831128
1084- auto res = ChangesQueue.emplace (
1085- std::piecewise_construct,
1086- std::forward_as_tuple (record.Order ),
1087- std::forward_as_tuple (record, now, cookie)
1088- );
1089- if (res.second ) {
1090- ChangesList.PushBack (&res.first ->second );
1129+ auto it = ChangesQueue.find (record.Order );
1130+ Y_ABORT_UNLESS (it != ChangesQueue.end ());
10911131
1092- Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1093- ChangesQueueBytes += record.BodySize ;
1132+ it->second .EnqueuedAt = now;
1133+ it->second .ReservationCookie = cookie;
1134+ ChangesList.PushBack (&it->second );
10941135
1095- if (record.SchemaVersion ) {
1096- res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1097- TSchemaSnapshotKey (record.TableId , record.SchemaVersion ));
1098- }
1099- }
1136+ Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1137+ ChangesQueueBytes += record.BodySize ;
11001138 }
11011139
11021140 if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
0 commit comments