Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions ydb/core/tx/datashard/cdc_stream_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class TDataShard::TTxCdcStreamScanProgress
TTxType GetTxType() const override { return TXTYPE_CDC_STREAM_SCAN_PROGRESS; }

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
const auto& ev = *Request->Get();
auto& ev = *Request->Get();
const auto& tablePathId = ev.TablePathId;
const auto& streamPathId = ev.StreamPathId;
const auto& readVersion = ev.ReadVersion;
Expand All @@ -238,7 +238,25 @@ class TDataShard::TTxCdcStreamScanProgress
}

ChangeRecords.clear();
if (Self->CheckChangesQueueOverflow()) {

if (!ev.ReservationCookie) {
ev.ReservationCookie = Self->ReserveChangeQueueCapacity(ev.Rows.size());
}

if (!ev.ReservationCookie) {
LOG_I("Cannot reserve change queue capacity");
Reschedule = true;
return true;
}

if (Self->GetFreeChangeQueueCapacity(ev.ReservationCookie) < ev.Rows.size()) {
LOG_I("Not enough change queue capacity");
Reschedule = true;
return true;
}

if (Self->CheckChangesQueueOverflow(ev.ReservationCookie)) {
LOG_I("Change queue overflow");
Reschedule = true;
return true;
}
Expand Down Expand Up @@ -335,7 +353,7 @@ class TDataShard::TTxCdcStreamScanProgress
LOG_I("Enqueue " << ChangeRecords.size() << " change record(s)"
<< ": streamPathId# " << Request->Get()->StreamPathId);

Self->EnqueueChangeRecords(std::move(ChangeRecords));
Self->EnqueueChangeRecords(std::move(ChangeRecords), Request->Get()->ReservationCookie);
ctx.Send(Request->Sender, Response.Release());
} else if (Reschedule) {
LOG_I("Re-schedule progress tx"
Expand Down
63 changes: 57 additions & 6 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,13 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
}
}

if (auto rIt = ChangeQueueReservations.find(record.ReservationCookie); rIt != ChangeQueueReservations.end()) {
--ChangeQueueReservedCapacity;
if (!--rIt->second) {
ChangeQueueReservations.erase(rIt);
}
}

UpdateChangeExchangeLag(AppData()->TimeProvider->Now());
ChangesQueue.erase(it);

Expand All @@ -908,7 +915,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
CheckChangesQueueNoOverflow();
}

void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records) {
void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie) {
if (!records) {
return;
}
Expand All @@ -933,7 +940,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
auto res = ChangesQueue.emplace(
std::piecewise_construct,
std::forward_as_tuple(record.Order),
std::forward_as_tuple(record, now)
std::forward_as_tuple(record, now, cookie)
);
if (res.second) {
ChangesList.PushBack(&res.first->second);
Expand All @@ -956,6 +963,38 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}

ui32 TDataShard::GetFreeChangeQueueCapacity(ui64 cookie) {
const auto sizeLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
if (sizeLimit < ChangesQueue.size()) {
return 0;
}

const auto free = Min(sizeLimit - ChangesQueue.size(), Max(sizeLimit / 2, 1ul));

ui32 reserved = ChangeQueueReservedCapacity;
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
reserved -= it->second;
}

if (free < reserved) {
return 0;
}

return free - reserved;
}

ui64 TDataShard::ReserveChangeQueueCapacity(ui32 capacity) {
const auto sizeLimit = AppData()->DataShardConfig.GetChangesQueueItemsLimit();
if (Max(sizeLimit / 2, 1ul) < ChangeQueueReservedCapacity) {
return 0;
}

const auto cookie = NextChangeQueueReservationCookie++;
ChangeQueueReservations.emplace(cookie, capacity);
ChangeQueueReservedCapacity += capacity;
return cookie;
}

