Skip to content

allow to broadcast join inputs from multiple tasks #14347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());
kqpConfig.EnableSnapshotIsolationRW = serviceConfig.GetEnableSnapshotIsolationRW();
kqpConfig.AllowMultiBroadcasts = serviceConfig.GetAllowMultiBroadcasts();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
auto &event = ev->Get()->Record;

bool allowMultiBroadcasts = TableServiceConfig.GetAllowMultiBroadcasts();
bool enableKqpDataQueryStreamLookup = TableServiceConfig.GetEnableKqpDataQueryStreamLookup();
bool enableKqpDataQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
bool enableKqpScanQueryStreamIdxLookupJoin = TableServiceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
Expand Down Expand Up @@ -343,7 +344,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
TableServiceConfig.GetEnablePgConstsToParams() != enablePgConstsToParams ||
TableServiceConfig.GetEnablePerStatementQueryExecution() != enablePerStatementQueryExecution ||
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW) {
TableServiceConfig.GetEnableSnapshotIsolationRW() != enableSnapshotIsolationRW ||
TableServiceConfig.GetAllowMultiBroadcasts() != allowMultiBroadcasts) {

QueryCache->Clear();

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2769,13 +2769,14 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

// error
LOG_T("Updating channels after the creation of compute actors");
Y_ENSURE(Planner);
THashMap<TActorId, THashSet<ui64>> updates;
for (ui64 taskId : ComputeTasks) {
auto& task = TasksGraph.GetTask(taskId);
if (task.ComputeActorId)
CollectTaskChannelsUpdates(task, updates);
Planner->CollectTaskChannelsUpdates(task, updates);
}
PropagateChannelsUpdates(updates);
Planner->PropagateChannelsUpdates(updates);
}

void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) {
Expand Down
74 changes: 6 additions & 68 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,8 @@ class TKqpExecuterBase : public TActor<TDerived> {
if (populateChannels) {
auto& task = TasksGraph.GetTask(taskId);
THashMap<TActorId, THashSet<ui64>> updates;
CollectTaskChannelsUpdates(task, updates);
PropagateChannelsUpdates(updates);
Planner->CollectTaskChannelsUpdates(task, updates);
Planner->PropagateChannelsUpdates(updates);
}
break;
}
Expand Down Expand Up @@ -751,14 +751,14 @@ class TKqpExecuterBase : public TActor<TDerived> {
TActorId computeActorId = ActorIdFromProto(startedTask.GetActorId());
LOG_D("Executing task: " << taskId << " on compute actor: " << computeActorId);
YQL_ENSURE(Planner);
bool channelUpdates = Planner->AcknowledgeCA(taskId, computeActorId, nullptr);
if (channelUpdates) {
CollectTaskChannelsUpdates(task, channelsUpdates);
bool ack = Planner->AcknowledgeCA(taskId, computeActorId, nullptr);
if (ack) {
Planner->CollectTaskChannelsUpdates(task, channelsUpdates);
}

}

PropagateChannelsUpdates(channelsUpdates);
Planner->PropagateChannelsUpdates(channelsUpdates);
}

void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) {
Expand All @@ -777,68 +777,6 @@ class TKqpExecuterBase : public TActor<TDerived> {
}

protected:
// Collects into `updates` the channel ids that have to be announced because of `task`,
// grouped by the compute actor that should receive them.
// `task` must already have a compute actor assigned (checked with YQL_ENSURE below).
void CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates) {
YQL_ENSURE(task.ComputeActorId);

LOG_T("Collect channels updates for task: " << task.Id << " at actor " << task.ComputeActorId);

// Channel ids to be reported to the task's own compute actor.
auto& selfUpdates = updates[task.ComputeActorId];

// Input channels: every one must end at this task and have a source task.
for (auto& input : task.Inputs) {
for (auto channelId : input.Channels) {
auto& channel = TasksGraph.GetChannel(channelId);
YQL_ENSURE(channel.DstTask == task.Id);
YQL_ENSURE(channel.SrcTask);

auto& srcTask = TasksGraph.GetTask(channel.SrcTask);
// The channel is recorded (for both the source actor and this task's actor)
// only when the source task already has a compute actor.
if (srcTask.ComputeActorId) {
updates[srcTask.ComputeActorId].emplace(channelId);
selfUpdates.emplace(channelId);
}

LOG_T("Task: " << task.Id << ", input channelId: " << channelId << ", src task: " << channel.SrcTask
<< ", at actor " << srcTask.ComputeActorId);
}
}

// Output channels: always recorded for this task's actor; additionally recorded for
// the destination actor when a destination task exists and has a compute actor.
for (auto& output : task.Outputs) {
for (auto channelId : output.Channels) {
selfUpdates.emplace(channelId);

auto& channel = TasksGraph.GetChannel(channelId);
YQL_ENSURE(channel.SrcTask == task.Id);

if (channel.DstTask) {
auto& dstTask = TasksGraph.GetTask(channel.DstTask);
if (dstTask.ComputeActorId) {
// not an optimal solution
updates[dstTask.ComputeActorId].emplace(channelId);
}

LOG_T("Task: " << task.Id << ", output channelId: " << channelId << ", dst task: " << channel.DstTask
<< ", at actor " << dstTask.ComputeActorId);
}
}
}
}

