Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/protos/data_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ message TKqpLocks {
Rollback = 3; // Rollback buffered changes and erase locks
}
optional ELocksOp Op = 4;

// An optional arbiter for readsets. When specified, all sending shards
// send the commit decision to a single arbiter shard, and all receiving
// shards wait for the commit decision from a single arbiter shard. The
// shard marked as the arbiter is responsible in aggregating all commit
// decisions from sending shards and sending the final commit decision to
// receiving shards.
// This may only be used with generic readsets without any other data and
// currently limited to volatile transactions.
optional uint64 ArbiterShard = 5;
}

message TTableId {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,10 @@ message TTxVolatileDetails {

// When true all preceding transactions are dependencies
optional bool CommitOrdered = 8;

// When true marks an arbiter for other participants
// Arbiters hold outgoing readsets until the transaction is decided
optional bool IsArbiter = 9;
}

// Sent by datashard when some overload reason stopped being relevant
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tablet/tablet_pipe_client_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,17 @@ namespace NTabletPipe {
}

private:
void MoveToPool(ui64 tabletId, const TClientCacheEntry& currentClient) {
void MoveToPool(ui64 tabletId, TClientCacheEntry& currentClient) {
TClientCacheEntry* insertedClient;
if (!PoolContainer->Insert(tabletId, currentClient, insertedClient)) {
Y_DEBUG_ABORT_UNLESS(!insertedClient->Client);
*insertedClient = currentClient;
}

// Note: client was moved to pool, make sure it's not closed by
// the eviction callback
currentClient.Client = {};

Container->Erase(tabletId);
}

Expand Down
37 changes: 37 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,29 @@ void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
}
}

class TDataShard::TSendArbiterReadSets final : public IVolatileTxCallback {
public:
TSendArbiterReadSets(TDataShard* self, TVector<THolder<TEvTxProcessing::TEvReadSet>>&& readSets)
: Self(self)
, ReadSets(std::move(readSets))
{}

void OnCommit(ui64) override {
// The transaction is persistent and committed
// Arbiter must now send its outgoing readsets
Self->SendReadSets(TActivationContext::ActorContextFor(Self->SelfId()), std::move(ReadSets));
}

void OnAbort(ui64) override {
// ReadSets are persistently replaced on abort and sent by volatile tx manager
// Previously generated readsets must be ignored
}

private:
TDataShard* Self;
TVector<THolder<TEvTxProcessing::TEvReadSet>> ReadSets;
};

