Skip to content

Add per-tx inflight counter to tablets and fix follower per-tx counters #15575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/protos/counters.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ extend google.protobuf.FileOptions {
//
enum ETxTypeSimpleCounters {
COUNTER_TT_SIMPLE_IGNORE = 0;

COUNTER_TT_INFLY = 1 [(CounterOpts) = {Name: "InFly"}];
}

enum ETxTypeCumulativeCounters {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tablet_flat/flat_exec_seat.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace NTabletFlatExecutor {
TSeat(ui32 uniqId, TAutoPtr<ITransaction> self)
: UniqID(uniqId)
, Self(self)
, TxType(Self->GetTxType())
{
}

Expand Down Expand Up @@ -69,6 +70,7 @@ namespace NTabletFlatExecutor {

const ui64 UniqID;
const TAutoPtr<ITransaction> Self;
const TTxType TxType;
NWilson::TSpan WaitingSpan;
ui64 Retries = 0;
TPinned Pinned;
Expand Down
17 changes: 13 additions & 4 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ void TExecutor::Active(const TActorContext &ctx) {
CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(MemTableMemoryConsumersCollection.Get(), Logger.Get(), Broker.Get(), this, loadedState->Comp,
Sprintf("tablet-%" PRIu64, Owner->TabletID())));
DataCleanupLogic = MakeHolder<TDataCleanupLogic>(static_cast<NActors::IActorOps*>(this), this, Owner, Logger.Get(), GcLogic.Get());
LogicRedo->InstallCounters(Counters.Get(), nullptr);
LogicRedo->InstallCounters(Counters.Get(), AppTxCounters);

ResourceMetrics = MakeHolder<NMetrics::TResourceMetrics>(Owner->TabletID(), 0, Launcher);

Expand Down Expand Up @@ -567,6 +567,9 @@ void TExecutor::FinishCancellation(TSeat* seat, bool activateMore) {
Memory->ReleaseMemory(*seat);
--Stats->TxInFly;
Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly;
if (AppTxCounters && seat->TxType != UnknownTxType) {
AppTxCounters->TxSimple(seat->TxType, COUNTER_TT_INFLY) -= 1;
}
RemoveTransaction(seat->UniqID);
if (activateMore) {
PlanTransactionActivation();
Expand Down Expand Up @@ -1746,6 +1749,9 @@ ui64 TExecutor::DoExecute(TAutoPtr<ITransaction> self, ETxMode mode) {

++Stats->TxInFly;
Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly;
if (AppTxCounters && seat->TxType != UnknownTxType) {
AppTxCounters->TxSimple(seat->TxType, COUNTER_TT_INFLY) += 1;
}
Counters->Cumulative()[TExecutorCounters::TX_COUNT_ALL].Increment(1); //Deprecated
Counters->Cumulative()[TExecutorCounters::TX_QUEUED].Increment(1);

Expand Down Expand Up @@ -2005,7 +2011,7 @@ void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env,
TAutoPtr<NTable::TChange> change,
THPTimer &bookkeepingTimer)
{
TTxType txType = seat->Self->GetTxType();
TTxType txType = seat->TxType;

ui32 touchedPages = 0;
ui32 newPinnedPages = 0;
Expand Down Expand Up @@ -2164,7 +2170,7 @@ void TExecutor::PostponeTransaction(TSeat* seat, TPageCollectionTxEnv &env,
void TExecutor::CommitTransactionLog(std::unique_ptr<TSeat> seat, TPageCollectionTxEnv &env,
TAutoPtr<NTable::TChange> change, THPTimer &bookkeepingTimer) {
const bool isReadOnly = !(change->HasAny() || env.HasChanges());
const TTxType txType = seat->Self->GetTxType();
const TTxType txType = seat->TxType;

size_t touchedBlocks = PrivatePageCache->GetStats().CurrentCacheHits;
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_TOUCHED_BLOCKS].IncrementFor(touchedBlocks);
Expand Down Expand Up @@ -2192,6 +2198,7 @@ void TExecutor::CommitTransactionLog(std::unique_ptr<TSeat> seat, TPageCollectio
const double currentExecTime = seat->CPUExecTime;

if (isReadOnly) {
// Note: per-tx InFly is decremented in txloglogic
if (Stats->IsFollower()) {
// todo: extract completion counters from txloglogic
--Stats->TxInFly;
Expand Down Expand Up @@ -3093,6 +3100,7 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
switch (cookie) {
case ECommit::Redo:
{
// Note: per-tx InFly is decrememnted in txloglogic
const ui64 confirmedTransactions = LogicRedo->Confirm(step, ctx, OwnerActorId);
Stats->TxInFly -= confirmedTransactions;
Counters->Simple()[TExecutorCounters::DB_TX_IN_FLY] = Stats->TxInFly;
Expand Down Expand Up @@ -4387,8 +4395,9 @@ void TExecutor::RegisterExternalTabletCounters(TAutoPtr<TTabletCountersBase> app
AppCountersBaseline = MakeHolder<TTabletCountersBase>();
AppCounters->RememberCurrentStateAsBaseline(*AppCountersBaseline);

AppTxCounters = dynamic_cast<TTabletCountersWithTxTypes*>(AppCounters.Get());

if (LogicRedo) {
AppTxCounters = dynamic_cast<TTabletCountersWithTxTypes*>(AppCounters.Get());
LogicRedo->InstallCounters(Counters.Get(), AppTxCounters);
}
}
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/tablet_flat/flat_executor_txloglogic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ TArrayRef<const NRedo::TUsage> TLogicRedo::GrabLogUsage() const noexcept
}

void CompleteRoTransaction(std::unique_ptr<TSeat> seat, const TActorContext &ownerCtx, TExecutorCounters *counters, TTabletCountersWithTxTypes *appTxCounters) {
const TTxType txType = seat->Self->GetTxType();
const TTxType txType = seat->TxType;

const ui64 latencyus = ui64(1000000. * seat->LatencyTimer.Passed());
counters->Percentile()[TExecutorCounters::TX_PERCENTILE_LATENCY_RO].IncrementFor(latencyus);
Expand All @@ -73,11 +73,13 @@ void CompleteRoTransaction(std::unique_ptr<TSeat> seat, const TActorContext &own
if (Y_UNLIKELY(seat->IsTerminated())) {
counters->Cumulative()[TExecutorCounters::TX_TERMINATED].Increment(1);
if (appTxCounters && txType != UnknownTxType) {
appTxCounters->TxSimple(txType, COUNTER_TT_INFLY) -= 1;
appTxCounters->TxCumulative(txType, COUNTER_TT_TERMINATED).Increment(1);
}
} else {
counters->Cumulative()[TExecutorCounters::TX_RO_COMPLETED].Increment(1);
if (appTxCounters && txType != UnknownTxType) {
appTxCounters->TxSimple(txType, COUNTER_TT_INFLY) -= 1;
appTxCounters->TxCumulative(txType, COUNTER_TT_RO_COMPLETED).Increment(1);
}
}
Expand Down Expand Up @@ -129,7 +131,7 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction(

Y_ABORT_UNLESS(force || !(change.Scheme || change.Annex));

const TTxType txType = seat->Self->GetTxType();
const TTxType txType = seat->TxType;

if (auto bytes = change.Redo.size()) {
Counters->Cumulative()[TMonCo::DB_REDO_WRITTEN_BYTES].Increment(bytes);
Expand Down Expand Up @@ -266,7 +268,7 @@ ui64 TLogicRedo::Confirm(ui32 step, const TActorContext &ctx, const TActorId &ow

std::unique_ptr<TSeat> seat{entry.Transactions.PopFront()};

const TTxType txType = seat->Self->GetTxType();
const TTxType txType = seat->TxType;
const ui64 commitLatencyus = ui64(1000000. * seat->CommitTimer.Passed());
const ui64 latencyus = ui64(1000000. * seat->LatencyTimer.Passed());
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_LATENCY_COMMIT].IncrementFor(commitLatencyus);
Expand All @@ -281,8 +283,10 @@ ui64 TLogicRedo::Confirm(ui32 step, const TActorContext &ctx, const TActorId &ow
const ui64 completeTimeus = ui64(1000000. * completeTimer.Passed());

Counters->Cumulative()[TExecutorCounters::TX_RW_COMPLETED].Increment(1);
if (AppTxCounters && txType != UnknownTxType)
if (AppTxCounters && txType != UnknownTxType) {
AppTxCounters->TxSimple(txType, COUNTER_TT_INFLY) -= 1;
AppTxCounters->TxCumulative(txType, COUNTER_TT_RW_COMPLETED).Increment(1);
}
Counters->Percentile()[TExecutorCounters::TX_PERCENTILE_COMMITED_CPUTIME].IncrementFor(completeTimeus);
Counters->Cumulative()[TExecutorCounters::CONSUMED_CPU].Increment(completeTimeus);
if (AppTxCounters && txType != UnknownTxType)
Expand Down
Loading