Skip to content

Commit 349f53b

Browse files
authored
Merge 4b86a71 into 14389e2
2 parents 14389e2 + 4b86a71 commit 349f53b

File tree

13 files changed

+148
-116
lines changed

13 files changed

+148
-116
lines changed

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -651,6 +651,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
651651
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
652652
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
653653
kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW();
654+
kqpConfig.AllowMultiBroadcasts = serviceConfig.GetAllowMultiBroadcasts();
654655

655656
if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
656657
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
279279
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
280280
auto &event = ev->Get()->Record;
281281

282+
bool allowMultiBroadcasts = TableServiceConfig.GetAllowMultiBroadcasts();
282283
bool enableKqpDataQueryStreamLookup = TableServiceConfig.GetEnableKqpDataQueryStreamLookup();
283284
bool enableKqpDataQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
284285
bool enableKqpScanQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
@@ -343,7 +344,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
343344
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
344345
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
345346
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
346-
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW) {
347+
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW ||
348+
TableServiceConfig.GetAllowMultiBroadcasts() != allowMultiBroadcasts) {
347349

348350
QueryCache->Clear();
349351

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2769,13 +2769,14 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
27692769

27702770
// error
27712771
LOG_T("Updating channels after the creation of compute actors");
2772+
Y_ENSURE(Planner);
27722773
THashMap<TActorId, THashSet<ui64>> updates;
27732774
for (ui64 taskId : ComputeTasks) {
27742775
auto& task = TasksGraph.GetTask(taskId);
27752776
if (task.ComputeActorId)
2776-
CollectTaskChannelsUpdates(task, updates);
2777+
Planner->CollectTaskChannelsUpdates(task, updates);
27772778
}
2778-
PropagateChannelsUpdates(updates);
2779+
Planner->PropagateChannelsUpdates(updates);
27792780
}
27802781

27812782
void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) {

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
512512
if (populateChannels) {
513513
auto& task = TasksGraph.GetTask(taskId);
514514
THashMap<TActorId, THashSet<ui64>> updates;
515-
CollectTaskChannelsUpdates(task, updates);
516-
PropagateChannelsUpdates(updates);
515+
Planner->CollectTaskChannelsUpdates(task, updates);
516+
Planner->PropagateChannelsUpdates(updates);
517517
}
518518
break;
519519
}
@@ -751,14 +751,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
751751
TActorId computeActorId = ActorIdFromProto(startedTask.GetActorId());
752752
LOG_D("Executing task: " << taskId << " on compute actor: " << computeActorId);
753753
YQL_ENSURE(Planner);
754-
bool channelUpdates = Planner->AcknowledgeCA(taskId, computeActorId, nullptr);
755-
if (channelUpdates) {
756-
CollectTaskChannelsUpdates(task, channelsUpdates);
754+
bool ack = Planner->AcknowledgeCA(taskId, computeActorId, nullptr);
755+
if (ack) {
756+
Planner->CollectTaskChannelsUpdates(task, channelsUpdates);
757757
}
758758

759759
}
760760

761-
PropagateChannelsUpdates(channelsUpdates);
761+
Planner->PropagateChannelsUpdates(channelsUpdates);
762762
}
763763

764764
void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) {
@@ -777,68 +777,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
777777
}
778778