// Sends one TEvChannelsInfo event to every compute actor listed in `updates`.
// The event carries one channel description per channel id in that actor's set.
void PropagateChannelsUpdates(const THashMap<TActorId, THashSet<ui64>>& updates) {
for (auto& pair : updates) {
auto computeActorId = pair.first;
auto& channelIds = pair.second;

auto channelsInfoEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelsInfo>();
auto& record = channelsInfoEv->Record;

// Append a description of each channel to the event record.
for (auto& channelId : channelIds) {
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion, false);
}

LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
// The holder gives up the event pointer here; it is handed to Send().
this->Send(computeActorId, channelsInfoEv.Release());
}
}

void UpdateResourcesUsage(bool force) {
TInstant now = TActivationContext::Now();
if ((now - LastResourceUsageUpdate < ResourceUsageUpdateInterval) && !force)
Expand Down
69 changes: 68 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ using namespace NActors;

namespace NKikimr::NKqp {

// Logging helpers for the planner. Every message is prefixed with the transaction id
// and the user request context.
// LOG_T emits at TRACE level: it previously expanded to LOG_DEBUG_S, which made it an
// exact duplicate of LOG_D and pushed per-channel trace messages into DEBUG output.
#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "Ctx: " << *UserRequestContext << ". " << stream)
Expand Down Expand Up @@ -511,7 +512,11 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)

TActorId* actorId = std::get_if<TActorId>(&startResult);
Y_ABORT_UNLESS(actorId);
AcknowledgeCA(taskId, *actorId, nullptr);
Y_ABORT_UNLESS(AcknowledgeCA(taskId, *actorId, nullptr));

THashMap<TActorId, THashSet<ui64>> updates;
CollectTaskChannelsUpdates(task, updates);
PropagateChannelsUpdates(updates);
return TString();
}

Expand Down Expand Up @@ -761,6 +766,68 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
return flags;
}

void TKqpPlanner::PropagateChannelsUpdates(const THashMap<TActorId, THashSet<ui64>>& updates) {
for (auto& pair : updates) {
auto computeActorId = pair.first;
auto& channelIds = pair.second;

auto channelsInfoEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelsInfo>();
auto& record = channelsInfoEv->Record;

for (auto& channelId : channelIds) {
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion, false);
}

LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
TlsActivationContext->Send(std::make_unique<NActors::IEventHandle>(computeActorId, ExecuterId, channelsInfoEv.Release()));
}
}

