Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3582,7 +3582,10 @@ void TDataShard::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev, const TActor
}

void TDataShard::AckRSToDeletedTablet(ui64 tabletId, TPersistentTablet& state, const TActorContext& ctx) {
for (ui64 seqno : state.OutReadSets) {
auto seqnos = std::move(state.OutReadSets);
state.OutReadSets.clear();

for (ui64 seqno : seqnos) {
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to dead tablet %" PRIu64 " caused ack of readset %" PRIu64
" at tablet %" PRIu64, tabletId, seqno, TabletID());

Expand All @@ -3595,7 +3598,6 @@ void TDataShard::AckRSToDeletedTablet(ui64 tabletId, TPersistentTablet& state, c
PlanQueue.Progress(ctx);
}
}
state.OutReadSets.clear();

if (OutReadSets.HasExpectations(tabletId)) {
AbortExpectationsFromDeletedTablet(tabletId, OutReadSets.RemoveExpectations(tabletId));
Expand All @@ -3612,12 +3614,10 @@ void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64
}

void TDataShard::RestartPipeRS(ui64 tabletId, TPersistentTablet& state, const TActorContext& ctx) {
for (auto seqno : state.OutReadSets) {
if (seqno == Max<ui64>()) {
OutReadSets.ResendExpectations(tabletId, ctx);
continue;
}
auto seqnos = std::move(state.OutReadSets);
state.OutReadSets.clear();

for (auto seqno : seqnos) {
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to tablet %" PRIu64 " caused resend of readset %" PRIu64
" at tablet %" PRIu64, tabletId, seqno, TabletID());

Expand Down
103 changes: 65 additions & 38 deletions ydb/core/tx/datashard/datashard_outreadset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
namespace NKikimr {
namespace NDataShard {

static constexpr size_t SmallReadSetCacheLimit = 8;

void TOutReadSets::UpdateMonCounter() const {
Self->SetCounter(COUNTER_OUT_READSETS_IN_FLIGHT, CurrentReadSets.size());
}
Expand All @@ -14,7 +16,7 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
using Schema = TDataShard::Schema;

CurrentReadSets.clear(); // For idempotency
CurrentReadSetInfos.clear();
CurrentReadSetKeys.clear();

// TODO[serxa]: this should be Range but it is not working right now
auto rowset = db.Table<Schema::OutReadSets>().GreaterOrEqual(0).Select<
Expand All @@ -23,7 +25,8 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
Schema::OutReadSets::TxId,
Schema::OutReadSets::Origin,
Schema::OutReadSets::From,
Schema::OutReadSets::To>();
Schema::OutReadSets::To,
Schema::OutReadSets::Body>();
if (!rowset.IsReady())
return false;
while (!rowset.EndOfSet()) {
Expand All @@ -33,19 +36,22 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
ui64 origin = rowset.GetValue<Schema::OutReadSets::Origin>();
ui64 source = rowset.GetValue<Schema::OutReadSets::From>();
ui64 target = rowset.GetValue<Schema::OutReadSets::To>();
TString body = rowset.GetValue<Schema::OutReadSets::Body>();

TReadSetInfo rsInfo;
rsInfo.TxId = txId;
rsInfo.Step = step;
rsInfo.Origin = origin;
rsInfo.From = source;
rsInfo.To = target;
// Cache it regardless of size, since we're going to send it soon
rsInfo.Body = std::move(body);

Y_ABORT_UNLESS(!CurrentReadSets.contains(seqNo));
Y_ABORT_UNLESS(!CurrentReadSetInfos.contains(rsInfo));
Y_ABORT_UNLESS(!CurrentReadSetKeys.contains(rsInfo));

CurrentReadSets[seqNo] = rsInfo;
CurrentReadSetInfos[rsInfo] = seqNo;
CurrentReadSetKeys[rsInfo] = seqNo;
CurrentReadSets[seqNo] = std::move(rsInfo);

if (!rowset.Next())
return false;
Expand All @@ -59,15 +65,13 @@ void TOutReadSets::SaveReadSet(NIceDb::TNiceDb& db, ui64 seqNo, ui64 step, const
using Schema = TDataShard::Schema;

Y_ABORT_UNLESS(!CurrentReadSets.contains(seqNo));
Y_ABORT_UNLESS(!CurrentReadSetInfos.contains(rsKey));
Y_ABORT_UNLESS(!CurrentReadSetKeys.contains(rsKey));

TReadSetInfo rsInfo(rsKey);
rsInfo.Step = step;

CurrentReadSetInfos[rsKey] = seqNo;
CurrentReadSets[seqNo] = rsInfo;

UpdateMonCounter();
if (body.size() <= SmallReadSetCacheLimit) {
rsInfo.Body = body;
}

db.Table<Schema::OutReadSets>().Key(seqNo).Update(
NIceDb::TUpdate<Schema::OutReadSets::Step>(rsInfo.Step),
Expand All @@ -76,6 +80,11 @@ void TOutReadSets::SaveReadSet(NIceDb::TNiceDb& db, ui64 seqNo, ui64 step, const
NIceDb::TUpdate<Schema::OutReadSets::From>(rsInfo.From),
NIceDb::TUpdate<Schema::OutReadSets::To>(rsInfo.To),
NIceDb::TUpdate<Schema::OutReadSets::Body>(body));

CurrentReadSetKeys[rsKey] = seqNo;
CurrentReadSets[seqNo] = std::move(rsInfo);

UpdateMonCounter();
}

void TOutReadSets::RemoveReadSet(NIceDb::TNiceDb& db, ui64 seqNo) {
Expand All @@ -85,7 +94,7 @@ void TOutReadSets::RemoveReadSet(NIceDb::TNiceDb& db, ui64 seqNo) {

auto it = CurrentReadSets.find(seqNo);
if (it != CurrentReadSets.end()) {
CurrentReadSetInfos.erase(it->second);
CurrentReadSetKeys.erase(it->second);
CurrentReadSets.erase(it);
}
}
Expand All @@ -97,14 +106,19 @@ TReadSetInfo TOutReadSets::ReplaceReadSet(NIceDb::TNiceDb& db, ui64 seqNo, const
if (it != CurrentReadSets.end()) {
db.Table<Schema::OutReadSets>().Key(seqNo).Update(
NIceDb::TUpdate<Schema::OutReadSets::Body>(body));
if (body.size() <= SmallReadSetCacheLimit) {
it->second.Body = body;
} else {
it->second.Body.reset();
}
return it->second;
} else {
return TReadSetInfo();
}
}

void TOutReadSets::AckForDeletedDestination(ui64 tabletId, ui64 seqNo, const TActorContext &ctx) {
const TReadSetKey* rsInfo = CurrentReadSets.FindPtr(seqNo);
const TReadSetKey* rsInfo = CurrentReadSets.FindPtr(seqNo);

if (!rsInfo) {
LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD,
Expand Down Expand Up @@ -136,14 +150,18 @@ void TOutReadSets::SaveAck(const TActorContext &ctx, TAutoPtr<TEvTxProcessing::T
Self->TabletID(), sender, dest, consumer, txId);

ReadSetAcks.emplace_back(ev.Release());
AckedSeqno.insert(seqno);

if (CurrentReadSets.contains(seqno)) {
TReadSetKey rsInfo(txId, Self->TabletID(), sender, dest);
Y_ABORT_UNLESS(CurrentReadSetInfos[rsInfo] == seqno);
TReadSetKey rsKey(txId, Self->TabletID(), sender, dest);
Y_ABORT_UNLESS(CurrentReadSetKeys[rsKey] == seqno);

CurrentReadSetKeys.erase(rsKey);
CurrentReadSets.erase(seqno);
CurrentReadSetInfos.erase(rsInfo);
}

// We don't need to resend this readset anymore
if (auto it = Self->PersistentTablets.find(dest); it != Self->PersistentTablets.end()) {
it->second.OutReadSets.erase(seqno);
}
}

Expand All @@ -162,15 +180,8 @@ void TOutReadSets::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx) {
Self->TabletID(), sender, dest, consumer, seqno, txId);

RemoveReadSet(db, seqno);

if (auto it = Self->PersistentTablets.find(ev.Record.GetTabletDest());
it != Self->PersistentTablets.end())
{
it->second.OutReadSets.erase(seqno);
}
}
ReadSetAcks.clear();
AckedSeqno.clear();

UpdateMonCounter();
}
Expand Down Expand Up @@ -210,28 +221,44 @@ void TOutReadSets::ReleaseOnHoldReadSets(const std::vector<ui64>& seqNos, const

bool TOutReadSets::ResendRS(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx, ui64 seqNo) {
using Schema = TDataShard::Schema;

NIceDb::TNiceDb db(txc.DB);
if (AckedSeqno.contains(seqNo)) {

auto* info = CurrentReadSets.FindPtr(seqNo);
if (!info) {
// Do not resend if we've already got ACK back, but not applied it to DB
// Also, it is a good place to actually apply ACK(s)

txc.DB.NoMoreReadsForTx();
Cleanup(db, ctx);
return true;
}

auto rowset = db.Table<Schema::OutReadSets>().Key(seqNo).Select();
if (!rowset.IsReady())
return false;
if (!rowset.IsValid())
return true;

ui64 step = rowset.GetValue<Schema::OutReadSets::Step>();
ui64 txId = rowset.GetValue<Schema::OutReadSets::TxId>();
ui64 from = rowset.GetValue<Schema::OutReadSets::From>();
ui64 to = rowset.GetValue<Schema::OutReadSets::To>();
TString body = rowset.GetValue<Schema::OutReadSets::Body>();
ui64 step = info->Step;
ui64 txId = info->TxId;
ui64 from = info->From;
ui64 to = info->To;
TString body;

if (info->Body) {
// We have readset body cached
if (info->Body->size() <= SmallReadSetCacheLimit) {
body = *info->Body;
} else {
// Don't keep it in memory while in transit
body = std::move(*info->Body);
info->Body.reset();
}
} else {
auto rowset = db.Table<Schema::OutReadSets>().Key(seqNo).Select();
if (!rowset.IsReady())
return false;
if (!rowset.IsValid())
return true;
body = rowset.GetValue<Schema::OutReadSets::Body>();
if (body.size() <= SmallReadSetCacheLimit) {
// Cache small readset body
info->Body = body;
}
}

txc.DB.NoMoreReadsForTx();

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_outreadset.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct TReadSetKey {
struct TReadSetInfo : TReadSetKey {
ui64 Step = 0;
bool OnHold = false;
std::optional<TString> Body;

TReadSetInfo() = default;

Expand Down Expand Up @@ -74,7 +75,7 @@ class TOutReadSets {

bool Empty() const { return CurrentReadSets.empty() && Expectations.empty(); }
bool HasAcks() const { return ! ReadSetAcks.empty(); }
bool Has(const TReadSetKey& rsKey) const { return CurrentReadSetInfos.contains(rsKey); }
bool Has(const TReadSetKey& rsKey) const { return CurrentReadSetKeys.contains(rsKey); }

ui64 CountReadSets() const { return CurrentReadSets.size(); }
ui64 CountAcks() const { return ReadSetAcks.size(); }
Expand Down Expand Up @@ -109,8 +110,7 @@ class TOutReadSets {
private:
TDataShard * Self;
THashMap<ui64, TReadSetInfo> CurrentReadSets; // SeqNo -> Info
THashMap<TReadSetKey, ui64> CurrentReadSetInfos; // Info -> SeqNo
THashSet<ui64> AckedSeqno;
THashMap<TReadSetKey, ui64> CurrentReadSetKeys; // Key -> SeqNo
TVector<TIntrusivePtr<TEvTxProcessing::TEvReadSetAck>> ReadSetAcks;
// Target -> TxId -> Step
THashMap<ui64, THashMap<ui64, ui64>> Expectations;
Expand Down
Loading