779779
protected:
780-
void CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates) {
781-
YQL_ENSURE(task.ComputeActorId);
782-
783-
LOG_T("Collect channels updates for task: " << task.Id << " at actor " << task.ComputeActorId);
784-
785-
auto& selfUpdates = updates[task.ComputeActorId];
786-
787-
for (auto& input : task.Inputs) {
788-
for (auto channelId : input.Channels) {
789-
auto& channel = TasksGraph.GetChannel(channelId);
790-
YQL_ENSURE(channel.DstTask == task.Id);
791-
YQL_ENSURE(channel.SrcTask);
792-
793-
auto& srcTask = TasksGraph.GetTask(channel.SrcTask);
794-
if (srcTask.ComputeActorId) {
795-
updates[srcTask.ComputeActorId].emplace(channelId);
796-
selfUpdates.emplace(channelId);
797-
}
798-
799-
LOG_T("Task: " << task.Id << ", input channelId: " << channelId << ", src task: " << channel.SrcTask
800-
<< ", at actor " << srcTask.ComputeActorId);
801-
}
802-
}
803-
804-
for (auto& output : task.Outputs) {
805-
for (auto channelId : output.Channels) {
806-
selfUpdates.emplace(channelId);
807-
808-
auto& channel = TasksGraph.GetChannel(channelId);
809-
YQL_ENSURE(channel.SrcTask == task.Id);
810-
811-
if (channel.DstTask) {
812-
auto& dstTask = TasksGraph.GetTask(channel.DstTask);
813-
if (dstTask.ComputeActorId) {
814-
// not a optimal solution
815-
updates[dstTask.ComputeActorId].emplace(channelId);
816-
}
817-
818-
LOG_T("Task: " << task.Id << ", output channelId: " << channelId << ", dst task: " << channel.DstTask
819-
<< ", at actor " << dstTask.ComputeActorId);
820-
}
821-
}
822-
}
823-
}
824-
825-
void PropagateChannelsUpdates(const THashMap<TActorId, THashSet<ui64>>& updates) {
826-
for (auto& pair : updates) {
827-
auto computeActorId = pair.first;
828-
auto& channelIds = pair.second;
829-
830-
auto channelsInfoEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelsInfo>();
831-
auto& record = channelsInfoEv->Record;
832-
833-
for (auto& channelId : channelIds) {
834-
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion, false);
835-
}
836-
837-
LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
838-
this->Send(computeActorId, channelsInfoEv.Release());
839-
}
840-
}
841-
842780
void UpdateResourcesUsage(bool force) {
843781
TInstant now = TActivationContext::Now();
844782
if ((now - LastResourceUsageUpdate < ResourceUsageUpdateInterval) && !force)

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ using namespace NActors;
1313

1414
namespace NKikimr::NKqp {
1515

16+
#define LOG_T(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
1617
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
1718
#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
1819
#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
@@ -511,7 +512,11 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
511512

512513
TActorId* actorId = std::get_if<TActorId>(&startResult);
513514
Y_ABORT_UNLESS(actorId);
514-
AcknowledgeCA(taskId, *actorId, nullptr);
515+
Y_ABORT_UNLESS(AcknowledgeCA(taskId, *actorId, nullptr));
516+
517+
THashMap<TActorId, THashSet<ui64>> updates;
518+
CollectTaskChannelsUpdates(task, updates);
519+
PropagateChannelsUpdates(updates);
515520
return TString();
516521
}
517522

@@ -761,6 +766,68 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
761766
return flags;
762767
}
763768

769+
void TKqpPlanner::PropagateChannelsUpdates(const THashMap<TActorId, THashSet<ui64>>& updates) {
770+
for (auto& pair : updates) {
771+
auto computeActorId = pair.first;
772+
auto& channelIds = pair.second;
773+
774+
auto channelsInfoEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelsInfo>();
775+
auto& record = channelsInfoEv->Record;
776+
777+
for (auto& channelId : channelIds) {
778+
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion, false);
779+
}
780+
781+
LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
782+
TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(computeActorId, ExecuterId, channelsInfoEv.Release()));
783+
}
784+
}
785+
786+
void TKqpPlanner::CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates) {
787+
YQL_ENSURE(task.ComputeActorId);
788+
789+
LOG_T("Collect channels updates for task: " << task.Id << " at actor " << task.ComputeActorId);
790+
791+
auto& selfUpdates = updates[task.ComputeActorId];
792+
793+
for (auto& input : task.Inputs) {
794+
for (auto channelId : input.Channels) {
795+
auto& channel = TasksGraph.GetChannel(channelId);
796+
YQL_ENSURE(channel.DstTask == task.Id);
797+
YQL_ENSURE(channel.SrcTask);
798+
799+
auto& srcTask = TasksGraph.GetTask(channel.SrcTask);
800+
if (srcTask.ComputeActorId) {
801+
updates[srcTask.ComputeActorId].emplace(channelId);
802+
selfUpdates.emplace(channelId);
803+
}
804+
805+
LOG_T("Task: " << task.Id << ", input channelId: " << channelId << ", src task: " << channel.SrcTask
806+
<< ", at actor " << srcTask.ComputeActorId);
807+
}
808+
}
809+
810+
for (auto& output : task.Outputs) {
811+
for (auto channelId : output.Channels) {
812+
selfUpdates.emplace(channelId);
813+
814+
auto& channel = TasksGraph.GetChannel(channelId);
815+
YQL_ENSURE(channel.SrcTask == task.Id);
816+
817+
if (channel.DstTask) {
818+
auto& dstTask = TasksGraph.GetTask(channel.DstTask);
819+
if (dstTask.ComputeActorId) {
820+
// not a optimal solution
821+
updates[dstTask.ComputeActorId].emplace(channelId);
822+
}
823+
824+
LOG_T("Task: " << task.Id << ", output channelId: " << channelId << ", dst task: " << channel.DstTask
825+
<< ", at actor " << dstTask.ComputeActorId);
826+
}
827+
}
828+
}
829+
}
830+
764831
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
765832
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpPlanner::TArgs args) {
766833
return std::make_unique<TKqpPlanner>(std::move(args));

ydb/core/kqp/executer_actor/kqp_planner.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ class TKqpPlanner {
8686
ui32 GetnScanTasks();
8787
ui32 GetnComputeTasks();
8888

89+
void PropagateChannelsUpdates(const THashMap<TActorId, THashSet<ui64>>& updates);
90+
void CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates);
91+
8992
private:
9093

9194
const IKqpGateway::TKqpSnapshot& GetSnapshot() const;

ydb/core/kqp/opt/physical/kqp_opt_phy.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
450450
// It is now possible as we don't use datashard transactions for reads in data queries.
451451
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
452452
bool shuffleEliminationWithMap = KqpCtx.Config->OptShuffleEliminationWithMap.Get().GetOrElse(false);
453+
bool rightCollectStage = !KqpCtx.Config->AllowMultiBroadcasts;
453454
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
454-
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false), KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(false), shuffleEliminationWithMap
455+
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false), KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(false), shuffleEliminationWithMap,
456+
rightCollectStage
455457
);
456458
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
457459
return output;

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
180180
ui64 DefaultEnableSpillingNodes = 0;
181181
bool EnableAntlr4Parser = false;
182182
bool EnableSnapshotIsolationRW = false;
183+
bool AllowMultiBroadcasts = false;
183184

