Skip to content

Commit 3b02ad8

Browse files
authored
Merge 1f6cb5c into 5200012
2 parents 5200012 + 1f6cb5c commit 3b02ad8

File tree

3 files changed

+27
-18
lines changed

3 files changed

+27
-18
lines changed

ydb/core/grpc_services/rpc_load_rows.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,9 @@ const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) {
117117
class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
118118
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
119119
public:
120-
explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded)
121-
: TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded)
120+
explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded, const char* name)
121+
: TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded,
122+
NWilson::TSpan(TWilsonKqp::BulkUpsertActor, request->GetWilsonTraceId(), name))
122123
, Request(request)
123124
{}
124125

@@ -517,7 +518,7 @@ void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvid
517518
} else if (GetProtoRequest(p.get())->has_csv_settings()) {
518519
f.RegisterActor(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded));
519520
} else {
520-
f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded));
521+
f.RegisterActor(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded, "BulkRowsUpsertActor"));
521522
}
522523
}
523524

@@ -530,7 +531,7 @@ IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt
530531
} else if (GetProtoRequest(msg)->has_csv_settings()) {
531532
return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded);
532533
} else {
533-
return new TUploadRowsRPCPublic(msg, diskQuotaExceeded);
534+
return new TUploadRowsRPCPublic(msg, diskQuotaExceeded, "BulkRowsUpsertActor");
534535
}
535536
}
536537

ydb/core/tx/tx_proxy/upload_rows_common_impl.h

+20-14
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#include <ydb/core/formats/arrow/converter.h>
1010
#include <ydb/core/io_formats/arrow/csv_arrow.h>
1111
#include <ydb/core/base/tablet_pipecache.h>
12-
#include <ydb/library/ydb_issue/issue_helpers.h>
1312
#include <ydb/core/base/path.h>
1413
#include <ydb/core/base/feature_flags.h>
1514
#include <ydb/core/scheme/scheme_tablecell.h>
@@ -29,6 +28,8 @@
2928
#undef INCLUDE_YDB_INTERNAL_H
3029

3130
#include <ydb/library/actors/core/actor_bootstrapped.h>
31+
#include <ydb/library/wilson_ids/wilson.h>
32+
#include <ydb/library/ydb_issue/issue_helpers.h>
3233

3334
#include <util/string/join.h>
3435
#include <util/string/vector.h>
@@ -208,18 +209,21 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
208209
std::shared_ptr<arrow::RecordBatch> Batch;
209210
float RuCost = 0.0;
210211

212+
NWilson::TSpan Span;
213+
211214
public:
212215
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
213216
return DerivedActivityType;
214217
}
215218

216-
explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false)
219+
explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false, NWilson::TSpan span = {})
217220
: TBase()
218221
, SchemeCache(MakeSchemeCacheID())
219222
, LeaderPipeCache(MakePipePeNodeCacheID(false))
220223
, Timeout((timeout && timeout <= DEFAULT_TIMEOUT) ? timeout : DEFAULT_TIMEOUT)
221224
, Status(Ydb::StatusIds::SUCCESS)
222225
, DiskQuotaExceeded(diskQuotaExceeded)
226+
, Span(std::move(span))
223227
{}
224228

225229
void Bootstrap(const NActors::TActorContext& ctx) {
@@ -232,10 +236,10 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
232236
for (auto& pr : ShardUploadRetryStates) {
233237
if (pr.second.SentOverloadSeqNo) {
234238
auto* msg = new TEvDataShard::TEvOverloadUnsubscribe(pr.second.SentOverloadSeqNo);
235-
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false));
239+
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(msg, pr.first, false), 0, 0, Span.GetTraceId());
236240
}
237241
}
238-
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
242+
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0), 0, 0, Span.GetTraceId());
239243
if (TimeoutTimerActorId) {
240244
ctx.Send(TimeoutTimerActorId, new TEvents::TEvPoisonPill());
241245
}
@@ -330,6 +334,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
330334
private:
331335
void Handle(TEvents::TEvPoison::TPtr&, const TActorContext& ctx) {
332336
OnBeforePoison(ctx);
337+
Span.EndError("poison");
333338
Die(ctx);
334339
}
335340

@@ -595,7 +600,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
595600
entry.SyncVersion = true;
596601
entry.ShowPrivatePath = AllowWriteToPrivateTable;
597602
request->ResultSet.emplace_back(entry);
598-
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
603+
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvNavigateKeySet(request), 0, 0, Span.GetTraceId());
599604

