2424
2525namespace {
2626 constexpr i64 kInFlightMemoryLimitPerActor = 64_MB;
27- constexpr i64 kMemoryLimitPerMessage = 48_MB ;
28- constexpr i64 kMaxBatchesPerMessage = 1 ;
27+ constexpr i64 kMemoryLimitPerMessage = 64_MB ;
28+ constexpr i64 kMaxBatchesPerMessage = 8 ;
2929
3030 struct TWriteActorBackoffSettings {
3131 TDuration StartRetryDelay = TDuration::MilliSeconds(250 );
@@ -81,12 +81,12 @@ namespace {
8181namespace NKikimr {
8282namespace NKqp {
8383
84- class TKqpWriteActor : public TActorBootstrapped <TKqpWriteActor >, public NYql::NDq::IDqComputeActorAsyncOutput {
85- using TBase = TActorBootstrapped<TKqpWriteActor >;
84+ class TKqpDirectWriteActor : public TActorBootstrapped <TKqpDirectWriteActor >, public NYql::NDq::IDqComputeActorAsyncOutput {
85+ using TBase = TActorBootstrapped<TKqpDirectWriteActor >;
8686
8787 class TResumeNotificationManager {
8888 public:
89- TResumeNotificationManager (TKqpWriteActor & writer)
89+ TResumeNotificationManager (TKqpDirectWriteActor & writer)
9090 : Writer(writer) {
9191 CheckMemory ();
9292 }
@@ -102,7 +102,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
102102 }
103103
104104 private:
105- TKqpWriteActor & Writer;
105+ TKqpDirectWriteActor & Writer;
106106 i64 LastFreeMemory = std::numeric_limits<i64 >::max();
107107 };
108108
@@ -127,7 +127,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
127127 };
128128
129129public:
130- TKqpWriteActor (
130+ TKqpDirectWriteActor (
131131 NKikimrKqp::TKqpTableSinkSettings&& settings,
132132 NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args,
133133 TIntrusivePtr<TKqpCounters> counters)
@@ -137,6 +137,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
137137 , Callbacks(args.Callback)
138138 , Counters(counters)
139139 , TypeEnv(args.TypeEnv)
140+ , Alloc(args.Alloc)
140141 , TxId(args.TxId)
141142 , TableId(
142143 Settings.GetTable().GetOwnerId(),
@@ -157,13 +158,13 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
157158 void Bootstrap () {
158159 LogPrefix = TStringBuilder () << " SelfId: " << this ->SelfId () << " , " << LogPrefix;
159160 ResolveTable ();
160- Become (&TKqpWriteActor ::StateFunc);
161+ Become (&TKqpDirectWriteActor ::StateFunc);
161162 }
162163
163164 static constexpr char ActorName[] = " KQP_WRITE_ACTOR" ;
164165
165166private:
166- virtual ~TKqpWriteActor () {
167+ virtual ~TKqpDirectWriteActor () {
167168 }
168169
169170 void CommitState (const NYql::NDqProto::TCheckpoint&) final {};
@@ -491,7 +492,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
491492 << " , Cookie=" << ev->Cookie
492493 << " , LocksCount=" << ev->Get ()->Record .GetTxLocks ().size ());
493494
494- PopShardBatch (ev->Get ()->Record .GetOrigin (), ev->Cookie );
495+ OnMessageAcknowledged (ev->Get ()->Record .GetOrigin (), ev->Cookie );
495496
496497 for (const auto & lock : ev->Get ()->Record .GetTxLocks ()) {
497498 LocksInfo[ev->Get ()->Record .GetOrigin ()].AddAndCheckLock (lock);
@@ -500,7 +501,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
500501 ProcessBatches ();
501502 }
502503
503- void PopShardBatch (ui64 shardId, ui64 cookie) {
504+ void OnMessageAcknowledged (ui64 shardId, ui64 cookie) {
504505 TResumeNotificationManager resumeNotificator (*this );
505506 const auto removedDataSize = ShardedWriteController->OnMessageAcknowledged (shardId, cookie);
506507 if (removedDataSize) {
@@ -669,7 +670,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
669670
670671 void PassAway () override {
671672 Send (PipeCacheId, new TEvPipeCache::TEvUnlink (0 ));
672- TActorBootstrapped<TKqpWriteActor >::PassAway ();
673+ TActorBootstrapped<TKqpDirectWriteActor >::PassAway ();
673674 }
674675
675676 void Prepare () {
@@ -693,7 +694,8 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
693694 : kMaxBatchesPerMessage ),
694695 },
695696 std::move (columnsMetadata),
696- TypeEnv);
697+ TypeEnv,
698+ Alloc);
697699 } catch (...) {
698700 RuntimeError (
699701 CurrentExceptionMessage (),
@@ -721,7 +723,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
721723 Callbacks->ResumeExecution ();
722724 }
723725
724- NActors::TActorId TxProxyId = MakeTxProxyID();
725726 NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false );
726727
727728 TString LogPrefix;
@@ -731,6 +732,7 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
731732 NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr ;
732733 TIntrusivePtr<TKqpCounters> Counters;
733734 const NMiniKQL::TTypeEnvironment& TypeEnv;
735+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
734736
735737 const NYql::NDq::TTxId TxId;
736738 const TTableId TableId;
@@ -754,7 +756,7 @@ void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<
754756 factory.RegisterSink <NKikimrKqp::TKqpTableSinkSettings>(
755757 TString (NYql::KqpTableSinkName),
756758 [counters] (NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args) {
757- auto * actor = new TKqpWriteActor (std::move (settings), std::move (args), counters);
759+ auto * actor = new TKqpDirectWriteActor (std::move (settings), std::move (args), counters);
758760 return std::make_pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*>(actor, actor);
759761 });
760762}
0 commit comments