1
1
#include " kqp_stream_lookup_actor.h"
2
2
3
- #include < ydb/library/actors/core/actor_bootstrapped.h>
4
-
5
3
#include < ydb/core/actorlib_impl/long_timer.h>
6
4
#include < ydb/core/base/tablet_pipecache.h>
7
5
#include < ydb/core/engine/minikql/minikql_engine_host.h>
8
6
#include < ydb/core/kqp/common/kqp_resolve.h>
7
+ #include < ydb/core/kqp/common/kqp_event_ids.h>
9
8
#include < ydb/core/kqp/gateway/kqp_gateway.h>
9
+ #include < ydb/core/kqp/runtime/kqp_scan_data.h>
10
+ #include < ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
10
11
#include < ydb/core/protos/kqp_stats.pb.h>
11
12
#include < ydb/core/tx/scheme_cache/scheme_cache.h>
12
- #include < ydb/core/kqp/common/kqp_event_ids.h>
13
+
14
+ #include < ydb/library/actors/core/actor_bootstrapped.h>
13
15
#include < ydb/library/yql/public/issue/yql_issue_message.h>
14
- #include < ydb/core/kqp/runtime/kqp_scan_data.h>
15
- #include < ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
16
16
#include < ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
17
+ #include < ydb/library/wilson_ids/wilson.h>
17
18
18
19
namespace NKikimr {
19
20
namespace NKqp {
@@ -25,24 +26,22 @@ static constexpr ui64 MAX_SHARD_RETRIES = 10;
25
26
26
27
class TKqpStreamLookupActor : public NActors ::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
27
28
public:
28
- TKqpStreamLookupActor (ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input,
29
- const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
30
- const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
31
- const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
29
+ TKqpStreamLookupActor (NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings,
32
30
TIntrusivePtr<TKqpCounters> counters)
33
- : LogPrefix(TStringBuilder() << " StreamLookupActor, inputIndex: " << inputIndex << " , CA Id " << computeActorId )
34
- , InputIndex(inputIndex )
35
- , Input(input )
36
- , ComputeActorId(computeActorId )
37
- , TypeEnv(typeEnv )
38
- , Alloc(alloc )
31
+ : LogPrefix(TStringBuilder() << " StreamLookupActor, inputIndex: " << args.InputIndex << " , CA Id " << args.ComputeActorId )
32
+ , InputIndex(args.InputIndex )
33
+ , Input(args.TransformInput )
34
+ , ComputeActorId(args.ComputeActorId )
35
+ , TypeEnv(args.TypeEnv )
36
+ , Alloc(args.Alloc )
39
37
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
40
38
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
41
39
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
42
- , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc ))
40
+ , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc ))
43
41
, Counters(counters)
42
+ , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), " LookupActor" )
44
43
{
45
- IngressStats.Level = statsLevel ;
44
+ IngressStats.Level = args. StatsLevel ;
46
45
}
47
46
48
47
virtual ~TKqpStreamLookupActor () {
@@ -174,6 +173,10 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
174
173
175
174
Send (MakePipePeNodeCacheID (false ), new TEvPipeCache::TEvUnlink (0 ));
176
175
TActorBootstrapped<TKqpStreamLookupActor>::PassAway ();
176
+
177
+ if (LookupActorSpan) {
178
+ LookupActorSpan.End ();
179
+ }
177
180
}
178
181
179
182
i64 GetAsyncInputData (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool & finished, i64 freeSpace) final {
@@ -234,10 +237,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
234
237
void Handle (TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
235
238
CA_LOG_D (" TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath ());
236
239
if (ev->Get ()->Request ->ErrorCount > 0 ) {
237
- return RuntimeError (TStringBuilder () << " Failed to get partitioning for table: "
238
- << StreamLookupWorker->GetTablePath (), NYql::NDqProto::StatusIds::SCHEME_ERROR);
240
+ TString errorMsg = TStringBuilder () << " Failed to get partitioning for table: "
241
+ << StreamLookupWorker->GetTablePath ();
242
+ LookupActorStateSpan.EndError (errorMsg);
243
+
244
+ return RuntimeError (errorMsg, NYql::NDqProto::StatusIds::SCHEME_ERROR);
239
245
}
240
246
247
+ LookupActorStateSpan.EndOk ();
248
+
241
249
auto & resultSet = ev->Get ()->Request ->ResultSet ;
242
250
YQL_ENSURE (resultSet.size () == 1 , " Expected one result for range [NULL, +inf)" );
243
251
Partitioning = resultSet[0 ].KeyDescription ->Partitioning ;
@@ -342,8 +350,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
342
350
<< " was resolved: " << !!Partitioning);
343
351
344
352
if (!Partitioning) {
345
- RuntimeError (TStringBuilder () << " Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath ()
346
- << " (request timeout exceeded)" , NYql::NDqProto::StatusIds::TIMEOUT);
353
+ TString errorMsg = TStringBuilder () << " Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath ()
354
+ << " (request timeout exceeded)" ;
355
+ LookupActorStateSpan.EndError (errorMsg);
356
+
357
+ RuntimeError (errorMsg, NYql::NDqProto::StatusIds::TIMEOUT);
347
358
}
348
359
}
349
360
@@ -392,7 +403,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
392
403
record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
393
404
394
405
Send (MakePipePeNodeCacheID (false ), new TEvPipeCache::TEvForward (request.Release (), shardId, true ),
395
- IEventHandle::FlagTrackDelivery);
406
+ IEventHandle::FlagTrackDelivery, 0 , LookupActorSpan. GetTraceId () );
396
407
397
408
read .State = EReadState::Running;
398
409
@@ -438,6 +449,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
438
449
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
439
450
440
451
Counters->IteratorsShardResolve ->Inc ();
452
+ LookupActorStateSpan = NWilson::TSpan (TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId (),
453
+ " WaitForShardsResolve" , NWilson::EFlags::AUTO_END);
454
+
441
455
Send (MakeSchemeCacheID (), new TEvTxProxySchemeCache::TEvInvalidateTable (StreamLookupWorker->GetTableId (), {}));
442
456
Send (MakeSchemeCacheID (), new TEvTxProxySchemeCache::TEvResolveKeySet (request));
443
457
@@ -467,6 +481,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
467
481
468
482
NYql::TIssues issues;
469
483
issues.AddIssue (std::move (issue));
484
+
485
+ if (LookupActorSpan) {
486
+ LookupActorSpan.EndError (issues.ToOneLineString ());
487
+ }
488
+
470
489
Send (ComputeActorId, new TEvAsyncInputError (InputIndex, std::move (issues), statusCode));
471
490
}
472
491
@@ -495,17 +514,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
495
514
ui64 ReadBytesCount = 0 ;
496
515
497
516
TIntrusivePtr<TKqpCounters> Counters;
517
+ NWilson::TSpan LookupActorSpan;
518
+ NWilson::TSpan LookupActorStateSpan;
498
519
};
499
520
500
521
} // namespace
501
522
502
- std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor (ui64 inputIndex,
503
- NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
504
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
505
- std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
523
+ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor (NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args,
506
524
NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) {
507
- auto actor = new TKqpStreamLookupActor (inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory,
508
- alloc, inputDesc, std::move (settings), counters);
525
+ auto actor = new TKqpStreamLookupActor (std::move (args), std::move (settings), counters);
509
526
return {actor, actor};
510
527
}
511
528
0 commit comments