@@ -2339,6 +2339,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
23392339 continue ;
23402340 }
23412341
2342+ const size_t opOutTables = op.Output ().Size ();
23422343 std::map<size_t , std::pair<std::vector<const TExprNode*>, std::vector<const TExprNode*>>> maps; // output -> pair<vector<YtMap>, vector<other YtOutput's>>
23432344 for (size_t i = 0 ; i < x.second .size (); ++i) {
23442345 auto reader = std::get<0 >(x.second [i]);
@@ -2354,7 +2355,9 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
23542355 if (newPair && TYtMap::Match (reader)) {
23552356 const auto outerMap = TYtMap (reader);
23562357 if ((outerMap.World ().Ref ().IsWorld () || outerMap.World ().Raw () == op.World ().Raw ())
2357- && outerMap.Input ().Size () == 1 && outerMap.DataSink ().Cluster ().Value () == op.DataSink ().Cluster ().Value ()
2358+ && outerMap.Input ().Size () == 1
2359+ && outerMap.Output ().Size () + item.first .size () <= maxOutTables // fast check for too many operations
2360+ && outerMap.DataSink ().Cluster ().Value () == op.DataSink ().Cluster ().Value ()
23582361 && NYql::HasSetting (op.Settings ().Ref (), EYtSettingType::Flow) == NYql::HasSetting (outerMap.Settings ().Ref (), EYtSettingType::Flow)
23592362 && !NYql::HasSetting (op.Settings ().Ref (), EYtSettingType::JobCount)
23602363 && !NYql::HasSetting (outerMap.Settings ().Ref (), EYtSettingType::JobCount)
@@ -2382,7 +2385,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
23822385 if (AnyOf (maps, [](const auto & item) { return item.second .first .size () > 0 ; })) {
23832386 TMap<TStringBuf, ui64> memUsage;
23842387 size_t currenFiles = 1 ; // jobstate. Take into account only once
2385- size_t currOutTables = op. Output (). Size () ;
2388+ size_t currOutTables = opOutTables ;
23862389
23872390 TExprNode::TPtr updatedBody = lambda.Body ().Ptr ();
23882391 if (maxJobMemoryLimit) {
@@ -2397,10 +2400,11 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
23972400 TMap<TStringBuf, double > cpuUsage;
23982401 for (auto & item: maps) {
23992402 if (!item.second .first .empty ()) {
2403+ size_t otherTablesDelta = item.second .second .empty () ? 1 : 0 ;
24002404 for (auto it = item.second .first .begin (); it != item.second .first .end (); ) {
24012405 const auto outerMap = TYtMap (*it);
24022406
2403- const size_t outTablesDelta = outerMap.Output ().Size () - size_t (item. second . second . empty ()) ;
2407+ const size_t outTablesDelta = outerMap.Output ().Size () - otherTablesDelta ;
24042408
24052409 updatedBody = outerMap.Mapper ().Body ().Ptr ();
24062410 if (maxJobMemoryLimit) {
@@ -2418,7 +2422,7 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
24182422 cpuUsage.clear ();
24192423 ScanResourceUsage (*updatedBody, *State_->Configuration , State_->Types , pMemUsage, &cpuUsage, &newCurrenFiles);
24202424
2421- auto usedMemory = Accumulate (memUsage .begin (), memUsage .end (), switchLimit,
2425+ auto usedMemory = Accumulate (newMemUsage .begin (), newMemUsage .end (), switchLimit,
24222426 [](ui64 sum, const std::pair<const TStringBuf, ui64>& val) { return sum + val.second ; });
24232427
24242428 // Take into account codec input/output buffers (one for all inputs and one per output)
@@ -2453,12 +2457,16 @@ class TYtPhysicalFinalizingTransformer : public TSyncTransformerBase {
24532457 if (skip) {
24542458 // Move to other usages
24552459 it = item.second .first .erase (it);
2460+ if (item.second .second .empty ()) {
2461+ ++currOutTables;
2462+ }
24562463 item.second .second .push_back (outerMap.Input ().Item (0 ).Paths ().Item (0 ).Table ().Raw ());
24572464 continue ;
24582465 }
24592466 currenFiles = newCurrenFiles;
24602467 memUsage = std::move (newMemUsage);
24612468 currOutTables += outTablesDelta;
2469+ otherTablesDelta = 0 ; // Take into account only once
24622470 ++it;
24632471 }
24642472 }
0 commit comments