Skip to content

Commit 6cf5aad

Browse files
committed
Fix test
1 parent ea10ee3 commit 6cf5aad

File tree

4 files changed

+13
-9
lines changed

4 files changed

+13
-9
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,6 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
10601060

10611061
auto it = ChangesQueue.find(order);
10621062
if (it == ChangesQueue.end()) {
1063-
Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueud record: " << order);
10641063
return;
10651064
}
10661065

@@ -1111,7 +1110,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
11111110
CheckChangesQueueNoOverflow();
11121111
}
11131112

1114-
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
1113+
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie, bool afterMove) {
11151114
if (!records) {
11161115
return;
11171116
}
@@ -1131,10 +1130,13 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
11311130
const auto now = AppData()->TimeProvider->Now();
11321131
TVector<NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
11331132
for (const auto& record : records) {
1134-
forward.emplace_back(record.Order, record.PathId, record.BodySize);
1135-
11361133
auto it = ChangesQueue.find(record.Order);
1137-
Y_ABORT_UNLESS(it != ChangesQueue.end());
1134+
if (it == ChangesQueue.end()) {
1135+
Y_ABORT_UNLESS(afterMove);
1136+
continue;
1137+
}
1138+
1139+
forward.emplace_back(record.Order, record.PathId, record.BodySize);
11381140

11391141
it->second.EnqueuedAt = now;
11401142
it->second.ReservationCookie = cookie;
@@ -1143,8 +1145,9 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
11431145
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
11441146
ChangesQueueBytes += record.BodySize;
11451147
}
1146-
1148+
11471149
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
1150+
Y_ABORT_UNLESS(!afterMove);
11481151
ChangeQueueReservedCapacity -= it->second;
11491152
ChangeQueueReservedCapacity += records.size();
11501153
}

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1912,7 +1912,8 @@ class TDataShard
19121912
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId);
19131913
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
19141914
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
1915-
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0);
1915+
// TODO(ilnaz): remove 'afterMove' after #6541
1916+
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0, bool afterMove = false);
19161917
ui32 GetFreeChangeQueueCapacity(ui64 cookie);
19171918
ui64 ReserveChangeQueueCapacity(ui32 capacity);
19181919
void UpdateChangeExchangeLag(TInstant now);

ydb/core/tx/datashard/move_index_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class TMoveIndexUnit : public TExecutionUnit {
106106
void Complete(TOperation::TPtr, const TActorContext& ctx) override {
107107
DataShard.CreateChangeSender(ctx);
108108
DataShard.MaybeActivateChangeSender(ctx);
109-
DataShard.EnqueueChangeRecords(std::move(ChangeRecords));
109+
DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true);
110110
}
111111
};
112112

ydb/core/tx/datashard/move_table_unit.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class TMoveTableUnit : public TExecutionUnit {
106106
void Complete(TOperation::TPtr, const TActorContext& ctx) override {
107107
DataShard.CreateChangeSender(ctx);
108108
DataShard.MaybeActivateChangeSender(ctx);
109-
DataShard.EnqueueChangeRecords(std::move(ChangeRecords));
109+
DataShard.EnqueueChangeRecords(std::move(ChangeRecords), 0, true);
110110
}
111111
};
112112

0 commit comments

Comments
 (0)