@@ -855,6 +855,39 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
855855 NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind ()),
856856 NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody ()),
857857 NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource ()));
858+
859+ auto res = ChangesQueue.emplace (record.GetOrder (), record);
860+ Y_VERIFY_S (res.second , " Duplicate change record: " << record.GetOrder ());
861+
862+ if (res.first ->second .SchemaVersion ) {
863+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
864+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
865+ }
866+
867+ if (CommittingChangeRecords.empty ()) {
868+ db.GetDatabase ().OnCommit ([this ] {
869+ CommittingChangeRecords.clear ();
870+ });
871+ db.GetDatabase ().OnRollback ([this ] {
872+ for (const auto order : CommittingChangeRecords) {
873+ auto cIt = ChangesQueue.find (order);
874+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
875+
876+ if (cIt->second .SchemaSnapshotAcquired ) {
877+ const auto snapshotKey = TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion );
878+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
879+ ScheduleRemoveSchemaSnapshot (snapshotKey);
880+ }
881+ }
882+
883+ ChangesQueue.erase (cIt);
884+ }
885+
886+ CommittingChangeRecords.clear ();
887+ });
888+ }
889+
890+ CommittingChangeRecords.push_back (record.GetOrder ());
858891 } else {
859892 auto & state = LockChangeRecords[lockId];
860893 Y_ABORT_UNLESS (state.Changes .empty () || state.Changes .back ().LockOffset < record.GetLockOffset (),
@@ -934,6 +967,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
934967 committed.Step = rowVersion.Step ;
935968 committed.TxId = rowVersion.TxId ;
936969 collected.push_back (committed);
970+
971+ auto res = ChangesQueue.emplace (committed.Order , committed);
972+ Y_VERIFY_S (res.second , " Duplicate change record: " << committed.Order );
973+
974+ if (res.first ->second .SchemaVersion ) {
975+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
976+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
977+ }
937978 }
938979
939980 Y_VERIFY_S (!CommittedLockChangeRecords.contains (lockId), " Cannot commit lock " << lockId << " more than once" );
@@ -960,7 +1001,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
9601001 LockChangeRecords.erase (it);
9611002 });
9621003 db.GetDatabase ().OnRollback ([this , lockId]() {
963- CommittedLockChangeRecords.erase (lockId);
1004+ auto it = CommittedLockChangeRecords.find (lockId);
1005+ Y_VERIFY_S (it != CommittedLockChangeRecords.end (), " Unexpected failure to find lockId# " << lockId);
1006+
1007+ for (size_t i = 0 ; i < it->second .Count ; ++i) {
1008+ const ui64 order = it->second .Order + i;
1009+
1010+ auto cIt = ChangesQueue.find (order);
1011+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
1012+
1013+ if (cIt->second .SchemaSnapshotAcquired ) {
1014+ const auto snapshotKey = TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion );
1015+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1016+ ScheduleRemoveSchemaSnapshot (snapshotKey);
1017+ }
1018+ }
1019+
1020+ ChangesQueue.erase (cIt);
1021+ }
1022+
1023+ CommittedLockChangeRecords.erase (it);
9641024 });
9651025}
9661026
@@ -994,7 +1054,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
9941054
9951055 auto it = ChangesQueue.find (order);
9961056 if (it == ChangesQueue.end ()) {
997- Y_VERIFY_DEBUG_S (false , " Trying to remove non-enqueud record: " << order);
9981057 return ;
9991058 }
10001059
@@ -1022,23 +1081,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10221081 ChangesQueueBytes -= record.BodySize ;
10231082
10241083 if (record.SchemaSnapshotAcquired ) {
1025- Y_ABORT_UNLESS (record.TableId );
1026- auto tableIt = TableInfos.find (record.TableId .LocalPathId );
1027-
1028- if (tableIt != TableInfos.end ()) {
1029- const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1030- const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey);
1031-
1032- if (last) {
1033- const auto * snapshot = SchemaSnapshotManager.FindSnapshot (snapshotKey);
1034- Y_ABORT_UNLESS (snapshot);
1035-
1036- if (snapshot->Schema ->GetTableSchemaVersion () < tableIt->second ->GetTableSchemaVersion ()) {
1037- SchemaSnapshotManager.RemoveShapshot (db, snapshotKey);
1038- }
1039- }
1040- } else {
1041- Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1084+ const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1085+ if (const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1086+ ScheduleRemoveSchemaSnapshot (snapshotKey);
10421087 }
10431088 }
10441089
@@ -1059,7 +1104,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10591104 CheckChangesQueueNoOverflow ();
10601105}
10611106
1062- void TDataShard::EnqueueChangeRecords (TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
1107+ void TDataShard::EnqueueChangeRecords (TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove ) {
10631108 if (!records) {
10641109 return ;
10651110 }
@@ -1079,27 +1124,24 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
10791124 const auto now = AppData ()->TimeProvider ->Now ();
10801125 TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward (Reserve (records.size ()));
10811126 for (const auto & record : records) {
1082- forward.emplace_back (record.Order , record.PathId , record.BodySize );
1127+ auto it = ChangesQueue.find (record.Order );
1128+ if (it == ChangesQueue.end ()) {
1129+ Y_ABORT_UNLESS (afterMove);
1130+ continue ;
1131+ }
10831132
1084- auto res = ChangesQueue.emplace (
1085- std::piecewise_construct,
1086- std::forward_as_tuple (record.Order ),
1087- std::forward_as_tuple (record, now, cookie)
1088- );
1089- if (res.second ) {
1090- ChangesList.PushBack (&res.first ->second );
1133+ forward.emplace_back (record.Order , record.PathId , record.BodySize );
10911134
1092- Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1093- ChangesQueueBytes += record.BodySize ;
1135+ it->second .EnqueuedAt = now;
1136+ it->second .ReservationCookie = cookie;
1137+ ChangesList.PushBack (&it->second );
10941138
1095- if (record.SchemaVersion ) {
1096- res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1097- TSchemaSnapshotKey (record.TableId , record.SchemaVersion ));
1098- }
1099- }
1139+ Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1140+ ChangesQueueBytes += record.BodySize ;
11001141 }
1101-
1142+
11021143 if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
1144+ Y_ABORT_UNLESS (!afterMove);
11031145 ChangeQueueReservedCapacity -= it->second ;
11041146 ChangeQueueReservedCapacity += records.size ();
11051147 }
@@ -1265,6 +1307,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
12651307 .SchemaVersion = schemaVersion,
12661308 });
12671309
1310+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1311+ Y_VERIFY_S (res.second , " Duplicate change record: " << records.back ().Order );
1312+
1313+ if (res.first ->second .SchemaVersion ) {
1314+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1315+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1316+ }
1317+
12681318 if (!rowset.Next ()) {
12691319 return false ;
12701320 }
@@ -1363,6 +1413,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
13631413 });
13641414 entry.Count ++;
13651415 needSort = true ;
1416+
1417+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1418+ Y_VERIFY_S (res.second , " Duplicate change record: " << records.back ().Order );
1419+
1420+ if (res.first ->second .SchemaVersion ) {
1421+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1422+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1423+ }
13661424 }
13671425
13681426 LockChangeRecords.erase (lockId);
@@ -1421,6 +1479,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
14211479 }
14221480}
14231481
1482+ void TDataShard::ScheduleRemoveSchemaSnapshot (const TSchemaSnapshotKey& key) {
1483+ Y_ABORT_UNLESS (!SchemaSnapshotManager.HasReference (key));
1484+
1485+ const auto * snapshot = SchemaSnapshotManager.FindSnapshot (key);
1486+ Y_ABORT_UNLESS (snapshot);
1487+
1488+ auto it = TableInfos.find (key.PathId );
1489+ if (it == TableInfos.end ()) {
1490+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1491+ return ;
1492+ }
1493+
1494+ if (snapshot->Schema ->GetTableSchemaVersion () < it->second ->GetTableSchemaVersion ()) {
1495+ bool wasEmpty = PendingSchemaSnapshotsToGc.empty ();
1496+ PendingSchemaSnapshotsToGc.push_back (key);
1497+ if (wasEmpty) {
1498+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1499+ }
1500+ }
1501+ }
1502+
1503+ void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots () {
1504+ bool wasEmpty = PendingSchemaSnapshotsToGc.empty ();
1505+
1506+ for (const auto & [key, snapshot] : SchemaSnapshotManager.GetSnapshots ()) {
1507+ auto it = TableInfos.find (key.PathId );
1508+ if (it == TableInfos.end ()) {
1509+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1510+ break ;
1511+ }
1512+ if (SchemaSnapshotManager.HasReference (key)) {
1513+ continue ;
1514+ }
1515+ if (snapshot.Schema ->GetTableSchemaVersion () >= it->second ->GetTableSchemaVersion ()) {
1516+ continue ;
1517+ }
1518+
1519+ PendingSchemaSnapshotsToGc.push_back (key);
1520+ }
1521+
1522+ if (wasEmpty && !PendingSchemaSnapshotsToGc.empty ()) {
1523+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1524+ }
1525+ }
1526+
14241527void TDataShard::PersistSchemeTxResult (NIceDb::TNiceDb &db, const TSchemaOperation &op) {
14251528 db.Table <Schema::SchemaOperations>().Key (op.TxId ).Update (
14261529 NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success ),
@@ -1649,8 +1752,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
16491752 Y_ABORT_UNLESS (TableInfos.contains (pathId.LocalPathId ));
16501753 auto tableInfo = TableInfos[pathId.LocalPathId ];
16511754
1652- const auto key = TSchemaSnapshotKey (pathId. OwnerId , pathId. LocalPathId , tableSchemaVersion);
1755+ const auto key = TSchemaSnapshotKey (pathId, tableSchemaVersion);
16531756 SchemaSnapshotManager.AddSnapshot (txc.DB , key, TSchemaSnapshot (tableInfo, step, txId));
1757+
1758+ const auto & snapshots = SchemaSnapshotManager.GetSnapshots ();
1759+ for (auto it = snapshots.lower_bound (TSchemaSnapshotKey (pathId, 1 )); it != snapshots.end (); ++it) {
1760+ if (it->first == key) {
1761+ break ;
1762+ }
1763+ if (!SchemaSnapshotManager.HasReference (it->first )) {
1764+ ScheduleRemoveSchemaSnapshot (it->first );
1765+ }
1766+ }
16541767}
16551768
16561769void TDataShard::PersistLastLoanTableTid (NIceDb::TNiceDb& db, ui32 localTid) {
0 commit comments