@@ -560,16 +560,20 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
560560 }
561561
562562 if (!query.Effects ().Empty ()) {
563- auto tx = BuildTx (query.Effects ().Ptr (), ctx, /* isPrecompute */ false );
564- if (!tx) {
565- return TStatus::Error;
566- }
563+ auto collectedEffects = CollectEffects (query.Effects (), ctx);
567564
568- if (!CheckEffectsTx (tx.Cast (), query, ctx)) {
569- return TStatus::Error;
570- }
565+ for (auto & effects : collectedEffects) {
566+ auto tx = BuildTx (effects.Ptr (), ctx, /* isPrecompute */ false );
567+ if (!tx) {
568+ return TStatus::Error;
569+ }
571570
572- BuildCtx->PhysicalTxs .emplace_back (tx.Cast ());
571+ if (!CheckEffectsTx (tx.Cast (), effects, ctx)) {
572+ return TStatus::Error;
573+ }
574+
575+ BuildCtx->PhysicalTxs .emplace_back (tx.Cast ());
576+ }
573577 }
574578
575579 return TStatus::Ok;
@@ -581,8 +585,86 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
581585 }
582586
583587private:
584- bool HasTableEffects (const TKqlQuery& query) const {
585- for (const TExprBase& effect : query.Effects ()) {
588+ TVector<TExprList> CollectEffects (const TExprList& list, TExprContext& ctx) {
589+ struct TEffectsInfo {
590+ enum class EType {
591+ KQP_EFFECT,
592+ KQP_SINK,
593+ EXTERNAL_SINK,
594+ };
595+
596+ EType Type;
597+ THashSet<TStringBuf> TablesPathIds;
598+ TVector<TExprNode::TPtr> Exprs;
599+ };
600+ TVector<TEffectsInfo> effectsInfos;
601+
602+ for (const auto & expr : list) {
603+ if (auto sinkEffect = expr.Maybe <TKqpSinkEffect>()) {
604+ const size_t sinkIndex = FromString (TStringBuf (sinkEffect.Cast ().SinkIndex ()));
605+ const auto stage = sinkEffect.Cast ().Stage ().Maybe <TDqStageBase>();
606+ YQL_ENSURE (stage);
607+ YQL_ENSURE (stage.Cast ().Outputs ());
608+ const auto outputs = stage.Cast ().Outputs ().Cast ();
609+ YQL_ENSURE (sinkIndex < outputs.Size ());
610+ const auto sink = outputs.Item (sinkIndex).Maybe <TDqSink>();
611+ YQL_ENSURE (sink);
612+
613+ const auto sinkSettings = sink.Cast ().Settings ().Maybe <TKqpTableSinkSettings>();
614+ if (!sinkSettings) {
615+ // External writes always use their own physical transaction.
616+ effectsInfos.emplace_back ();
617+ effectsInfos.back ().Type = TEffectsInfo::EType::EXTERNAL_SINK;
618+ effectsInfos.back ().Exprs .push_back (expr.Ptr ());
619+ } else {
620+ // Two table sinks can't be executed in one physical transaction if they write into one table.
621+ const TStringBuf tablePathId = sinkSettings.Cast ().Table ().PathId ().Value ();
622+
623+ auto it = std::find_if (
624+ std::begin (effectsInfos),
625+ std::end (effectsInfos),
626+ [&tablePathId](const auto & effectsInfo) {
627+ return effectsInfo.Type == TEffectsInfo::EType::KQP_SINK
628+ && !effectsInfo.TablesPathIds .contains (tablePathId);
629+ });
630+ if (it == std::end (effectsInfos)) {
631+ effectsInfos.emplace_back ();
632+ it = std::prev (std::end (effectsInfos));
633+ it->Type = TEffectsInfo::EType::KQP_SINK;
634+ }
635+ it->TablesPathIds .insert (tablePathId);
636+ it->Exprs .push_back (expr.Ptr ());
637+ }
638+ } else {
639+ // Table effects are executed all in one physical transaction.
640+ auto it = std::find_if (
641+ std::begin (effectsInfos),
642+ std::end (effectsInfos),
643+ [](const auto & effectsInfo) { return effectsInfo.Type == TEffectsInfo::EType::KQP_EFFECT; });
644+ if (it == std::end (effectsInfos)) {
645+ effectsInfos.emplace_back ();
646+ it = std::prev (std::end (effectsInfos));
647+ it->Type = TEffectsInfo::EType::KQP_EFFECT;
648+ }
649+ it->Exprs .push_back (expr.Ptr ());
650+ }
651+ }
652+
653+ TVector<TExprList> results;
654+
655+ for (const auto & effects : effectsInfos) {
656+ auto builder = Build<TExprList>(ctx, list.Pos ());
657+ for (const auto & expr : effects.Exprs ) {
658+ builder.Add (expr);
659+ }
660+ results.push_back (builder.Done ());
661+ }
662+
663+ return results;
664+ }
665+
666+ bool HasTableEffects (const TExprList& effectsList) const {
667+ for (const TExprBase& effect : effectsList) {
586668 if (auto maybeSinkEffect = effect.Maybe <TKqpSinkEffect>()) {
587669 // (KqpSinkEffect (DqStage (... ((DqSink '0 (DataSink '"kikimr") ...)))) '0)
588670 auto sinkEffect = maybeSinkEffect.Cast ();
@@ -608,7 +690,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
608690 return false ;
609691 }
610692
611- bool CheckEffectsTx (TKqpPhysicalTx tx, const TKqlQuery& query , TExprContext& ctx) const {
693+ bool CheckEffectsTx (TKqpPhysicalTx tx, const TExprList& effectsList , TExprContext& ctx) const {
612694 TMaybeNode<TExprBase> blackistedNode;
613695 VisitExpr (tx.Ptr (), [&blackistedNode](const TExprNode::TPtr& exprNode) {
614696 if (blackistedNode) {
@@ -630,7 +712,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
630712 return true ;
631713 });
632714
633- if (blackistedNode && HasTableEffects (query )) {
715+ if (blackistedNode && HasTableEffects (effectsList )) {
634716 ctx.AddError (TIssue (ctx.GetPosition (blackistedNode.Cast ().Pos ()), TStringBuilder ()
635717 << " Callable not expected in effects tx: " << blackistedNode.Cast <TCallable>().CallableName ()));
636718 return false ;
0 commit comments