@@ -76,6 +76,12 @@ inline bool IsDebugLogEnabled() {
7676 TlsActivationContext->LoggerSettings ()->Satisfies (NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER);
7777}
7878
79+ struct TShardRangesWithShardId {
80+ TMaybe<ui64> ShardId;
81+ const TShardKeyRanges* Ranges;
82+ };
83+
84+
7985TActorId ReportToRl (ui64 ru, const TString& database, const TString& userToken,
8086 const NKikimrKqp::TRlPath& path);
8187
@@ -986,21 +992,32 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
986992 }
987993 }
988994
989- TVector<TVector<const TShardKeyRanges*>> DistributeShardsToTasks (TVector<const TShardKeyRanges*> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
990- std::sort (std::begin (shardsRanges), std::end (shardsRanges), [&](const TShardKeyRanges* lhs, const TShardKeyRanges* rhs) {
995+ TVector<TVector<TShardRangesWithShardId>> DistributeShardsToTasks (TVector<TShardRangesWithShardId> shardsRanges, const size_t tasksCount, const TVector<NScheme::TTypeInfo>& keyTypes) {
996+ if (IsDebugLogEnabled ()) {
997+ TStringBuilder sb;
998+ sb << " Distrubiting shards to tasks: [" ;
999+ for (size_t i = 0 ; i < shardsRanges.size (); i++) {
1000+ sb << " # " << i << " : " << shardsRanges[i].Ranges ->ToString (keyTypes, *AppData ()->TypeRegistry );
1001+ }
1002+
1003+ sb << " ]." ;
1004+ LOG_D (sb);
1005+ }
1006+
1007+ std::sort (std::begin (shardsRanges), std::end (shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
9911008 // Special case for infinity
992- if (lhs->GetRightBorder ().first ->GetCells ().empty () || rhs->GetRightBorder ().first ->GetCells ().empty ()) {
993- YQL_ENSURE (!lhs->GetRightBorder ().first ->GetCells ().empty () || !rhs->GetRightBorder ().first ->GetCells ().empty ());
994- return rhs->GetRightBorder ().first ->GetCells ().empty ();
1009+ if (lhs. Ranges ->GetRightBorder ().first ->GetCells ().empty () || rhs. Ranges ->GetRightBorder ().first ->GetCells ().empty ()) {
1010+ YQL_ENSURE (!lhs. Ranges ->GetRightBorder ().first ->GetCells ().empty () || !rhs. Ranges ->GetRightBorder ().first ->GetCells ().empty ());
1011+ return rhs. Ranges ->GetRightBorder ().first ->GetCells ().empty ();
9951012 }
9961013 return CompareTypedCellVectors (
997- lhs->GetRightBorder ().first ->GetCells ().data (),
998- rhs->GetRightBorder ().first ->GetCells ().data (),
1014+ lhs. Ranges ->GetRightBorder ().first ->GetCells ().data (),
1015+ rhs. Ranges ->GetRightBorder ().first ->GetCells ().data (),
9991016 keyTypes.data (), keyTypes.size ()) < 0 ;
10001017 });
10011018
10021019 // One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.
1003- TVector<TVector<const TShardKeyRanges* >> result (tasksCount);
1020+ TVector<TVector<TShardRangesWithShardId >> result (tasksCount);
10041021 size_t shardIndex = 0 ;
10051022 for (size_t taskIndex = 0 ; taskIndex < tasksCount; ++taskIndex) {
10061023 const size_t tasksLeft = tasksCount - taskIndex;
@@ -1129,7 +1146,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
11291146 };
11301147
11311148 THashMap<ui64, TVector<ui64>> nodeIdToTasks;
1132- THashMap<ui64, TVector<const TShardKeyRanges* >> nodeIdToShardKeyRanges;
1149+ THashMap<ui64, TVector<TShardRangesWithShardId >> nodeIdToShardKeyRanges;
11331150
11341151 auto addPartiton = [&](
11351152 ui64 taskLocation,
@@ -1155,11 +1172,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
11551172 const auto maxScanTasksPerNode = GetScanTasksPerNode (stageInfo, /* isOlapScan */ false , *nodeId);
11561173 auto & nodeTasks = nodeIdToTasks[*nodeId];
11571174 if (nodeTasks.size () < maxScanTasksPerNode) {
1158- const auto & task = createNewTask (nodeId, taskLocation, shardId , maxInFlightShards);
1175+ const auto & task = createNewTask (nodeId, taskLocation, {} , maxInFlightShards);
11591176 nodeTasks.push_back (task.Id );
11601177 }
11611178
1162- nodeIdToShardKeyRanges[*nodeId].push_back (&*shardInfo.KeyReadRanges );
1179+ nodeIdToShardKeyRanges[*nodeId].push_back (TShardRangesWithShardId{shardId, &*shardInfo.KeyReadRanges } );
11631180 } else {
11641181 auto & task = createNewTask (nodeId, taskLocation, shardId, maxInFlightShards);
11651182 const auto & stageSource = stage.GetSources (0 );
@@ -1186,12 +1203,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
11861203
11871204 const auto & shardsRangesForTask = rangesDistribution[taskIndex];
11881205
1189- if (shardsRangesForTask.size () > 1 ) {
1190- settings->ClearShardIdHint ( );
1206+ if (shardsRangesForTask.size () == 1 && shardsRangesForTask[ 0 ]. ShardId ) {
1207+ settings->SetShardIdHint (*shardsRangesForTask[ 0 ]. ShardId );
11911208 }
11921209
11931210 for (const auto & shardRanges : shardsRangesForTask) {
1194- shardRanges->SerializeTo (settings);
1211+ shardRanges. Ranges ->SerializeTo (settings);
11951212 }
11961213 }
11971214 }
0 commit comments