Skip to content

Implemented tracing for bulk row upsert #2218

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions ydb/core/grpc_services/rpc_load_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) {
class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded)
: TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded)
explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded, const char* name)
: TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded,
NWilson::TSpan(TWilsonKqp::BulkUpsertActor, request->GetWilsonTraceId(), name))
, Request(request)
{}

Expand Down Expand Up @@ -517,7 +518,7 @@ void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvid
} else if (GetProtoRequest(p.get())->has_csv_settings()) {
f.RegisterActor(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded));
} else {
f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded));
f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded, "BulkRowsUpsertActor"));
}
}

Expand All @@ -530,7 +531,7 @@ IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt
} else if (GetProtoRequest(msg)->has_csv_settings()) {
return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded);
} else {
return new TUploadRowsRPCPublic(msg, diskQuotaExceeded);
return new TUploadRowsRPCPublic(msg, diskQuotaExceeded, "BulkRowsUpsertActor");
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard__op_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class TTxDirectBase : public TTransactionBase<TDataShard> {

public:
TTxDirectBase(TDataShard* ds, TEvRequest ev)
: TBase(ds)
: TBase(ds, std::move(ev->TraceId))
, Ev(ev)
{
}
Expand Down
34 changes: 20 additions & 14 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <ydb/core/formats/arrow/converter.h>
#include <ydb/core/io_formats/arrow/csv_arrow.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/base/path.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/scheme/scheme_tablecell.h>
Expand All @@ -29,6 +28,8 @@
#undef INCLUDE_YDB_INTERNAL_H

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/wilson_ids/wilson.h>
#include <ydb/library/ydb_issue/issue_helpers.h>

#include <util/string/join.h>
#include <util/string/vector.h>
Expand Down Expand Up @@ -208,18 +209,21 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
std::shared_ptr<arrow::RecordBatch> Batch;
float RuCost = 0.0;

NWilson::TSpan Span;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return DerivedActivityType;
}

explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false)
explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false, NWilson::TSpan span = {})
: TBase()
, SchemeCache(MakeSchemeCacheID())
, LeaderPipeCache(MakePipePeNodeCacheID(false))
, Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT)
, Status(Ydb::StatusIds::SUCCESS)
, DiskQuotaExceeded(diskQuotaExceeded)
, Span(std::move(span))
{}

void Bootstrap(const NActors::TActorContext& ctx) {
Expand All @@ -232,10 +236,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
for (auto& pr : ShardUploadRetryStates) {
if (pr.second.SentOverloadSeqNo) {
auto* msg = new TEvDataShard::TEvOverloadUnsubscribe(pr.second.SentOverloadSeqNo);
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false));
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false), 0, 0, Span.GetTraceId());
}
}
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0), 0, 0, Span.GetTraceId());
if (TimeoutTimerActorId) {
ctx.Send(TimeoutTimerActorId, new TEvents::TEvPoisonPill());
}
Expand Down Expand Up @@ -330,6 +334,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
private:
void Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
OnBeforePoison(ctx);
Span.EndError("poison");
Die(ctx);
}

Expand Down Expand Up @@ -595,7 +600,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
entry.SyncVersion = true;
entry.ShowPrivatePath = AllowWriteToPrivateTable;
request->ResultSet.emplace_back(entry);
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, Span.GetTraceId());

TimeoutTimerActorId = CreateLongTimer(ctx, Timeout,
new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup()));
Expand Down Expand Up @@ -743,7 +748,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
// Begin Long Tx for writing a batch into OLAP table
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
NKikimrLongTxService::TEvBeginTx::EMode mode = NKikimrLongTxService::TEvBeginTx::MODE_WRITE_ONLY;
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvBeginTx(GetDatabase(), mode));
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvBeginTx(GetDatabase(), mode), 0, 0, Span.GetTraceId());
TBase::Become(&TThis::StateWaitBeginLongTx);
}

Expand Down Expand Up @@ -861,7 +866,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
LogPrefix() << "rolling back LongTx '" << LongTxId.ToString() << "'");

TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId));
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId), 0, 0, Span.GetTraceId());
}

STFUNC(StateWaitWriteBatchResult) {
Expand All @@ -887,7 +892,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit

void CommitLongTx(const TActorContext& ctx) {
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId));
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId());
TBase::Become(&TThis::StateWaitCommitLongTx);
}

Expand Down Expand Up @@ -966,7 +971,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
request->ResultSet.emplace_back(std::move(keyRange));

TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
ctx.Send(SchemeCache, resolveReq.Release());
ctx.Send(SchemeCache, resolveReq.Release(), 0, 0, Span.GetTraceId());

TBase::Become(&TThis::StateWaitResolveShards);
}
Expand Down Expand Up @@ -1027,7 +1032,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
ev->Record.SetOverloadSubscribe(seqNo);
state->SentOverloadSeqNo = seqNo;

ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());
}

void MakeShardRequests(const NActors::TActorContext& ctx) {
Expand Down Expand Up @@ -1109,7 +1114,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
ev->Record.SetOverloadSubscribe(seqNo);
uploadRetryStates[idx]->SentOverloadSeqNo = seqNo;

ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());

auto res = ShardRepliesLeft.insert(shardId);
if (!res.second) {
Expand All @@ -1133,7 +1138,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) {
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()));
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());

SetError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId));
ShardRepliesLeft.erase(ev->Get()->TabletId);
Expand Down Expand Up @@ -1170,7 +1175,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
switch (shardResponse.GetStatus()) {
case NKikimrTxDataShard::TError::WRONG_SHARD_STATE:
case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED:
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()));
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());
status = Ydb::StatusIds::OVERLOADED;
break;
case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED:
Expand Down Expand Up @@ -1203,7 +1208,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

// Notify the cache that we are done with the pipe
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId));
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId), 0, 0, Span.GetTraceId());

ShardRepliesLeft.erase(shardId);
ShardUploadRetryStates.erase(shardId);
Expand Down Expand Up @@ -1266,6 +1271,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
Y_DEBUG_ABORT_UNLESS(status != ::Ydb::StatusIds::SUCCESS);
RollbackLongTx(ctx);
}
Span.EndOk();

Die(ctx);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/wilson_ids/wilson.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace NKikimr {

LookupActor = 9,
LookupActorShardsResolve = 10,

BulkUpsertActor = 9,
};
};

Expand Down