@@ -899,6 +899,13 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
899899 }
900900 }
901901
902+ if (auto rIt = ChangeQueueReservations.find (record.ReservationCookie ); rIt != ChangeQueueReservations.end ()) {
903+ --ChangeQueueReservedCapacity;
904+ if (!--rIt->second ) {
905+ ChangeQueueReservations.erase (rIt);
906+ }
907+ }
908+
902909 UpdateChangeExchangeLag (AppData ()->TimeProvider ->Now ());
903910 ChangesQueue.erase (it);
904911
@@ -908,7 +915,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
908915 CheckChangesQueueNoOverflow ();
909916}
910917
911- void TDataShard::EnqueueChangeRecords (TVector<IDataShardChangeCollector::TChange>&& records) {
918+ void TDataShard::EnqueueChangeRecords (TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie ) {
912919 if (!records) {
913920 return ;
914921 }
@@ -933,7 +940,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
933940 auto res = ChangesQueue.emplace (
934941 std::piecewise_construct,
935942 std::forward_as_tuple (record.Order ),
936- std::forward_as_tuple (record, now)
943+ std::forward_as_tuple (record, now, cookie )
937944 );
938945 if (res.second ) {
939946 ChangesList.PushBack (&res.first ->second );
@@ -956,6 +963,38 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
956963 Send (OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords (std::move (forward)));
957964}
958965
966+ ui32 TDataShard::GetFreeChangeQueueCapacity (ui64 cookie) {
967+ const auto sizeLimit = AppData ()->DataShardConfig .GetChangesQueueItemsLimit ();
968+ if (sizeLimit < ChangesQueue.size ()) {
969+ return 0 ;
970+ }
971+
972+ const auto free = Min (sizeLimit - ChangesQueue.size (), Max (sizeLimit / 2 , 1ul ));
973+
974+ ui32 reserved = ChangeQueueReservedCapacity;
975+ if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
976+ reserved -= it->second ;
977+ }
978+
979+ if (free < reserved) {
980+ return 0 ;
981+ }
982+
983+ return free - reserved;
984+ }
985+
986+ ui64 TDataShard::ReserveChangeQueueCapacity (ui32 capacity) {
987+ const auto sizeLimit = AppData ()->DataShardConfig .GetChangesQueueItemsLimit ();
988+ if (Max (sizeLimit / 2 , 1ul ) < ChangeQueueReservedCapacity) {
989+ return 0 ;
990+ }
991+
992+ const auto cookie = NextChangeQueueReservationCookie++;
993+ ChangeQueueReservations.emplace (cookie, capacity);
994+ ChangeQueueReservedCapacity += capacity;
995+ return cookie;
996+ }
997+
959998void TDataShard::UpdateChangeExchangeLag (TInstant now) {
960999 if (!ChangesList.Empty ()) {
9611000 const auto * front = ChangesList.Front ();
@@ -3391,19 +3430,31 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
33913430 return false ;
33923431}
33933432
3394- bool TDataShard::CheckChangesQueueOverflow () const {
3433+ bool TDataShard::CheckChangesQueueOverflow (ui64 cookie ) const {
33953434 const auto * appData = AppData ();
33963435 const auto sizeLimit = appData->DataShardConfig .GetChangesQueueItemsLimit ();
33973436 const auto bytesLimit = appData->DataShardConfig .GetChangesQueueBytesLimit ();
3398- return ChangesQueue.size () >= sizeLimit || ChangesQueueBytes >= bytesLimit;
3437+
3438+ ui32 reserved = ChangeQueueReservedCapacity;
3439+ if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
3440+ reserved -= it->second ;
3441+ }
3442+
3443+ return (ChangesQueue.size () + reserved) >= sizeLimit || ChangesQueueBytes >= bytesLimit;
33993444}
34003445
3401- void TDataShard::CheckChangesQueueNoOverflow () {
3446+ void TDataShard::CheckChangesQueueNoOverflow (ui64 cookie ) {
34023447 if (OverloadSubscribersByReason[RejectReasonIndex (ERejectReason::ChangesQueueOverflow)]) {
34033448 const auto * appData = AppData ();
34043449 const auto sizeLimit = appData->DataShardConfig .GetChangesQueueItemsLimit ();
34053450 const auto bytesLimit = appData->DataShardConfig .GetChangesQueueBytesLimit ();
3406- if (ChangesQueue.size () < sizeLimit && ChangesQueueBytes < bytesLimit) {
3451+
3452+ ui32 reserved = ChangeQueueReservedCapacity;
3453+ if (auto it = ChangeQueueReservations.find (cookie); it != ChangeQueueReservations.end ()) {
3454+ reserved -= it->second ;
3455+ }
3456+
3457+ if ((ChangesQueue.size () + reserved) < sizeLimit && ChangesQueueBytes < bytesLimit) {
34073458 NotifyOverloadSubscribers (ERejectReason::ChangesQueueOverflow);
34083459 }
34093460 }
0 commit comments