@@ -855,6 +855,45 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
855855 NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind ()),
856856 NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody ()),
857857 NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource ()));
858+
859+ auto res = ChangesQueue.emplace (record.GetOrder (), record);
860+ Y_VERIFY_S (res.second , " Duplicate change record: " << record.GetOrder ());
861+
862+ if (res.first ->second .SchemaVersion ) {
863+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
864+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
865+ }
866+
867+ const auto key = TCommittingChangeRecordsKey::FromRecord (record);
868+ if (!CommittingChangeRecords.contains (key)) {
869+ db.GetDatabase ().OnCommit ([this , key] {
870+ auto it = CommittingChangeRecords.find (key);
871+ Y_ABORT_UNLESS (it != CommittingChangeRecords.end ());
872+ CommittingChangeRecords.erase (it);
873+ });
874+ db.GetDatabase ().OnRollback ([this , key] {
875+ auto it = CommittingChangeRecords.find (key);
876+ Y_ABORT_UNLESS (it != CommittingChangeRecords.end ());
877+
878+ for (const auto order : it->second ) {
879+ auto cIt = ChangesQueue.find (order);
880+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
881+
882+ if (cIt->second .SchemaSnapshotAcquired ) {
883+ const auto snapshotKey = TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion );
884+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
885+ ScheduleRemoveSchemaSnapshot (snapshotKey);
886+ }
887+ }
888+
889+ ChangesQueue.erase (cIt);
890+ }
891+
892+ CommittingChangeRecords.erase (it);
893+ });
894+ }
895+
896+ CommittingChangeRecords[key].push_back (record.GetOrder ());
858897 } else {
859898 auto & state = LockChangeRecords[lockId];
860899 Y_ABORT_UNLESS (state.Changes .empty () || state.Changes .back ().LockOffset < record.GetLockOffset (),
@@ -934,6 +973,14 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
934973 committed.Step = rowVersion.Step ;
935974 committed.TxId = rowVersion.TxId ;
936975 collected.push_back (committed);
976+
977+ auto res = ChangesQueue.emplace (committed.Order , committed);
978+ Y_VERIFY_S (res.second , " Duplicate change record: " << committed.Order );
979+
980+ if (res.first ->second .SchemaVersion ) {
981+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
982+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
983+ }
937984 }
938985
939986 Y_VERIFY_S (!CommittedLockChangeRecords.contains (lockId), " Cannot commit lock " << lockId << " more than once" );
@@ -960,7 +1007,26 @@ void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64
9601007 LockChangeRecords.erase (it);
9611008 });
9621009 db.GetDatabase ().OnRollback ([this , lockId]() {
963- CommittedLockChangeRecords.erase (lockId);
1010+ auto it = CommittedLockChangeRecords.find (lockId);
1011+ Y_VERIFY_S (it != CommittedLockChangeRecords.end (), " Unexpected failure to find lockId# " << lockId);
1012+
1013+ for (size_t i = 0 ; i < it->second .Count ; ++i) {
1014+ const ui64 order = it->second .Order + i;
1015+
1016+ auto cIt = ChangesQueue.find (order);
1017+ Y_VERIFY_S (cIt != ChangesQueue.end (), " Cannot find change record: " << order);
1018+
1019+ if (cIt->second .SchemaSnapshotAcquired ) {
1020+ const auto snapshotKey = TSchemaSnapshotKey (cIt->second .TableId , cIt->second .SchemaVersion );
1021+ if (const auto last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1022+ ScheduleRemoveSchemaSnapshot (snapshotKey);
1023+ }
1024+ }
1025+
1026+ ChangesQueue.erase (cIt);
1027+ }
1028+
1029+ CommittedLockChangeRecords.erase (it);
9641030 });
9651031}
9661032
@@ -994,7 +1060,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
9941060
9951061 auto it = ChangesQueue.find (order);
9961062 if (it == ChangesQueue.end ()) {
997- Y_VERIFY_DEBUG_S (false , " Trying to remove non-enqueud record: " << order);
9981063 return ;
9991064 }
10001065
@@ -1022,23 +1087,9 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10221087 ChangesQueueBytes -= record.BodySize ;
10231088
10241089 if (record.SchemaSnapshotAcquired ) {
1025- Y_ABORT_UNLESS (record.TableId );
1026- auto tableIt = TableInfos.find (record.TableId .LocalPathId );
1027-
1028- if (tableIt != TableInfos.end ()) {
1029- const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1030- const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey);
1031-
1032- if (last) {
1033- const auto * snapshot = SchemaSnapshotManager.FindSnapshot (snapshotKey);
1034- Y_ABORT_UNLESS (snapshot);
1035-
1036- if (snapshot->Schema ->GetTableSchemaVersion () < tableIt->second ->GetTableSchemaVersion ()) {
1037- SchemaSnapshotManager.RemoveShapshot (db, snapshotKey);
1038- }
1039- }
1040- } else {
1041- Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1090+ const auto snapshotKey = TSchemaSnapshotKey (record.TableId , record.SchemaVersion );
1091+ if (const bool last = SchemaSnapshotManager.ReleaseReference (snapshotKey)) {
1092+ ScheduleRemoveSchemaSnapshot (snapshotKey);
10421093 }
10431094 }
10441095
@@ -1059,7 +1110,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10591110 CheckChangesQueueNoOverflow ();
10601111}
10611112
1062- void TDataShard::EnqueueChangeRecords (TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
1113+ void TDataShard::EnqueueChangeRecords (TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove ) {
10631114 if (!records) {
10641115 return ;
10651116 }
@@ -1079,27 +1130,24 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
10791130 const auto now = AppData ()->TimeProvider ->Now ();
10801131 TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward (Reserve (records.size ()));
10811132 for (const auto & record : records) {
1082- forward.emplace_back (record.Order , record.PathId , record.BodySize );
1133+ auto it = ChangesQueue.find (record.Order );
1134+ if (it == ChangesQueue.end ()) {
1135+ Y_ABORT_UNLESS (afterMove);
1136+ continue ;
1137+ }
10831138
1084- auto res = ChangesQueue.emplace (
1085- std::piecewise_construct,
1086- std::forward_as_tuple (record.Order ),
1087- std::forward_as_tuple (record, now, cookie)
1088- );
1089- if (res.second ) {
1090- ChangesList.PushBack (&res.first ->second );
1139+ forward.emplace_back (record.Order , record.PathId , record.BodySize );
10911140
1092- Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1093- ChangesQueueBytes += record.BodySize ;
1141+ it->second .EnqueuedAt = now;
1142+ it->second .ReservationCookie = cookie;
1143+ ChangesList.PushBack (&it->second );
10941144
1095- if (record.SchemaVersion ) {
1096- res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1097- TSchemaSnapshotKey (record.TableId , record.SchemaVersion ));
1098- }
1099- }
1145+ Y_ABORT_UNLESS (ChangesQueueBytes <= (Max<ui64>() - record.BodySize ));
1146+ ChangesQueueBytes += record.BodySize ;
11001147 }
1101-
1148+
11021149 if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
1150+ Y_ABORT_UNLESS (!afterMove);
11031151 ChangeQueueReservedCapacity -= it->second ;
11041152 ChangeQueueReservedCapacity += records.size ();
11051153 }
@@ -1265,6 +1313,14 @@ bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChange
12651313 .SchemaVersion = schemaVersion,
12661314 });
12671315
1316+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1317+ Y_VERIFY_S (res.second , " Duplicate change record: " << records.back ().Order );
1318+
1319+ if (res.first ->second .SchemaVersion ) {
1320+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1321+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1322+ }
1323+
12681324 if (!rowset.Next ()) {
12691325 return false ;
12701326 }
@@ -1363,6 +1419,14 @@ bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShard
13631419 });
13641420 entry.Count ++;
13651421 needSort = true ;
1422+
1423+ auto res = ChangesQueue.emplace (records.back ().Order , records.back ());
1424+ Y_VERIFY_S (res.second , " Duplicate change record: " << records.back ().Order );
1425+
1426+ if (res.first ->second .SchemaVersion ) {
1427+ res.first ->second .SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference (
1428+ TSchemaSnapshotKey (res.first ->second .TableId , res.first ->second .SchemaVersion ));
1429+ }
13661430 }
13671431
13681432 LockChangeRecords.erase (lockId);
@@ -1421,6 +1485,51 @@ void TDataShard::ScheduleRemoveAbandonedLockChanges() {
14211485 }
14221486}
14231487
// Queues the schema snapshot identified by `key` for removal, if it is older
// than the table's current schema.
//
// Preconditions enforced below: the snapshot manager holds no reference for
// `key`, and a snapshot for `key` exists. The callers visible in this file
// invoke this right after ReleaseReference() reports a truthy result, or after
// checking HasReference() themselves.
//
// Nothing is removed here: the key is appended to PendingSchemaSnapshotsToRemove
// and a TEvPrivate::TEvRemoveSchemaSnapshots event is sent to this actor.
// NOTE(review): the handler of that event is not visible here; presumably it
// drains the pending list - confirm.
1488+ void TDataShard::ScheduleRemoveSchemaSnapshot (const TSchemaSnapshotKey& key) {
1489+ Y_ABORT_UNLESS (!SchemaSnapshotManager.HasReference (key));
1490+
1491+ const auto * snapshot = SchemaSnapshotManager.FindSnapshot (key);
1492+ Y_ABORT_UNLESS (snapshot);
1493+
1494+ auto it = TableInfos.find (key.PathId );
// The table is no longer known to this shard; the assertion records that this
// is only expected in the PreOffline state. Nothing is scheduled in that case.
1495+ if (it == TableInfos.end ()) {
1496+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1497+ return ;
1498+ }
1499+
// Only a snapshot whose schema version is strictly lower than the table's
// current version is queued; a snapshot at the current version is kept.
1500+ if (snapshot->Schema ->GetTableSchemaVersion () < it->second ->GetTableSchemaVersion ()) {
1501+ bool wasEmpty = PendingSchemaSnapshotsToRemove.empty ();
1502+ PendingSchemaSnapshotsToRemove.push_back (key);
// The event is sent only on the empty -> non-empty transition, so a batch of
// keys queued while the list is already non-empty does not send it again.
1503+ if (wasEmpty) {
1504+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1505+ }
1506+ }
1507+ }
1508+
// Scans every snapshot returned by SchemaSnapshotManager.GetSnapshots() and
// queues for removal those that have no reference and whose schema version is
// lower than the current schema version of their table.
//
// Keys are appended to PendingSchemaSnapshotsToRemove; a single
// TEvPrivate::TEvRemoveSchemaSnapshots event is sent to this actor if the list
// was empty on entry and is non-empty after the scan.
1509+ void TDataShard::ScheduleRemoveAbandonedSchemaSnapshots () {
1510+ bool wasEmpty = PendingSchemaSnapshotsToRemove.empty ();
1511+
1512+ for (const auto & [key, snapshot] : SchemaSnapshotManager.GetSnapshots ()) {
1513+ auto it = TableInfos.find (key.PathId );
// NOTE(review): a missing table ends the whole scan (break), not just this
// snapshot. Presumably acceptable because a missing table is only expected in
// the PreOffline state, as the assertion states - confirm that `continue` is
// not needed when other tables are still present.
1514+ if (it == TableInfos.end ()) {
1515+ Y_DEBUG_ABORT_UNLESS (State == TShardState::PreOffline);
1516+ break ;
1517+ }
// Snapshots that still have a reference are left alone.
1518+ if (SchemaSnapshotManager.HasReference (key)) {
1519+ continue ;
1520+ }
// Snapshots at (or above) the table's current schema version are kept.
1521+ if (snapshot.Schema ->GetTableSchemaVersion () >= it->second ->GetTableSchemaVersion ()) {
1522+ continue ;
1523+ }
1524+
1525+ PendingSchemaSnapshotsToRemove.push_back (key);
1526+ }
1527+
// If the list was already non-empty on entry, no event is sent here.
1528+ if (wasEmpty && !PendingSchemaSnapshotsToRemove.empty ()) {
1529+ Send (SelfId (), new TEvPrivate::TEvRemoveSchemaSnapshots);
1530+ }
1531+ }
1532+
14241533void TDataShard::PersistSchemeTxResult (NIceDb::TNiceDb &db, const TSchemaOperation &op) {
14251534 db.Table <Schema::SchemaOperations>().Key (op.TxId ).Update (
14261535 NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success ),
@@ -1649,8 +1758,18 @@ void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersio
16491758 Y_ABORT_UNLESS (TableInfos.contains (pathId.LocalPathId ));
16501759 auto tableInfo = TableInfos[pathId.LocalPathId ];
16511760
1652- const auto key = TSchemaSnapshotKey (pathId. OwnerId , pathId. LocalPathId , tableSchemaVersion);
1761+ const auto key = TSchemaSnapshotKey (pathId, tableSchemaVersion);
16531762 SchemaSnapshotManager.AddSnapshot (txc.DB , key, TSchemaSnapshot (tableInfo, step, txId));
1763+
1764+ const auto & snapshots = SchemaSnapshotManager.GetSnapshots ();
1765+ for (auto it = snapshots.lower_bound (TSchemaSnapshotKey (pathId, 1 )); it != snapshots.end (); ++it) {
1766+ if (it->first == key) {
1767+ break ;
1768+ }
1769+ if (!SchemaSnapshotManager.HasReference (it->first )) {
1770+ ScheduleRemoveSchemaSnapshot (it->first );
1771+ }
1772+ }
16541773}
16551774
16561775void TDataShard::PersistLastLoanTableTid (NIceDb::TNiceDb& db, ui32 localTid) {
0 commit comments