@@ -855,6 +855,45 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
855855 NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind ()),
856856 NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody ()),
857857 NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource ()));
858+
859+ auto res = ChangesQueue.emplace (record.GetOrder (), record);
860+ Y_VERIFY_S (res.second , " Duplicate change record: " << record.GetOrder ());
861+
862+ if (res.first ->second .SchemaVersion ) {
863+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
864+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
865+ }
866+
867+ const auto key = TCommittingChangeRecordsKey::FromRecord (record);
868+ if (!CommittingChangeRecords.contains (key)) {
869+ db.GetDatabase ().OnCommit ([this , key] {
870+ auto it = CommittingChangeRecords.find (key);
871+ Y_ABORT_UNLESS (it != CommittingChangeRecords.end ());
872+ CommittingChangeRecords.erase (it);
873+ });
874+ db.GetDatabase ().OnRollback ([this , key] {
875+ auto it = CommittingChangeRecords.find (key);
876+ Y_ABORT_UNLESS (it != CommittingChangeRecords.end ());
877+
878+ for (const auto order : it->second ) {
879+ auto cIt = ChangesQueue.find (order);
880+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
881+
882+ if (cIt->second .SchemaSnapshotAcquired ) {
883+ const auto snapshotKey = TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion );
884+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
885+ ScheduleRemoveSchemaSnapshot (snapshotKey);
886+ }
887+ }
888+
889+ ChangesQueue.erase (cIt);
890+ }
891+
892+ CommittingChangeRecords.erase (it);
893+ });
894+ }
895+
896+ CommittingChangeRecords[key].push_back (record.GetOrder ());
858897 } else {
859898 auto & state = LockChangeRecords[lockId];
860899 Y_ABORT_UNLESS (state.Changes .empty () || state.Changes .back ().LockOffset < record.GetLockOffset (),
@@ -934,6 +973,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
934973 committed.Step = rowVersion.Step ;
935974 committed.TxId = rowVersion.TxId ;
936975 collected.push_back (committed);
976+
977+ auto res = ChangesQueue.emplace (committed.Order , committed);
978+ Y_VERIFY_S (res.second , " Duplicate change record: " << committed.Order );
979+
980+ if (res.first ->second .SchemaVersion ) {
981+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
982+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
983+ }
937984 }
938985
939986 Y_VERIFY_S (!CommittedLockChangeRecords.contains (lockId), " Cannot commit lock " << lockId << " more than once" );
@@ -960,7 +1007,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
9601007 LockChangeRecords.erase (it);
9611008 });
9621009 db.GetDatabase ().OnRollback ([this , lockId]() {
963- CommittedLockChangeRecords.erase (lockId);
1010+ auto it = CommittedLockChangeRecords.find (lockId);
1011+ Y_VERIFY_S (it != CommittedLockChangeRecords.end (), " Unexpected failure to find lockId# " << lockId);
1012+
1013+ for (size_t i = 0 ; i < it->second .Count ; ++i) {
1014+ const ui64 order = it->second .Order + i;
1015+
1016+ auto cIt = ChangesQueue.find (order);
1017+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
1018+
1019+ if (cIt->second .SchemaSnapshotAcquired ) {
1020+ const auto snapshotKey = TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion );
1021+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1022+ ScheduleRemoveSchemaSnapshot (snapshotKey);
1023+ }
1024+ }
1025+
1026+ ChangesQueue.erase (cIt);
1027+ }
1028+
1029+ CommittedLockChangeRecords.erase (it);
9641030 });
9651031}
9661032
@@ -1022,23 +1088,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10221088 ChangesQueueBytes -= record.BodySize ;
10231089
10241090 if (record.SchemaSnapshotAcquired ) {
1025- Y_ABORT_UNLESS (record.TableId );
1026- auto tableIt = TableInfos.find (record.TableId .LocalPathId );
1027-
1028- if (tableIt != TableInfos.end ()) {
1029- const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1030- const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey);
1031-
1032- if (last) {
1033- const auto * snapshot = SchemaSnapshotManager.FindSnapshot (snapshotKey);
1034- Y_ABORT_UNLESS (snapshot);
1035-
1036- if (snapshot->Schema ->GetTableSchemaVersion () < tableIt->second ->GetTableSchemaVersion ()) {
1037- SchemaSnapshotManager.RemoveShapshot (db, snapshotKey);
1038- }
1039- }
1040- } else {
1041- Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1091+ const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1092+ if (const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1093+ ScheduleRemoveSchemaSnapshot (snapshotKey);
10421094 }
10431095 }
10441096
@@ -1081,22 +1133,15 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
10811133 for (const auto & record : records) {
10821134 forward.emplace_back (record.Order , record.PathId , record.BodySize );
10831135
1084- auto res = ChangesQueue.emplace (
1085- std::piecewise_construct,
1086- std::forward_as_tuple (record.Order ),
1087- std::forward_as_tuple (record, now, cookie)
1088- );
1089- if (res.second ) {
1090- ChangesList.PushBack (&res.first ->second );
1136+ auto it = ChangesQueue.find (record.Order );
1137+ Y_ABORT_UNLESS (it != ChangesQueue.end ());
10911138
1092- Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1093- ChangesQueueBytes += record.BodySize ;
1139+ it->second .EnqueuedAt = now;
1140+ it->second .ReservationCookie = cookie;
1141+ ChangesList.PushBack (&it->second );
10941142
1095- if (record.SchemaVersion ) {
1096- res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1097- TSchemaSnapshotKey (record.TableId , record.SchemaVersion ));
1098- }
1099- }
1143+ Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1144+ ChangesQueueBytes += record.BodySize ;
11001145 }
11011146
11021147 if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
@@ -1265,6 +1310,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
12651310 .SchemaVersion = schemaVersion,
12661311 });
12671312
1313+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1314+ Y_VERIFY_S (res.second , " Duplicate change record: " << records.back ().Order );
1315+
1316+ if (res.first ->second .SchemaVersion ) {
1317+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1318+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1319+ }
1320+
12681321 if (!rowset.Next ()) {
12691322 return false ;
12701323 }
@@ -1363,6 +1416,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
13631416 });
13641417 entry.Count ++;
13651418 needSort = true ;
1419+
1420+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1421+ Y_VERIFY_S (res.second , " Duplicate change record: " << records.back ().Order );
1422+
1423+ if (res.first ->second .SchemaVersion ) {
1424+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1425+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1426+ }
13661427 }
13671428
13681429 LockChangeRecords.erase (lockId);
@@ -1421,6 +1482,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
14211482 }
14221483}
14231484
1485+ void TDataShard::ScheduleRemoveSchemaSnapshot (const TSchemaSnapshotKey& key) {
1486+ Y_ABORT_UNLESS (!SchemaSnapshotManager.HasReference (key));
1487+
1488+ const auto * snapshot = SchemaSnapshotManager.FindSnapshot (key);
1489+ Y_ABORT_UNLESS (snapshot);
1490+
1491+ auto it = TableInfos.find (key.PathId );
1492+ if (it == TableInfos.end ()) {
1493+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1494+ return ;
1495+ }
1496+
1497+ if (snapshot->Schema ->GetTableSchemaVersion () < it->second ->GetTableSchemaVersion ()) {
1498+ bool wasEmpty = PendingSchemaSnapshotsToRemove.empty ();
1499+ PendingSchemaSnapshotsToRemove.push_back (key);
1500+ if (wasEmpty) {
1501+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1502+ }
1503+ }
1504+ }
1505+
1506+ void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots () {
1507+ bool wasEmpty = PendingSchemaSnapshotsToRemove.empty ();
1508+
1509+ for (const auto & [key, snapshot] : SchemaSnapshotManager.GetSnapshots ()) {
1510+ auto it = TableInfos.find (key.PathId );
1511+ if (it == TableInfos.end ()) {
1512+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1513+ break ;
1514+ }
1515+ if (SchemaSnapshotManager.HasReference (key)) {
1516+ continue ;
1517+ }
1518+ if (snapshot.Schema ->GetTableSchemaVersion () >= it->second ->GetTableSchemaVersion ()) {
1519+ continue ;
1520+ }
1521+
1522+ PendingSchemaSnapshotsToRemove.push_back (key);
1523+ }
1524+
1525+ if (wasEmpty && !PendingSchemaSnapshotsToRemove.empty ()) {
1526+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1527+ }
1528+ }
1529+
14241530void TDataShard::PersistSchemeTxResult (NIceDb::TNiceDb &db, const TSchemaOperation &op) {
14251531 db.Table <Schema::SchemaOperations>().Key (op.TxId ).Update (
14261532 NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success ),
@@ -1649,8 +1755,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
16491755 Y_ABORT_UNLESS (TableInfos.contains (pathId.LocalPathId ));
16501756 auto tableInfo = TableInfos[pathId.LocalPathId ];
16511757
1652- const auto key = TSchemaSnapshotKey (pathId. OwnerId , pathId. LocalPathId , tableSchemaVersion);
1758+ const auto key = TSchemaSnapshotKey (pathId, tableSchemaVersion);
16531759 SchemaSnapshotManager.AddSnapshot (txc.DB , key, TSchemaSnapshot (tableInfo, step, txId));
1760+
1761+ const auto & snapshots = SchemaSnapshotManager.GetSnapshots ();
1762+ for (auto it = snapshots.lower_bound (TSchemaSnapshotKey (pathId, 1 )); it != snapshots.end (); ++it) {
1763+ if (it->first == key) {
1764+ break ;
1765+ }
1766+ if (!SchemaSnapshotManager.HasReference (it->first )) {
1767+ ScheduleRemoveSchemaSnapshot (it->first );
1768+ }
1769+ }
16541770}
16551771
16561772void TDataShard::PersistLastLoanTableTid (NIceDb::TNiceDb& db, ui32 localTid) {
0 commit comments