void TDataShard::UpdateChangeExchangeLag(TInstant now) {
if (!ChangesList.Empty()) {
const auto* front = ChangesList.Front();
Expand Down Expand Up @@ -3391,19 +3430,31 @@ bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr
return false;
}

bool TDataShard::CheckChangesQueueOverflow() const {
bool TDataShard::CheckChangesQueueOverflow(ui64 cookie) const {
const auto* appData = AppData();
const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit;

ui32 reserved = ChangeQueueReservedCapacity;
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
reserved -= it->second;
}

return (ChangesQueue.size() + reserved) >= sizeLimit || ChangesQueueBytes >= bytesLimit;
}

void TDataShard::CheckChangesQueueNoOverflow() {
void TDataShard::CheckChangesQueueNoOverflow(ui64 cookie) {
if (OverloadSubscribersByReason[RejectReasonIndex(ERejectReason::ChangesQueueOverflow)]) {
const auto* appData = AppData();
const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
if (ChangesQueue.size() < sizeLimit && ChangesQueueBytes < bytesLimit) {

ui32 reserved = ChangeQueueReservedCapacity;
if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
reserved -= it->second;
}

if ((ChangesQueue.size() + reserved) < sizeLimit && ChangesQueueBytes < bytesLimit) {
NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow);
}
}
Expand Down
21 changes: 15 additions & 6 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ class TDataShard
const TRowVersion ReadVersion;
const TVector<ui32> ValueTags;
TVector<std::pair<TSerializedCellVec, TSerializedCellVec>> Rows;
ui64 ReservationCookie = 0;
const TCdcStreamScanManager::TStats Stats;
};

Expand Down Expand Up @@ -1837,7 +1838,9 @@ class TDataShard
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId);
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records);
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records, ui64 cookie = 0);
ui32 GetFreeChangeQueueCapacity(ui64 cookie);
ui64 ReserveChangeQueueCapacity(ui32 capacity);
void UpdateChangeExchangeLag(TInstant now);
void CreateChangeSender(const TActorContext& ctx);
void KillChangeSender(const TActorContext& ctx);
Expand Down Expand Up @@ -1976,8 +1979,8 @@ class TDataShard
void WaitPredictedPlanStep(ui64 step);
void SchedulePlanPredictedTxs();

bool CheckChangesQueueOverflow() const;
void CheckChangesQueueNoOverflow();
bool CheckChangesQueueOverflow(ui64 cookie = 0) const;
void CheckChangesQueueNoOverflow(ui64 cookie = 0);

void DeleteReadIterator(TReadIteratorsMap::iterator it);
void CancelReadIterators(Ydb::StatusIds::StatusCode code, const TString& issue, const TActorContext& ctx);
Expand Down Expand Up @@ -2709,9 +2712,11 @@ class TDataShard
TInstant EnqueuedAt;
ui64 LockId;
ui64 LockOffset;
ui64 ReservationCookie;

explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
ui64 schemaVersion, TInstant created, TInstant enqueued, ui64 lockId = 0, ui64 lockOffset = 0)
ui64 schemaVersion, TInstant created, TInstant enqueued,
ui64 lockId = 0, ui64 lockOffset = 0, ui64 cookie = 0)
: BodySize(bodySize)
, TableId(tableId)
, SchemaVersion(schemaVersion)
Expand All @@ -2720,12 +2725,13 @@ class TDataShard
, EnqueuedAt(enqueued)
, LockId(lockId)
, LockOffset(lockOffset)
, ReservationCookie(cookie)
{
}

explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now)
explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now, ui64 cookie)
: TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now,
record.LockId, record.LockOffset)
record.LockId, record.LockOffset, cookie)
{
}
};
Expand All @@ -2745,6 +2751,9 @@ class TDataShard
THashMap<ui64, TEnqueuedRecord> ChangesQueue; // ui64 is order
TIntrusiveList<TEnqueuedRecord, TEnqueuedRecordTag> ChangesList;
ui64 ChangesQueueBytes = 0;
THashMap<ui64, ui32> ChangeQueueReservations;
ui64 NextChangeQueueReservationCookie = 1;
ui32 ChangeQueueReservedCapacity = 0;
TActorId OutChangeSender;
bool OutChangeSenderSuspended = false;

Expand Down