@@ -855,6 +855,28 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
855855 NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind ()),
856856 NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody ()),
857857 NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource ()));
858+
859+ auto res = ChangesQueue.emplace (record.GetOrder (), record);
860+ Y_VERIFY_S (res.second , " Duplicate change record: " << record.GetOrder ());
861+
862+ if (res.first ->second .SchemaVersion ) {
863+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
864+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
865+ }
866+
867+ db.GetDatabase ().OnRollback ([this , order = record.GetOrder ()] {
868+ auto it = ChangesQueue.find (order);
869+ Y_VERIFY_S (it != ChangesQueue.end (), " Cannot find change record: " << order);
870+
871+ if (it->second .SchemaSnapshotAcquired ) {
872+ const auto snapshotKey = TSchemaSnapshotKey (it->second .TableId , it->second .SchemaVersion );
873+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
874+ ScheduleRemoveSchemaSnapshot (snapshotKey);
875+ }
876+ }
877+
878+ ChangesQueue.erase (it);
879+ });
858880 } else {
859881 auto & state = LockChangeRecords[lockId];
860882 Y_ABORT_UNLESS (state.Changes .empty () || state.Changes .back ().LockOffset < record.GetLockOffset (),
@@ -934,6 +956,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
934956 committed.Step = rowVersion.Step ;
935957 committed.TxId = rowVersion.TxId ;
936958 collected.push_back (committed);
959+
960+ auto res = ChangesQueue.emplace (committed.Order , committed);
961+ Y_VERIFY_S (res.second , " Duplicate change record: " << committed.Order );
962+
963+ if (res.first ->second .SchemaVersion ) {
964+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
965+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
966+ }
937967 }
938968
939969 Y_VERIFY_S (!CommittedLockChangeRecords.contains (lockId), " Cannot commit lock " << lockId << " more than once" );
@@ -960,7 +990,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
960990 LockChangeRecords.erase (it);
961991 });
962992 db.GetDatabase ().OnRollback ([this , lockId]() {
963- CommittedLockChangeRecords.erase (lockId);
993+ auto it = CommittedLockChangeRecords.find (lockId);
994+ Y_VERIFY_S (it != CommittedLockChangeRecords.end (), " Unexpected failure to find lockId# " << lockId);
995+
996+ for (size_t i = 0 ; i < it->second .Count ; ++i) {
997+ const ui64 order = it->second .Order + i;
998+
999+ auto cIt = ChangesQueue.find (order);
1000+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
1001+
1002+ if (cIt->second .SchemaSnapshotAcquired ) {
1003+ const auto snapshotKey = TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion );
1004+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1005+ ScheduleRemoveSchemaSnapshot (snapshotKey);
1006+ }
1007+ }
1008+
1009+ ChangesQueue.erase (cIt);
1010+ }
1011+
1012+ CommittedLockChangeRecords.erase (it);
9641013 });
9651014}
9661015
@@ -1022,23 +1071,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10221071 ChangesQueueBytes -= record.BodySize ;
10231072
10241073 if (record.SchemaSnapshotAcquired ) {
1025- Y_ABORT_UNLESS (record.TableId );
1026- auto tableIt = TableInfos.find (record.TableId .LocalPathId );
1027-
1028- if (tableIt != TableInfos.end ()) {
1029- const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1030- const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey);
1031-
1032- if (last) {
1033- const auto * snapshot = SchemaSnapshotManager.FindSnapshot (snapshotKey);
1034- Y_ABORT_UNLESS (snapshot);
1035-
1036- if (snapshot->Schema ->GetTableSchemaVersion () < tableIt->second ->GetTableSchemaVersion ()) {
1037- SchemaSnapshotManager.RemoveShapshot (db, snapshotKey);
1038- }
1039- }
1040- } else {
1041- Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1074+ const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1075+ if (const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1076+ ScheduleRemoveSchemaSnapshot (snapshotKey);
10421077 }
10431078 }
10441079
@@ -1081,22 +1116,15 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
10811116 for (const auto & record : records) {
10821117 forward.emplace_back (record.Order , record.PathId , record.BodySize );
10831118
1084- auto res = ChangesQueue.emplace (
1085- std::piecewise_construct,
1086- std::forward_as_tuple (record.Order ),
1087- std::forward_as_tuple (record, now, cookie)
1088- );
1089- if (res.second ) {
1090- ChangesList.PushBack (&res.first ->second );
1119+ auto it = ChangesQueue.find (record.Order );
1120+ Y_ABORT_UNLESS (it != ChangesQueue.end ());
10911121
1092- Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1093- ChangesQueueBytes += record.BodySize ;
1122+ it->second .EnqueuedAt = now;
1123+ it->second .ReservationCookie = cookie;
1124+ ChangesList.PushBack (&it->second );
10941125
1095- if (record.SchemaVersion ) {
1096- res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1097- TSchemaSnapshotKey (record.TableId , record.SchemaVersion ));
1098- }
1099- }
1126+ Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1127+ ChangesQueueBytes += record.BodySize ;
11001128 }
11011129
11021130 if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
@@ -1265,6 +1293,12 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
12651293 .SchemaVersion = schemaVersion,
12661294 });
12671295
1296+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1297+ if (res.second && res.first ->second .SchemaVersion ) {
1298+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1299+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1300+ }
1301+
12681302 if (!rowset.Next ()) {
12691303 return false ;
12701304 }
@@ -1363,6 +1397,12 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
13631397 });
13641398 entry.Count ++;
13651399 needSort = true ;
1400+
1401+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1402+ if (res.second && res.first ->second .SchemaVersion ) {
1403+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1404+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1405+ }
13661406 }
13671407
13681408 LockChangeRecords.erase (lockId);
@@ -1421,6 +1461,46 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
14211461 }
14221462}
14231463
1464+ void TDataShard::ScheduleRemoveSchemaSnapshot (const TSchemaSnapshotKey& key) {
1465+ Y_ABORT_UNLESS (!SchemaSnapshotManager.HasReference (key));
1466+
1467+ const auto * snapshot = SchemaSnapshotManager.FindSnapshot (key);
1468+ Y_ABORT_UNLESS (snapshot);
1469+
1470+ auto it = TableInfos.find (key.PathId );
1471+ if (it == TableInfos.end ()) {
1472+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1473+ return ;
1474+ }
1475+
1476+ if (snapshot->Schema ->GetTableSchemaVersion () < it->second ->GetTableSchemaVersion ()) {
1477+ bool wasEmpty = PendingSchemaSnapshotsToRemove.empty ();
1478+ PendingSchemaSnapshotsToRemove.push_back (key);
1479+ if (wasEmpty) {
1480+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1481+ }
1482+ }
1483+ }
1484+
1485+ void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots () {
1486+ bool wasEmpty = PendingSchemaSnapshotsToRemove.empty ();
1487+
1488+ for (const auto & [key, snapshot] : SchemaSnapshotManager.GetSnapshots ()) {
1489+ auto it = TableInfos.find (key.PathId );
1490+ if (it == TableInfos.end ()) {
1491+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1492+ break ;
1493+ }
1494+ if (snapshot.Schema ->GetTableSchemaVersion () < it->second ->GetTableSchemaVersion ()) {
1495+ PendingSchemaSnapshotsToRemove.push_back (key);
1496+ }
1497+ }
1498+
1499+ if (wasEmpty && !PendingSchemaSnapshotsToRemove.empty ()) {
1500+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1501+ }
1502+ }
1503+
14241504void TDataShard::PersistSchemeTxResult (NIceDb::TNiceDb &db, const TSchemaOperation &op) {
14251505 db.Table <Schema::SchemaOperations>().Key (op.TxId ).Update (
14261506 NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success ),
0 commit comments