184185
void SetDefaultEnabledSpillingNodes(const TString& node);
185186
ui64 GetEnabledSpillingNodes() const;

ydb/core/protos/table_service_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,6 @@ message TTableServiceConfig {
362362
optional bool EnableSnapshotIsolationRW = 76 [default = false];
363363
optional bool EnableStreamWrite = 77 [default = false];
364364
optional bool EnableBatchUpdates = 78 [default = false];
365+
366+
optional bool AllowMultiBroadcasts = 79 [default = false];
365367
};

ydb/library/yql/dq/opt/dq_opt_join.cpp

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T
708708
.Done();
709709
}
710710

711-
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap) {
711+
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap, bool buildCollectStage) {
712712
static const std::set<std::string_view> supportedTypes = {
713713
"Inner"sv,
714714
"Left"sv,
@@ -745,24 +745,33 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
745745
TMaybeNode<TDqCnBroadcast> rightBroadcast;
746746
TNodeOnNodeOwnedMap rightPrecomputes;
747747

748-
if (rightCn) {
749-
auto collectRightStage = Build<TDqStage>(ctx, join.Pos())
750-
.Inputs()
751-
.Add(rightCn.Cast())
752-
.Build()
753-
.Program()
754-
.Args({"stream"})
755-
.Body("stream")
756-
.Build()
757-
.Settings(TDqStageSettings().BuildNode(ctx, join.Pos()))
758-
.Done();
748+
if (rightCn) {
749+
if (buildCollectStage) {
750+
auto collectRightStage = Build<TDqStage>(ctx, join.Pos())
751+
.Inputs()
752+
.Add(rightCn.Cast())
753+
.Build()
754+
.Program()
755+
.Args({"stream"})
756+
.Body("stream")
757+
.Build()
758+
.Settings(TDqStageSettings().BuildNode(ctx, join.Pos()))
759+
.Done();
759760

760-
rightBroadcast = Build<TDqCnBroadcast>(ctx, join.Pos())
761-
.Output()
762-
.Stage(collectRightStage)
763-
.Index().Build("0")
764-
.Build()
765-
.Done();
761+
rightBroadcast = Build<TDqCnBroadcast>(ctx, join.Pos())
762+
.Output()
763+
.Stage(collectRightStage)
764+
.Index().Build("0")
765+
.Build()
766+
.Done();
767+
} else {
768+
rightBroadcast = Build<TDqCnBroadcast>(ctx, join.Pos())
769+
.Output()
770+
.Stage(rightCn.Cast().Output().Stage())
771+
.Index(rightCn.Cast().Output().Index())
772+
.Build()
773+
.Done();
774+
}
766775
} else {
767776
YQL_CLOG(TRACE, CoreDq) << "-- DqBuildPhyJoin: right input is DqPure expr";
768777

0 commit comments

Comments
 (0)