Skip to content

Commit 58867ee

Browse files
committed
KIKIMR-20217 Initial datashard tracing
1 parent 4362647 commit 58867ee

20 files changed

+187
-109
lines changed

ydb/core/kqp/runtime/kqp_read_actor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,8 +895,11 @@ class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq
895895

896896
Counters->CreatedIterators->Inc();
897897
ReadIdByTabletId[state->TabletId].push_back(id);
898+
899+
NWilson::TTraceId traceId; // TODO: get traceId from kqp.
900+
898901
Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
899-
IEventHandle::FlagTrackDelivery);
902+
IEventHandle::FlagTrackDelivery, 0, std::move(traceId));
900903

901904
if (!FirstShardStarted) {
902905
state->IsFirst = true;

ydb/core/tablet_flat/flat_exec_seat.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ namespace NTabletFlatExecutor {
2020
TSeat(ui32 uniqId, TAutoPtr<ITransaction> self, NWilson::TTraceId txTraceId)
2121
: UniqID(uniqId)
2222
, Self(self)
23-
, TxSpan(NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction"))
2423
{
25-
24+
if (txTraceId) {
25+
SetupTxSpan(std::move(txTraceId));
26+
}
2627
}
2728

2829
void Describe(IOutputStream &out) const noexcept
@@ -36,7 +37,16 @@ namespace NTabletFlatExecutor {
3637

3738
void Terminate(ETerminationReason reason, const TActorContext& ctx) noexcept;
3839

39-
void CreateEnqueuedSpan() noexcept {
40+
void SetupTxSpan(NWilson::TTraceId txTraceId) noexcept {
41+
TxSpan = NWilson::TSpan(TWilsonTablet::Tablet, std::move(txTraceId), "Tablet.Transaction");
42+
TxSpan.Attribute("Type", TypeName(*Self));
43+
}
44+
45+
NWilson::TSpan CreateExecutionSpan() noexcept {
46+
return NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Execute");
47+
}
48+
49+
void StartEnqueuedSpan() noexcept {
4050
EnqueuedSpan = NWilson::TSpan(TWilsonTablet::Tablet, TxSpan.GetTraceId(), "Tablet.Transaction.Enqueued");
4151
}
4252

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ void TExecutor::RecreatePageCollectionsCache() noexcept
195195
auto &seat = xpair.second->Seat;
196196
xpair.second->WaitingSpan.EndOk();
197197
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
198-
seat->CreateEnqueuedSpan();
198+
seat->StartEnqueuedSpan();
199199
ActivationQueue->Push(seat.Release());
200200
ActivateTransactionWaiting++;
201201
}
@@ -519,7 +519,7 @@ void TExecutor::PlanTransactionActivation() {
519519
while (PendingQueue->Head() && (!limitTxInFly || (Stats->TxInFly - Stats->TxPending < limitTxInFly))) {
520520
TAutoPtr<TSeat> seat = PendingQueue->Pop();
521521
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
522-
seat->CreateEnqueuedSpan();
522+
seat->StartEnqueuedSpan();
523523
ActivationQueue->Push(seat.Release());
524524
ActivateTransactionWaiting++;
525525
--Stats->TxPending;
@@ -540,7 +540,7 @@ void TExecutor::ActivateWaitingTransactions(TPrivatePageCache::TPage::TWaitQueue
540540
it->second->WaitingSpan.EndOk();
541541
auto &seat = it->second->Seat;
542542
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
543-
seat->CreateEnqueuedSpan();
543+
seat->StartEnqueuedSpan();
544544
ActivationQueue->Push(seat.Release());
545545
ActivateTransactionWaiting++;
546546
TransactionWaitPads.erase(waitPad);
@@ -1575,11 +1575,9 @@ bool TExecutor::CanExecuteTransaction() const {
15751575
return Stats->IsActive && (Stats->IsFollower || PendingPartSwitches.empty()) && !BrokenTransaction;
15761576
}
15771577

1578-
void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx) {
1578+
void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId) {
15791579
Y_ABORT_UNLESS(ActivationQueue, "attempt to execute transaction before activation");
15801580

1581-
NWilson::TTraceId traceId;
1582-
15831581
TAutoPtr<TSeat> seat = new TSeat(++TransactionUniqCounter, self, std::move(traceId));
15841582

15851583
LWTRACK(TransactionBegin, seat->Self->Orbit, seat->UniqID, Owner->TabletID(), TypeName(*seat->Self));
@@ -1624,7 +1622,7 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons
16241622

16251623
if (ActiveTransaction || ActivateTransactionWaiting || !allowImmediate) {
16261624
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
1627-
seat->CreateEnqueuedSpan();
1625+
seat->StartEnqueuedSpan();
16281626
ActivationQueue->Push(seat.Release());
16291627
ActivateTransactionWaiting++;
16301628
PlanTransactionActivation();
@@ -1634,12 +1632,12 @@ void TExecutor::DoExecute(TAutoPtr<ITransaction> self, bool allowImmediate, cons
16341632
ExecuteTransaction(seat, ctx);
16351633
}
16361634

1637-
void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
1638-
DoExecute(self, true, ctx);
1635+
void TExecutor::Execute(TAutoPtr<ITransaction> self, const TActorContext &ctx, NWilson::TTraceId traceId) {
1636+
DoExecute(self, true, ctx, std::move(traceId));
16391637
}
16401638

1641-
void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx) {
1642-
DoExecute(self, false, ctx);
1639+
void TExecutor::Enqueue(TAutoPtr<ITransaction> self, const TActorContext &ctx, NWilson::TTraceId traceId) {
1640+
DoExecute(self, false, ctx, std::move(traceId));
16431641
}
16441642

16451643
void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ctx) {
@@ -1653,14 +1651,14 @@ void TExecutor::ExecuteTransaction(TAutoPtr<TSeat> seat, const TActorContext &ct
16531651
PrivatePageCache->ResetTouchesAndToLoad(true);
16541652
TPageCollectionTxEnv env(*Database, *PrivatePageCache);
16551653

1656-
TTransactionContext txc(Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
1654+
TTransactionContext txc(*seat, Owner->TabletID(), Generation(), Step(), *Database, env, seat->CurrentTxDataLimit, seat->TaskId);
16571655
txc.NotEnoughMemory(seat->NotEnoughMemoryCount);
16581656

16591657
Database->Begin(Stamp(), env);
16601658

16611659
LWTRACK(TransactionExecuteBegin, seat->Self->Orbit, seat->UniqID);
16621660

1663-
NWilson::TSpan txExecuteSpan(TWilsonTablet::Tablet, seat->GetTxTraceId(), "Tablet.Transaction.Execute");
1661+
NWilson::TSpan txExecuteSpan = seat->CreateExecutionSpan();
16641662
const bool done = seat->Self->Execute(txc, ctx.MakeFor(OwnerActorId));
16651663
txExecuteSpan.EndOk();
16661664

@@ -1857,7 +1855,7 @@ void TExecutor::PostponeTransaction(TAutoPtr<TSeat> seat, TPageCollectionTxEnv &
18571855
// then tx may be re-activated.
18581856
if (!PrivatePageCache->GetStats().CurrentCacheMisses) {
18591857
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
1860-
seat->CreateEnqueuedSpan();
1858+
seat->StartEnqueuedSpan();
18611859
ActivationQueue->Push(seat.Release());
18621860
ActivateTransactionWaiting++;
18631861
PlanTransactionActivation();
@@ -2945,7 +2943,7 @@ void TExecutor::StartSeat(ui64 task, TResource *cookie_) noexcept
29452943
PostponedTransactions.erase(it);
29462944
Memory->AcquiredMemory(*seat, task);
29472945
LWTRACK(TransactionEnqueued, seat->Self->Orbit, seat->UniqID);
2948-
seat->CreateEnqueuedSpan();
2946+
seat->StartEnqueuedSpan();
29492947
ActivationQueue->Push(seat.Release());
29502948
ActivateTransactionWaiting++;
29512949
PlanTransactionActivation();

ydb/core/tablet_flat/flat_executor.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -629,9 +629,9 @@ class TExecutor
629629
void Boot(TEvTablet::TEvBoot::TPtr &ev, const TActorContext &ctx) override;
630630
void Restored(TEvTablet::TEvRestored::TPtr &ev, const TActorContext &ctx) override;
631631
void DetachTablet(const TActorContext &ctx) override;
632-
void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx);
633-
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
634-
void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) override;
632+
void DoExecute(TAutoPtr<ITransaction> transaction, bool allowImmediate, const TActorContext &ctx, NWilson::TTraceId traceId);
633+
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override;
634+
void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) override;
635635

636636
TLeaseCommit* AttachLeaseCommit(TLogCommit* commit, bool force = false);
637637
TLeaseCommit* EnsureReadOnlyLease(TMonotonic at);

ydb/core/tablet_flat/flat_executor_txloglogic.cpp

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,26 @@ TLogicRedo::TCommitRWTransactionResult TLogicRedo::CommitRWTransaction(
184184
if (!Batch->Commit) {
185185
Batch->Commit = CommitManager->Begin(false, ECommit::Redo, seat->GetTxTraceId());
186186
} else {
187-
i64 batchSize = Batch->Bodies.size() + 1;
187+
// Batch commit's TraceId will be used for all blobstorage requests of the batch.
188+
if (!Batch->Commit->TraceId && seat->TxSpan) {
189+
// It is possible that the original or consequent transactions didn't have a TraceId,
190+
// but if a new transaction of a batch has TraceId, use it for the whole batch
191+
// (and consequent traced transactions).
192+
Batch->Commit->TraceId = seat->GetTxTraceId();
193+
} else {
194+
seat->TxSpan.Link(Batch->Commit->TraceId, {});
195+
}
188196

189-
Batch->Commit->FirstTx->TxSpan.Attribute("BatchSize", batchSize);
197+
for (TSeat* tx = Batch->Commit->FirstTx; tx != nullptr; tx = tx->NextCommitTx) {
198+
// Update batch size of the transaction, whose TraceId the commit uses (first transaction in batch, that has TraceId).
199+
if (tx->TxSpan) {
200+
i64 batchSize = Batch->Bodies.size() + 1;
201+
tx->TxSpan.Attribute("BatchSize", batchSize);
202+
break;
203+
}
204+
}
190205

191206
seat->TxSpan.Attribute("Batched", true);
192-
seat->TxSpan.Link(Batch->Commit->FirstTx->GetTxTraceId(), {});
193207
}
194208

195209
Batch->Commit->PushTx(seat.Get());

ydb/core/tablet_flat/tablet_flat_executed.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,19 @@ IExecutor* TTabletExecutedFlat::CreateExecutor(const TActorContext &ctx) {
2929
return Executor();
3030
}
3131

32-
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) {
32+
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId) {
3333
Y_UNUSED(ctx);
34-
Execute(transaction);
34+
Execute(transaction, std::move(traceId));
3535
}
3636

37-
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction) {
37+
void TTabletExecutedFlat::Execute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId) {
3838
if (transaction)
39-
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext));
39+
static_cast<TExecutor*>(Executor())->Execute(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId));
4040
}
4141

42-
void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction) {
42+
void TTabletExecutedFlat::EnqueueExecute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId) {
4343
if (transaction)
44-
static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext));
44+
static_cast<TExecutor*>(Executor())->Enqueue(transaction, ExecutorCtx(*TlsActivationContext), std::move(traceId));
4545
}
4646

4747
const NTable::TScheme& TTabletExecutedFlat::Scheme() const noexcept {

ydb/core/tablet_flat/tablet_flat_executed.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ class TTabletExecutedFlat : public NFlatExecutorSetup::ITablet {
2323
IExecutor* Executor() const { return Executor0; }
2424
const TInstant StartTime() const { return StartTime0; }
2525

26-
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx);
27-
void Execute(TAutoPtr<ITransaction> transaction);
28-
void EnqueueExecute(TAutoPtr<ITransaction> transaction);
26+
void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {});
27+
void Execute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId = {});
28+
void EnqueueExecute(TAutoPtr<ITransaction> transaction, NWilson::TTraceId traceId = {});
2929

3030
const NTable::TScheme& Scheme() const noexcept;
3131

ydb/core/tablet_flat/tablet_flat_executor.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ namespace NTabletFlatExecutor {
2424
class TTransactionContext;
2525
class TExecutor;
2626
struct TPageCollectionTxEnv;
27+
struct TSeat;
2728

2829
class TTableSnapshotContext : public TThrRefBase, TNonCopyable {
2930
friend class TExecutor;
@@ -199,9 +200,10 @@ class TTransactionContext : public TTxMemoryProviderBase {
199200
friend class TExecutor;
200201

201202
public:
202-
TTransactionContext(ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
203+
TTransactionContext(TSeat &seat, ui64 tablet, ui32 gen, ui32 step, NTable::TDatabase &db, IExecuting &env,
203204
ui64 memoryLimit, ui64 taskId)
204205
: TTxMemoryProviderBase(memoryLimit, taskId)
206+
, Seat(seat)
205207
, Tablet(tablet)
206208
, Generation(gen)
207209
, Step(step)
@@ -225,6 +227,7 @@ class TTransactionContext : public TTxMemoryProviderBase {
225227
}
226228

227229
public:
230+
TSeat& Seat;
228231
const ui64 Tablet = Max<ui32>();
229232
const ui32 Generation = Max<ui32>();
230233
const ui32 Step = Max<ui32>();
@@ -514,8 +517,8 @@ namespace NFlatExecutorSetup {
514517
// all followers had completed log with requested gc-barrier
515518
virtual void FollowerGcApplied(ui32 step, TDuration followerSyncDelay) = 0;
516519

517-
virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
518-
virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx) = 0;
520+
virtual void Execute(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0;
521+
virtual void Enqueue(TAutoPtr<ITransaction> transaction, const TActorContext &ctx, NWilson::TTraceId traceId = {}) = 0;
519522

520523
virtual void ConfirmReadOnlyLease(TMonotonic at) = 0;
521524
virtual void ConfirmReadOnlyLease(TMonotonic at, std::function<void()> callback) = 0;

ydb/core/tx/datashard/datashard.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2788,7 +2788,8 @@ void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&
27882788
UpdateProposeQueueSize();
27892789
} else {
27902790
// Prepare planned transactions as soon as possible
2791-
Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
2791+
TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false);
2792+
Execute(tx, ctx, tx->GetTraceId());
27922793
}
27932794
}
27942795

@@ -2849,7 +2850,8 @@ void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, cons
28492850

28502851
if (!item.Cancelled) {
28512852
// N.B. we don't call ProposeQueue.Reset(), tx will Ack() on its first Execute()
2852-
Execute(new TTxProposeTransactionBase(this, std::move(item.Event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
2853+
TTxProposeTransactionBase *tx = new TTxProposeTransactionBase(this, std::move(item.Event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true);
2854+
Execute(tx, ctx, tx->GetTraceId());
28532855
return;
28542856
}
28552857

@@ -4033,12 +4035,13 @@ bool TDataShard::ReassignChannelsEnabled() const {
40334035
}
40344036

40354037
void TDataShard::ExecuteProgressTx(const TActorContext& ctx) {
4036-
Execute(new TTxProgressTransaction(this), ctx);
4038+
Execute(new TTxProgressTransaction(this, {}), ctx);
40374039
}
40384040

40394041
void TDataShard::ExecuteProgressTx(TOperation::TPtr op, const TActorContext& ctx) {
40404042
Y_ABORT_UNLESS(op->IsInProgress());
4041-
Execute(new TTxProgressTransaction(this, std::move(op)), ctx);
4043+
NWilson::TTraceId traceId = op->GetTraceId();
4044+
Execute(new TTxProgressTransaction(this, std::move(op)), ctx, std::move(traceId));
40424045
}
40434046

40444047
TDuration TDataShard::CleanupTimeout() const {

ydb/core/tx/datashard/datashard.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <ydb/core/scheme/scheme_type_registry.h>
1313
#include <ydb/core/protos/tx_datashard.pb.h>
1414
#include <ydb/core/tablet_flat/flat_row_versions.h>
15+
#include <ydb/library/actors/wilson/wilson_span.h>
1516

1617
#include <library/cpp/lwtrace/shuttle.h>
1718
#include <library/cpp/time_provider/time_provider.h>
@@ -951,6 +952,9 @@ struct TEvDataShard {
951952

952953
// Orbit used for tracking request events
953954
NLWTrace::TOrbit Orbit;
955+
956+
// Wilson span for this request.
957+
NWilson::TSpan ReadSpan;
954958
};
955959

956960
struct TEvReadResult : public TEventPB<TEvReadResult,

0 commit comments

Comments
 (0)