void TDataShard::PrepareAndSaveOutReadSets(ui64 step,
ui64 txId,
const TMap<std::pair<ui64, ui64>, TString>& txOutReadSets,
Expand All @@ -450,6 +473,11 @@ void TDataShard::PrepareAndSaveOutReadSets(ui64 step,
if (txOutReadSets.empty())
return;

auto* info = VolatileTxManager.FindByTxId(txId);
if (info && !(info->IsArbiter && info->State != EVolatileTxState::Committed)) {
info = nullptr;
}

ui64 prevSeqno = NextSeqno;
for (auto& kv : txOutReadSets) {
ui64 source = kv.first.first;
Expand All @@ -459,12 +487,21 @@ void TDataShard::PrepareAndSaveOutReadSets(ui64 step,
ui64 seqno = NextSeqno++;
OutReadSets.SaveReadSet(db, seqno, step, rsKey, kv.second);
preparedRS.push_back(PrepareReadSet(step, txId, source, target, kv.second, seqno));
if (info) {
// ReadSet seqnos that must be replaced on abort
info->ArbiterReadSets.push_back(seqno);
}
}
}

if (NextSeqno != prevSeqno) {
PersistSys(db, Schema::Sys_NextSeqno, NextSeqno);
}

if (info) {
VolatileTxManager.AttachVolatileTxCallback(txId, new TSendArbiterReadSets(this, std::move(preparedRS)));
preparedRS.clear();
}
}

void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ bool TDataShard::TTxInit::ReadEverything(TTransactionContext &txc) {
if (!Self->VolatileTxManager.Load(db)) {
return false;
}
Self->OutReadSets.HoldArbiterReadSets();
}

if (Self->State != TShardState::Offline && txc.DB.GetScheme().GetTableInfo(Schema::CdcStreamScans::TableId)) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_common_upload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ bool TCommonUploadOps<TEvRequest, TEvResponse>::Execute(TDataShard* self, TTrans
/* participants */ { },
groupProvider.GetCurrentChangeGroup(),
/* ordered */ false,
/* arbiter */ false,
txc);
// Note: transaction is already committed, no additional waiting needed
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_direct_erase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ TDirectTxErase::EStatus TDirectTxErase::CheckedExecute(
/* participants */ { },
groupProvider ? groupProvider->GetCurrentChangeGroup() : std::nullopt,
/* ordered */ false,
/* arbiter */ false,
*params.Txc);
// Note: transaction is already committed, no additional waiting needed
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ class TDataShard

class TWaitVolatileDependencies;
class TSendVolatileResult;
class TSendArbiterReadSets;

struct TEvPrivate {
enum EEv {
Expand Down
25 changes: 25 additions & 0 deletions ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,14 @@ bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks)
return true;
}

bool KqpLocksHasArbiter(const NKikimrDataEvents::TKqpLocks* kqpLocks) {
return kqpLocks && kqpLocks->GetArbiterShard() != 0;
}

bool KqpLocksIsArbiter(ui64 tabletId, const NKikimrDataEvents::TKqpLocks* kqpLocks) {
return KqpLocksHasArbiter(kqpLocks) && kqpLocks->GetArbiterShard() == tabletId;
}

bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks) {
auto& kqpLocks = tx->GetDataTx()->GetKqpLocks();

Expand All @@ -779,6 +787,9 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
Y_ABORT_UNLESS(tx->OutReadSets().empty());
Y_ABORT_UNLESS(tx->AwaitingDecisions().empty());

const bool hasArbiter = KqpLocksHasArbiter(&kqpLocks);
const bool isArbiter = KqpLocksIsArbiter(origin, &kqpLocks);

// Note: usually all shards send locks, since they either have side effects or need to validate locks
// However it is technically possible to have pure-read shards, that don't contribute to the final decision
bool sendLocks = SendLocks(kqpLocks, origin);
Expand Down Expand Up @@ -808,6 +819,11 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
continue;
}

if (hasArbiter && !isArbiter && dstTabletId != kqpLocks.GetArbiterShard()) {
// Non-arbiter shards only send locks to the arbiter
continue;
}

LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Send commit decision from "
<< origin << " to " << dstTabletId);

Expand All @@ -821,6 +837,8 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo

tx->OutReadSets()[key] = std::move(bodyStr);
}
} else {
Y_ABORT_UNLESS(!isArbiter, "Arbiter is not in the sending shards set");
}

bool receiveLocks = ReceiveLocks(kqpLocks, origin);
Expand All @@ -833,6 +851,11 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo
continue;
}

if (hasArbiter && !isArbiter && srcTabletId != kqpLocks.GetArbiterShard()) {
// Non-arbiter shards only await decision from the arbiter
continue;
}

LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Will wait for volatile decision from "
<< srcTabletId << " to " << origin);

Expand Down Expand Up @@ -891,6 +914,8 @@ bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLo

return false;
}
} else {
Y_ABORT_UNLESS(!isArbiter, "Arbiter is not in the receiving shards set");
}