// Fills `updates` (compute actor -> channel ids) with the channels touched by `task`.
// `task` must already have a compute actor; this is enforced with YQL_ENSURE.
//  - Input channels are recorded for both the source actor and this task's actor,
//    but only when the source task already has a compute actor.
//  - Output channels are always recorded for this task's actor, and also for the
//    destination actor when the destination task exists and has a compute actor.
void TKqpPlanner::CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates) {
    YQL_ENSURE(task.ComputeActorId);

    LOG_T("Collect channels updates for task: " << task.Id << " at actor " << task.ComputeActorId);

    auto& ownChannels = updates[task.ComputeActorId];

    for (const auto& in : task.Inputs) {
        for (const auto id : in.Channels) {
            auto& channel = TasksGraph.GetChannel(id);
            YQL_ENSURE(channel.DstTask == task.Id);
            YQL_ENSURE(channel.SrcTask);

            auto& producer = TasksGraph.GetTask(channel.SrcTask);
            if (producer.ComputeActorId) {
                updates[producer.ComputeActorId].emplace(id);
                ownChannels.emplace(id);
            }

            LOG_T("Task: " << task.Id << ", input channelId: " << id << ", src task: " << channel.SrcTask
                << ", at actor " << producer.ComputeActorId);
        }
    }

    for (const auto& out : task.Outputs) {
        for (const auto id : out.Channels) {
            ownChannels.emplace(id);

            auto& channel = TasksGraph.GetChannel(id);
            YQL_ENSURE(channel.SrcTask == task.Id);

            // Nothing more to record when the channel has no destination task.
            if (!channel.DstTask) {
                continue;
            }

            auto& consumer = TasksGraph.GetTask(channel.DstTask);
            if (consumer.ComputeActorId) {
                // Not an optimal solution.
                updates[consumer.ComputeActorId].emplace(id);
            }

            LOG_T("Task: " << task.Id << ", output channelId: " << id << ", dst task: " << channel.DstTask
                << ", at actor " << consumer.ComputeActorId);
        }
    }
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpPlanner::TArgs args) {
return std::make_unique<TKqpPlanner>(std::move(args));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class TKqpPlanner {
ui32 GetnScanTasks();
ui32 GetnComputeTasks();

// Sends every compute actor in `updates` a channels-info event covering its set of channel ids.
void PropagateChannelsUpdates(const THashMap<TActorId, THashSet<ui64>>& updates);
// Adds to `updates` the channel ids affected by `task`, keyed by the compute actor to notify.
void CollectTaskChannelsUpdates(const TKqpTasksGraph::TTaskType& task, THashMap<TActorId, THashSet<ui64>>& updates);

private:

const IKqpGateway::TKqpSnapshot& GetSnapshot() const;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,10 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
// It is now possible as we don't use datashard transactions for reads in data queries.
bool pushLeftStage = (KqpCtx.IsScanQuery() || KqpCtx.Config->EnableKqpDataQueryStreamLookup) && AllowFuseJoinInputs(node);
bool shuffleEliminationWithMap = KqpCtx.Config->OptShuffleEliminationWithMap.Get().GetOrElse(false);
bool rightCollectStage = !KqpCtx.Config->AllowMultiBroadcasts;
TExprBase output = DqBuildJoin(node, ctx, optCtx, *getParents(), IsGlobal,
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false), KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(false), shuffleEliminationWithMap
pushLeftStage, KqpCtx.Config->GetHashJoinMode(), false, KqpCtx.Config->UseGraceJoinCoreForMap.Get().GetOrElse(false), KqpCtx.Config->OptShuffleElimination.Get().GetOrElse(false), shuffleEliminationWithMap,
rightCollectStage
);
DumpAppliedRule("BuildJoin", node.Ptr(), output.Ptr(), ctx);
return output;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
ui64 DefaultEnableSpillingNodes = 0;
bool EnableAntlr4Parser = false;
bool EnableSnapshotIsolationRW = false;
bool AllowMultiBroadcasts = false;

void SetDefaultEnabledSpillingNodes(const TString& node);
ui64 GetEnabledSpillingNodes() const;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,6 @@ message TTableServiceConfig {
optional bool EnableSnapshotIsolationRW = 76 [default = false];
optional bool EnableStreamWrite = 77 [default = false];
optional bool EnableBatchUpdates = 78 [default = false];

optional bool AllowMultiBroadcasts = 79 [default = false];
};
45 changes: 27 additions & 18 deletions ydb/library/yql/dq/opt/dq_opt_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ TExprBase DqRewriteLeftPureJoin(const TExprBase node, TExprContext& ctx, const T
.Done();
}

TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap) {
TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap, bool buildCollectStage) {
static const std::set<std::string_view> supportedTypes = {
"Inner"sv,
"Left"sv,
Expand Down Expand Up @@ -745,24 +745,33 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext&
TMaybeNode<TDqCnBroadcast> rightBroadcast;
TNodeOnNodeOwnedMap rightPrecomputes;

if (rightCn) {
auto collectRightStage = Build<TDqStage>(ctx, join.Pos())
.Inputs()
.Add(rightCn.Cast())
.Build()
.Program()
.Args({"stream"})
.Body("stream")
.Build()
.Settings(TDqStageSettings().BuildNode(ctx, join.Pos()))
.Done();
if (rightCn) {
if (buildCollectStage) {
auto collectRightStage = Build<TDqStage>(ctx, join.Pos())
.Inputs()
.Add(rightCn.Cast())
.Build()
.Program()
.Args({"stream"})
.Body("stream")
.Build()
.Settings(TDqStageSettings().BuildNode(ctx, join.Pos()))
.Done();

rightBroadcast = Build<TDqCnBroadcast>(ctx, join.Pos())
.Output()
.Stage(collectRightStage)
.Index().Build("0")
.Build()
.Done();
rightBroadcast = Build<TDqCnBroadcast>(ctx, join.Pos())
.Output()
.Stage(collectRightStage)
.Index().Build("0")
.Build()
.Done();
} else {
rightBroadcast = Build<TDqCnBroadcast>(ctx, join.Pos())
.Output()
.Stage(rightCn.Cast().Output().Stage())
.Index(rightCn.Cast().Output().Index())
.Build()
.Done();
}
} else {
YQL_CLOG(TRACE, CoreDq) << "-- DqBuildPhyJoin: right input is DqPure expr";

Expand Down
5 changes: 3 additions & 2 deletions ydb/library/yql/dq/opt/dq_opt_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode

NNodes::TExprBase DqRewriteEquiJoin(const NNodes::TExprBase& node, EHashJoinMode mode, bool useCBO, TExprContext& ctx, const TTypeAnnotationContext& typeCtx, int& joinCounter, const TOptimizerHints& hints = {});

NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap);
NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx, bool useGraceCoreForMap, bool buildCollectStage=true);

NNodes::TExprBase DqBuildJoin(
const NNodes::TExprBase& node,
Expand All @@ -30,7 +30,8 @@ NNodes::TExprBase DqBuildJoin(
bool shuffleMapJoin = true,
bool useGraceCoreForMap = false,
bool shuffleElimination = false,
bool shuffleEliminationWithMap = false
bool shuffleEliminationWithMap = false,
bool buildCollectStage=true
);

NNodes::TExprBase DqBuildHashJoin(const NNodes::TDqJoin& join, EHashJoinMode mode, TExprContext& ctx, IOptimizationContext& optCtx, bool shuffleElimination, bool shuffleEliminationWithMap);
Expand Down
8 changes: 4 additions & 4 deletions ydb/library/yql/dq/opt/dq_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2726,9 +2726,9 @@ TExprBase DqBuildJoin(
bool shuffleMapJoin,
bool useGraceCoreForMap,
bool shuffleElimination,
bool shuffleEliminationWithMap
)
{
bool shuffleEliminationWithMap,
bool buildCollectStage
) {
if (!node.Maybe<TDqJoin>()) {
return node;
}
Expand Down Expand Up @@ -2777,7 +2777,7 @@ TExprBase DqBuildJoin(
// separate stage to receive data from both sides of join.
// TODO: We can push MapJoin to existing stage for data query, if it doesn't have table reads. This
// requires some additional knowledge, probably with use of constraints.
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx, useGraceCoreForMap);
return DqBuildPhyJoin(join, pushLeftStage, ctx, optCtx, useGraceCoreForMap, buildCollectStage);
}

TExprBase DqPrecomputeToInput(const TExprBase& node, TExprContext& ctx) {
Expand Down
Loading
Loading