@@ -160,20 +160,20 @@ struct TEvS3FileQueue {
160160
161161 EvEnd
162162 };
163- static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
163+ static_assert(EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::ES_S3_FILE_QUEUE),
164164 " expect EvEnd < EventSpaceEnd(TEvents::ES_S3_FILE_QUEUE)");
165-
165+
166166 struct TEvUpdateConsumersCount :
167167 public TEventPB<TEvUpdateConsumersCount, NS3::FileQueue::TEvUpdateConsumersCount, EvUpdateConsumersCount> {
168-
168+
169169 explicit TEvUpdateConsumersCount (ui64 consumersCountDelta = 0 ) {
170170 Record.SetConsumersCountDelta (consumersCountDelta);
171171 }
172172 };
173173
174174 struct TEvAck :
175175 public TEventPB<TEvAck, NS3::FileQueue::TEvAck, EvAck> {
176-
176+
177177 TEvAck () = default ;
178178
179179 explicit TEvAck (const TMessageTransportMeta& transportMeta) {
@@ -388,6 +388,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
388388 TPathList paths,
389389 size_t prefetchSize,
390390 ui64 fileSizeLimit,
391+ ui64 readLimit,
391392 bool useRuntimeListing,
392393 ui64 consumersCount,
393394 ui64 batchSizeLimit,
@@ -401,6 +402,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
401402 : TxId(std::move(txId))
402403 , PrefetchSize(prefetchSize)
403404 , FileSizeLimit(fileSizeLimit)
405+ , ReadLimit(readLimit)
404406 , MaybeIssues(Nothing())
405407 , UseRuntimeListing(useRuntimeListing)
406408 , ConsumersCount(consumersCount)
@@ -513,7 +515,9 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
513515 // skip 'directories'
514516 continue ;
515517 }
516- if (object.Size > FileSizeLimit) {
518+
519+ const ui64 bytesUsed = std::min (object.Size , ReadLimit);
520+ if (bytesUsed > FileSizeLimit) {
517521 auto errorMessage = TStringBuilder ()
518522 << " Size of object " << object.Path << " = "
519523 << object.Size
@@ -525,10 +529,10 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
525529 LOG_T (" TS3FileQueueActor" , " SaveRetrievedResults adding path: " << object.Path << " of size " << object.Size );
526530 TObjectPath objectPath;
527531 objectPath.SetPath (object.Path );
528- objectPath.SetSize (object. Size );
532+ objectPath.SetSize (bytesUsed );
529533 objectPath.SetPathIndex (CurrentDirectoryPathIndex);
530534 Objects.emplace_back (std::move (objectPath));
531- ObjectsTotalSize += object. Size ;
535+ ObjectsTotalSize += bytesUsed ;
532536 }
533537 return true ;
534538 }
@@ -598,7 +602,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
598602 Send (ev->Sender , new TEvS3FileQueue::TEvObjectPathReadError (*MaybeIssues, ev->Get ()->Record .GetTransportMeta ()));
599603 TryFinish (ev->Sender , ev->Get ()->Record .GetTransportMeta ().GetSeqNo ());
600604 }
601-
605+
602606 void HandleUpdateConsumersCount (TEvS3FileQueue::TEvUpdateConsumersCount::TPtr& ev) {
603607 if (!UpdatedConsumers.contains (ev->Sender )) {
604608 UpdatedConsumers.insert (ev->Sender );
@@ -653,7 +657,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
653657
654658 LOG_D (" TS3FileQueueActor" , " SendObjects Sending " << result.size () << " objects to consumer with id " << consumer << " , " << ObjectsTotalSize << " bytes left" );
655659 Send (consumer, new TEvS3FileQueue::TEvObjectPathBatch (std::move (result), HasNoMoreItems (), transportMeta));
656-
660+
657661 if (HasNoMoreItems ()) {
658662 TryFinish (consumer, transportMeta.GetSeqNo ());
659663 }
@@ -675,7 +679,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
675679 }
676680
677681 bool CanSendToConsumer (const TActorId& consumer) {
678- return !UseRuntimeListing || RoundRobinStageFinished ||
682+ return !UseRuntimeListing || RoundRobinStageFinished ||
679683 (StartedConsumers.size () < ConsumersCount && !StartedConsumers.contains (consumer));
680684 }
681685
@@ -753,7 +757,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
753757 }
754758 });
755759 }
756-
760+
757761 void ScheduleRequest (const TActorId& consumer, const TMessageTransportMeta& transportMeta) {
758762 PendingRequests[consumer].push_back (transportMeta);
759763 HasPendingRequests = true ;
@@ -790,7 +794,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
790794 }
791795 }
792796 }
793-
797+
794798 void TryFinish (const TActorId& consumer, ui64 seqNo) {
795799 LOG_T (" TS3FileQueueActor" , " TryFinish from consumer " << consumer << " , " << FinishedConsumers.size () << " consumers already finished, seqNo=" << seqNo);
796800 if (FinishingConsumerToLastSeqNo.contains (consumer)) {
@@ -814,6 +818,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
814818
815819 size_t PrefetchSize;
816820 ui64 FileSizeLimit;
821+ ui64 ReadLimit;
817822 TMaybe<NS3Lister::IS3Lister::TPtr> MaybeLister = Nothing();
818823 TMaybe<NThreading::TFuture<NS3Lister::TListResult>> ListingFuture;
819824 size_t CurrentDirectoryPathIndex = 0 ;
@@ -838,7 +843,7 @@ class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
838843 const TString Pattern;
839844 const ES3PatternVariant PatternVariant;
840845 const ES3PatternType PatternType;
841-
846+
842847 static constexpr TDuration PoisonTimeout = TDuration::Hours(3 );
843848 static constexpr TDuration RoundRobinStageTimeout = TDuration::Seconds(3 );
844849};
@@ -918,6 +923,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
918923 std::move (Paths),
919924 ReadActorFactoryCfg.MaxInflight * 2 ,
920925 FileSizeLimit,
926+ SizeLimit,
921927 false ,
922928 1 ,
923929 FileQueueBatchSizeLimit,
@@ -1097,7 +1103,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
10971103 void HandleAck (TEvS3FileQueue::TEvAck::TPtr& ev) {
10981104 FileQueueEvents.OnEventReceived (ev);
10991105 }
1100-
1106+
11011107 static void OnDownloadFinished (TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) {
11021108 if (!result.Issues ) {
11031109 actorSystem->Send (new IEventHandle (selfId, TActorId (), new TEvPrivate::TEvReadResult (std::move (result.Content ), requestId, pathInd, path)));
@@ -1209,7 +1215,7 @@ class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeA
12091215 auto issues = NS3Util::AddParentIssue (TStringBuilder{} << " Error while reading file " << path << " with request id [" << requestId << " ]" , TIssues{result->Get ()->Error });
12101216 Send (ComputeActorId, new TEvAsyncInputError (InputIndex, std::move (issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
12111217 }
1212-
1218+
12131219 void Handle (const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr&) {
12141220 FileQueueEvents.Retry ();
12151221 }
@@ -2088,7 +2094,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
20882094 if (isCancelled) {
20892095 LOG_CORO_D (" RunCoroBlockArrowParserOverHttp - STOPPED ON SATURATION, downloaded " <<
20902096 QueueBufferCounter->DownloadedBytes << " bytes" );
2091- break ;
2097+ break ;
20922098 }
20932099 }
20942100 }
@@ -2538,6 +2544,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
25382544 ::NMonitoring::TDynamicCounterPtr counters,
25392545 ::NMonitoring::TDynamicCounterPtr taskCounters,
25402546 ui64 fileSizeLimit,
2547+ ui64 readLimit,
25412548 std::optional<ui64> rowsLimitHint,
25422549 IMemoryQuotaManager::TPtr memoryQuotaManager,
25432550 bool useRuntimeListing,
@@ -2564,6 +2571,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
25642571 , TaskCounters(std::move(taskCounters))
25652572 , FileQueueActor(fileQueueActor)
25662573 , FileSizeLimit(fileSizeLimit)
2574+ , ReadLimit(readLimit)
25672575 , MemoryQuotaManager(memoryQuotaManager)
25682576 , UseRuntimeListing(useRuntimeListing)
25692577 , FileQueueBatchSizeLimit(fileQueueBatchSizeLimit)
@@ -2622,6 +2630,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
26222630 std::move (Paths),
26232631 ReadActorFactoryCfg.MaxInflight * 2 ,
26242632 FileSizeLimit,
2633+ ReadLimit,
26252634 false ,
26262635 1 ,
26272636 FileQueueBatchSizeLimit,
@@ -2784,7 +2793,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
27842793 void CommitState (const NDqProto::TCheckpoint&) final {}
27852794
27862795 ui64 GetInputIndex () const final {
2787- return InputIndex;
2796+ return InputIndex;
27882797 }
27892798
27902799 const TDqAsyncStats& GetIngressStats () const final {
@@ -3038,7 +3047,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
30383047 }
30393048 }
30403049 }
3041-
3050+
30423051 void Handle (TEvS3FileQueue::TEvAck::TPtr& ev) {
30433052 FileQueueEvents.OnEventReceived (ev);
30443053 }
@@ -3136,6 +3145,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
31363145 std::set<NActors::TActorId> CoroActors;
31373146 NActors::TActorId FileQueueActor;
31383147 const ui64 FileSizeLimit;
3148+ const ui64 ReadLimit;
31393149 bool Bootstrapped = false ;
31403150 IMemoryQuotaManager::TPtr MemoryQuotaManager;
31413151 bool UseRuntimeListing;
@@ -3295,6 +3305,7 @@ IActor* CreateS3FileQueueActor(
32953305 TPathList paths,
32963306 size_t prefetchSize,
32973307 ui64 fileSizeLimit,
3308+ ui64 readLimit,
32983309 bool useRuntimeListing,
32993310 ui64 consumersCount,
33003311 ui64 batchSizeLimit,
@@ -3310,6 +3321,7 @@ IActor* CreateS3FileQueueActor(
33103321 paths,
33113322 prefetchSize,
33123323 fileSizeLimit,
3324+ readLimit,
33133325 useRuntimeListing,
33143326 consumersCount,
33153327 batchSizeLimit,
@@ -3394,15 +3406,15 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
33943406 if (params.GetRowsLimitHint () != 0 ) {
33953407 rowsLimitHint = params.GetRowsLimitHint ();
33963408 }
3397-
3409+
33983410 TActorId fileQueueActor;
33993411 if (auto it = settings.find (" fileQueueActor" ); it != settings.cend ()) {
34003412 NActorsProto::TActorId protoId;
34013413 TMemoryInput inputStream (it->second );
34023414 ParseFromTextFormat (inputStream, protoId);
34033415 fileQueueActor = ActorIdFromProto (protoId);
34043416 }
3405-
3417+
34063418 ui64 fileQueueBatchSizeLimit = 0 ;
34073419 if (auto it = settings.find (" fileQueueBatchSizeLimit" ); it != settings.cend ()) {
34083420 fileQueueBatchSizeLimit = FromString<ui64>(it->second );
@@ -3412,7 +3424,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
34123424 if (auto it = settings.find (" fileQueueBatchObjectCountLimit" ); it != settings.cend ()) {
34133425 fileQueueBatchObjectCountLimit = FromString<ui64>(it->second );
34143426 }
3415-
3427+
34163428 ui64 fileQueueConsumersCountDelta = 0 ;
34173429 if (readRanges.size () > 1 ) {
34183430 fileQueueConsumersCountDelta = readRanges.size () - 1 ;
@@ -3520,9 +3532,14 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
35203532
35213533#undef SET_FLAG
35223534#undef SUPPORTED_FLAGS
3535+ ui64 sizeLimit = std::numeric_limits<ui64>::max ();
3536+ if (const auto it = settings.find (" sizeLimit" ); settings.cend () != it) {
3537+ sizeLimit = FromString<ui64>(it->second );
3538+ }
3539+
35233540 const auto actor = new TS3StreamReadActor (inputIndex, statsLevel, txId, std::move (gateway), holderFactory, params.GetUrl (), authInfo, pathPattern, pathPatternVariant,
35243541 std::move (paths), addPathIndex, readSpec, computeActorId, retryPolicy,
3525- cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, memoryQuotaManager,
3542+ cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
35263543 params.GetUseRuntimeListing (), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
35273544
35283545 return {actor, actor};
0 commit comments