return true;
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/datashard/datashard_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ void KqpPrepareInReadsets(TInputOpData::TInReadSets& inReadSets,
bool KqpValidateLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
bool KqpValidateVolatileTx(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);

bool KqpLocksHasArbiter(const NKikimrDataEvents::TKqpLocks* kqpLocks);
bool KqpLocksIsArbiter(ui64 tabletId, const NKikimrDataEvents::TKqpLocks* kqpLocks);

void KqpEraseLocks(ui64 origin, TActiveTransaction* tx, TSysLocks& sysLocks);
void KqpCommitLocks(ui64 origin, TActiveTransaction* tx, const TRowVersion& writeVersion, TDataShard& dataShard);

Expand Down
81 changes: 72 additions & 9 deletions ydb/core/tx/datashard/datashard_outreadset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
// TODO[serxa]: this should be Range but it is not working right now
auto rowset = db.Table<Schema::OutReadSets>().GreaterOrEqual(0).Select<
Schema::OutReadSets::Seqno,
Schema::OutReadSets::Step,
Schema::OutReadSets::TxId,
Schema::OutReadSets::Origin,
Schema::OutReadSets::From,
Expand All @@ -27,12 +28,18 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
return false;
while (!rowset.EndOfSet()) {
ui64 seqNo = rowset.GetValue<Schema::OutReadSets::Seqno>();
ui64 step = rowset.GetValue<Schema::OutReadSets::Step>();
ui64 txId = rowset.GetValue<Schema::OutReadSets::TxId>();
ui64 origin = rowset.GetValue<Schema::OutReadSets::Origin>();
ui64 source = rowset.GetValue<Schema::OutReadSets::From>();
ui64 target = rowset.GetValue<Schema::OutReadSets::To>();

TReadSetKey rsInfo(txId, origin, source, target);
TReadSetInfo rsInfo;
rsInfo.TxId = txId;
rsInfo.Step = step;
rsInfo.Origin = origin;
rsInfo.From = source;
rsInfo.To = target;

Y_ABORT_UNLESS(!CurrentReadSets.contains(seqNo));
Y_ABORT_UNLESS(!CurrentReadSetInfos.contains(rsInfo));
Expand All @@ -48,26 +55,54 @@ bool TOutReadSets::LoadReadSets(NIceDb::TNiceDb& db) {
return true;
}

void TOutReadSets::SaveReadSet(NIceDb::TNiceDb& db, ui64 seqNo, ui64 step, const TReadSetKey& rsInfo, TString body) {
void TOutReadSets::SaveReadSet(NIceDb::TNiceDb& db, ui64 seqNo, ui64 step, const TReadSetKey& rsKey, const TString& body) {
using Schema = TDataShard::Schema;

Y_ABORT_UNLESS(!CurrentReadSets.contains(seqNo));
Y_ABORT_UNLESS(!CurrentReadSetInfos.contains(rsInfo));
Y_ABORT_UNLESS(!CurrentReadSetInfos.contains(rsKey));

CurrentReadSetInfos[rsInfo] = seqNo;
TReadSetInfo rsInfo(rsKey);
rsInfo.Step = step;

CurrentReadSetInfos[rsKey] = seqNo;
CurrentReadSets[seqNo] = rsInfo;

UpdateMonCounter();

db.Table<Schema::OutReadSets>().Key(seqNo).Update(
NIceDb::TUpdate<Schema::OutReadSets::Step>(step),
NIceDb::TUpdate<Schema::OutReadSets::Step>(rsInfo.Step),
NIceDb::TUpdate<Schema::OutReadSets::TxId>(rsInfo.TxId),
NIceDb::TUpdate<Schema::OutReadSets::Origin>(rsInfo.Origin),
NIceDb::TUpdate<Schema::OutReadSets::From>(rsInfo.From),
NIceDb::TUpdate<Schema::OutReadSets::To>(rsInfo.To),
NIceDb::TUpdate<Schema::OutReadSets::Body>(body));
}

void TOutReadSets::RemoveReadSet(NIceDb::TNiceDb& db, ui64 seqNo) {
using Schema = TDataShard::Schema;

db.Table<Schema::OutReadSets>().Key(seqNo).Delete();

auto it = CurrentReadSets.find(seqNo);
if (it != CurrentReadSets.end()) {
CurrentReadSetInfos.erase(it->second);
CurrentReadSets.erase(it);
}
}

TReadSetInfo TOutReadSets::ReplaceReadSet(NIceDb::TNiceDb& db, ui64 seqNo, const TString& body) {
using Schema = TDataShard::Schema;

auto it = CurrentReadSets.find(seqNo);
if (it != CurrentReadSets.end()) {
db.Table<Schema::OutReadSets>().Key(seqNo).Update(
NIceDb::TUpdate<Schema::OutReadSets::Body>(body));
return it->second;
} else {
return TReadSetInfo();
}
}

void TOutReadSets::AckForDeletedDestination(ui64 tabletId, ui64 seqNo, const TActorContext &ctx) {
const TReadSetKey* rsInfo = CurrentReadSets.FindPtr(seqNo);

Expand Down Expand Up @@ -113,8 +148,6 @@ void TOutReadSets::SaveAck(const TActorContext &ctx, TAutoPtr<TEvTxProcessing::T
}

void TOutReadSets::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx) {
using Schema = TDataShard::Schema;

// Note that this code should be called only after no-more-reads to ensure we wont lost updates
for (TIntrusivePtr<TEvTxProcessing::TEvReadSetAck>& event : ReadSetAcks) {
TEvTxProcessing::TEvReadSetAck& ev = *event;
Expand All @@ -128,7 +161,7 @@ void TOutReadSets::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx) {
"Deleted RS at %" PRIu64 " source %" PRIu64 " dest %" PRIu64 " consumer %" PRIu64 " seqno %" PRIu64" txId %" PRIu64,
Self->TabletID(), sender, dest, consumer, seqno, txId);

db.Table<Schema::OutReadSets>().Key(seqno).Delete();
RemoveReadSet(db, seqno);
Self->ResendReadSetPipeTracker.DetachTablet(seqno, ev.Record.GetTabletDest(), 0, ctx);
}
ReadSetAcks.clear();
Expand All @@ -140,14 +173,44 @@ void TOutReadSets::Cleanup(NIceDb::TNiceDb& db, const TActorContext& ctx) {
void TOutReadSets::ResendAll(const TActorContext& ctx) {
TPendingPipeTrackerCommands pendingPipeTrackerCommands;
for (const auto& rs : CurrentReadSets) {
if (rs.second.OnHold) {
continue;
}
ui64 seqNo = rs.first;
ui64 target = rs.second.To;
Self->ResendReadSetQueue.Progress(rs.first, ctx);
Self->ResendReadSetQueue.Progress(seqNo, ctx);
pendingPipeTrackerCommands.AttachTablet(seqNo, target);
}
pendingPipeTrackerCommands.Apply(Self->ResendReadSetPipeTracker, ctx);
}

void TOutReadSets::HoldArbiterReadSets() {
for (auto& rs : CurrentReadSets) {
const ui64& seqNo = rs.first;
const ui64& txId = rs.second.TxId;
auto* info = Self->VolatileTxManager.FindByTxId(txId);
if (info && info->IsArbiter && info->State != EVolatileTxState::Committed) {
info->ArbiterReadSets.push_back(seqNo);
info->IsArbiterOnHold = true;
rs.second.OnHold = true;
}
}
}

void TOutReadSets::ReleaseOnHoldReadSets(const std::vector<ui64>& seqNos, const TActorContext& ctx) {
TPendingPipeTrackerCommands pendingPipeTrackerCommands;
for (ui64 seqNo : seqNos) {
auto it = CurrentReadSets.find(seqNo);
if (it != CurrentReadSets.end() && it->second.OnHold) {
it->second.OnHold = false;
ui64 target = it->second.To;
Self->ResendReadSetQueue.Progress(seqNo, ctx);
pendingPipeTrackerCommands.AttachTablet(seqNo, target);
}
}
pendingPipeTrackerCommands.Apply(Self->ResendReadSetPipeTracker, ctx);
}

bool TOutReadSets::ResendRS(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx, ui64 seqNo) {
using Schema = TDataShard::Schema;

Expand Down
Loading