600605
TimeoutTimerActorId = CreateLongTimer(ctx, Timeout,
601606
new IEventHandle(ctx.SelfID, ctx.SelfID, new TEvents::TEvWakeup()));
@@ -743,7 +748,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
743748
// Begin Long Tx for writing a batch into OLAP table
744749
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
745750
NKikimrLongTxService::TEvBeginTx::EMode mode = NKikimrLongTxService::TEvBeginTx::MODE_WRITE_ONLY;
746-
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvBeginTx(GetDatabase(), mode));
751+
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvBeginTx(GetDatabase(), mode), 0, 0, Span.GetTraceId());
747752
TBase::Become(&TThis::StateWaitBeginLongTx);
748753
}
749754

@@ -861,7 +866,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
861866
LogPrefix() << "rolling back LongTx '" << LongTxId.ToString() << "'");
862867

863868
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
864-
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId));
869+
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvRollbackTx(LongTxId), 0, 0, Span.GetTraceId());
865870
}
866871

867872
STFUNC(StateWaitWriteBatchResult) {
@@ -887,7 +892,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
887892

888893
void CommitLongTx(const TActorContext& ctx) {
889894
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
890-
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId));
895+
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId());
891896
TBase::Become(&TThis::StateWaitCommitLongTx);
892897
}
893898

@@ -966,7 +971,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
966971
request->ResultSet.emplace_back(std::move(keyRange));
967972

968973
TAutoPtr<TEvTxProxySchemeCache::TEvResolveKeySet> resolveReq(new TEvTxProxySchemeCache::TEvResolveKeySet(request));
969-
ctx.Send(SchemeCache, resolveReq.Release());
974+
ctx.Send(SchemeCache, resolveReq.Release(), 0, 0, Span.GetTraceId());
970975

971976
TBase::Become(&TThis::StateWaitResolveShards);
972977
}
@@ -1027,7 +1032,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
10271032
ev->Record.SetOverloadSubscribe(seqNo);
10281033
state->SentOverloadSeqNo = seqNo;
10291034

1030-
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
1035+
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());
10311036
}
10321037

10331038
void MakeShardRequests(const NActors::TActorContext& ctx) {
@@ -1109,7 +1114,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
11091114
ev->Record.SetOverloadSubscribe(seqNo);
11101115
uploadRetryStates[idx]->SentOverloadSeqNo = seqNo;
11111116

1112-
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery);
1117+
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvForward(ev.release(), shardId, true), IEventHandle::FlagTrackDelivery, 0, Span.GetTraceId());
11131118

11141119
auto res = ShardRepliesLeft.insert(shardId);
11151120
if (!res.second) {
@@ -1133,7 +1138,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
11331138
}
11341139

11351140
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) {
1136-
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()));
1141+
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());
11371142

11381143
SetError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %" PRIu64, ev->Get()->TabletId));
11391144
ShardRepliesLeft.erase(ev->Get()->TabletId);
@@ -1170,7 +1175,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
11701175
switch (shardResponse.GetStatus()) {
11711176
case NKikimrTxDataShard::TError::WRONG_SHARD_STATE:
11721177
case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED:
1173-
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()));
1178+
ctx.Send(SchemeCache, new TEvTxProxySchemeCache::TEvInvalidateTable(GetKeyRange()->TableId, TActorId()), 0, 0, Span.GetTraceId());
11741179
status = Ydb::StatusIds::OVERLOADED;
11751180
break;
11761181
case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED:
@@ -1203,7 +1208,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
12031208
}
12041209

12051210
// Notify the cache that we are done with the pipe
1206-
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId));
1211+
ctx.Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(shardId), 0, 0, Span.GetTraceId());
12071212

12081213
ShardRepliesLeft.erase(shardId);
12091214
ShardUploadRetryStates.erase(shardId);
@@ -1266,6 +1271,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
12661271
Y_DEBUG_ABORT_UNLESS(status != ::Ydb::StatusIds::SUCCESS);
12671272
RollbackLongTx(ctx);
12681273
}
1274+
Span.EndOk();
12691275

12701276
Die(ctx);
12711277
}

ydb/library/wilson_ids/wilson.h

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ namespace NKikimr {
4242

4343
LookupActor = 9,
4444
LookupActorShardsResolve = 10,
45+
46+
BulkUpsertActor = 9,
4547
};
4648
};
4749

0 commit comments

Comments
 (0)