Skip to content

Commit ea10ee3

Browse files
committed
Fix review issues
1 parent 5371fa7 commit ea10ee3

File tree

2 files changed

+42
-10
lines changed

2 files changed

+42
-10
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -864,19 +864,36 @@ void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& r
864864
TSchemaSnapshotKey(res.first->second.TableId, res.first->second.SchemaVersion));
865865
}
866866

867-
db.GetDatabase().OnRollback([this, order = record.GetOrder()] {
868-
auto it = ChangesQueue.find(order);
869-
Y_VERIFY_S(it != ChangesQueue.end(), "Cannot find change record: " << order);
867+
const auto key = TCommittingChangeRecordsKey::FromRecord(record);
868+
if (!CommittingChangeRecords.contains(key)) {
869+
db.GetDatabase().OnCommit([this, key] {
870+
auto it = CommittingChangeRecords.find(key);
871+
Y_ABORT_UNLESS(it != CommittingChangeRecords.end());
872+
CommittingChangeRecords.erase(it);
873+
});
874+
db.GetDatabase().OnRollback([this, key] {
875+
auto it = CommittingChangeRecords.find(key);
876+
Y_ABORT_UNLESS(it != CommittingChangeRecords.end());
877+
878+
for (const auto order : it->second) {
879+
auto cIt = ChangesQueue.find(order);
880+
Y_VERIFY_S(cIt != ChangesQueue.end(), "Cannot find change record: " << order);
881+
882+
if (cIt->second.SchemaSnapshotAcquired) {
883+
const auto snapshotKey = TSchemaSnapshotKey(cIt->second.TableId, cIt->second.SchemaVersion);
884+
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
885+
ScheduleRemoveSchemaSnapshot(snapshotKey);
886+
}
887+
}
870888

871-
if (it->second.SchemaSnapshotAcquired) {
872-
const auto snapshotKey = TSchemaSnapshotKey(it->second.TableId, it->second.SchemaVersion);
873-
if (const auto last = SchemaSnapshotManager.ReleaseReference(snapshotKey)) {
874-
ScheduleRemoveSchemaSnapshot(snapshotKey);
889+
ChangesQueue.erase(cIt);
875890
}
876-
}
877891

878-
ChangesQueue.erase(it);
879-
});
892+
CommittingChangeRecords.erase(it);
893+
});
894+
}
895+
896+
CommittingChangeRecords[key].push_back(record.GetOrder());
880897
} else {
881898
auto& state = LockChangeRecords[lockId];
882899
Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2861,6 +2861,20 @@ class TDataShard
28612861
bool OutChangeSenderSuspended = false;
28622862
THolder<IChangeRecordSerializer> ChangeRecordDebugSerializer;
28632863

2864+
struct TCommittingChangeRecordsKey: std::tuple<ui64, ui64, ui64> {
2865+
explicit TCommittingChangeRecordsKey(ui64 group, ui64 step, ui64 txId)
2866+
: std::tuple<ui64, ui64, ui64>(group, step, txId)
2867+
{}
2868+
2869+
static TCommittingChangeRecordsKey FromRecord(const TChangeRecord& record) {
2870+
return TCommittingChangeRecordsKey(record.GetGroup(), record.GetStep(), record.GetTxId());
2871+
}
2872+
2873+
explicit operator size_t() const noexcept {
2874+
return THash<std::tuple<ui64, ui64, ui64>>()(*this);
2875+
}
2876+
};
2877+
28642878
struct TUncommittedLockChangeRecords {
28652879
TVector<IDataShardChangeCollector::TChange> Changes;
28662880
size_t PersistentCount = 0;
@@ -2876,6 +2890,7 @@ class TDataShard
28762890
size_t Count = 0;
28772891
};
28782892

2893+
THashMap<TCommittingChangeRecordsKey, TVector<ui64>> CommittingChangeRecords;
28792894
THashMap<ui64, TUncommittedLockChangeRecords> LockChangeRecords; // ui64 is lock id
28802895
THashMap<ui64, TCommittedLockChangeRecords> CommittedLockChangeRecords; // ui64 is lock id
28812896
TVector<ui64> PendingLockChangeRecordsToRemove;

0 commit comments

Comments
 (0)