Skip to content

Commit 6e3fc43

Browse files
authored
Fix sinks order (#9345)
1 parent 2a0bfb0 commit 6e3fc43

File tree

4 files changed

+217
-21
lines changed

4 files changed

+217
-21
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,7 @@ ydb/core/kqp/ut/service [*/*]*
2424
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
2525
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
2626
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
27-
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
28-
ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap
29-
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
30-
ydb/core/kqp/ut/tx KqpSinkMvcc.OltpNamedStatement
3127
ydb/core/kqp/ut/tx KqpSinkMvcc.OlapNamedStatement
32-
ydb/core/kqp/ut/tx KqpSinkMvcc.OltpMultiSinks
3328
ydb/core/kqp/ut/tx KqpSinkMvcc.OlapMultiSinks
3429
ydb/core/persqueue/ut [*/*]*
3530
ydb/core/persqueue/ut TPQTest.*DirectRead*

ydb/core/kqp/opt/kqp_opt_build_txs.cpp

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -560,16 +560,20 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
560560
}
561561

562562
if (!query.Effects().Empty()) {
563-
auto tx = BuildTx(query.Effects().Ptr(), ctx, /* isPrecompute */ false);
564-
if (!tx) {
565-
return TStatus::Error;
566-
}
563+
auto collectedEffects = CollectEffects(query.Effects(), ctx);
567564

568-
if (!CheckEffectsTx(tx.Cast(), query, ctx)) {
569-
return TStatus::Error;
570-
}
565+
for (auto& effects : collectedEffects) {
566+
auto tx = BuildTx(effects.Ptr(), ctx, /* isPrecompute */ false);
567+
if (!tx) {
568+
return TStatus::Error;
569+
}
571570

572-
BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
571+
if (!CheckEffectsTx(tx.Cast(), effects, ctx)) {
572+
return TStatus::Error;
573+
}
574+
575+
BuildCtx->PhysicalTxs.emplace_back(tx.Cast());
576+
}
573577
}
574578

575579
return TStatus::Ok;
@@ -581,8 +585,86 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
581585
}
582586

583587
private:
584-
bool HasTableEffects(const TKqlQuery& query) const {
585-
for (const TExprBase& effect : query.Effects()) {
588+
TVector<TExprList> CollectEffects(const TExprList& list, TExprContext& ctx) {
589+
struct TEffectsInfo {
590+
enum class EType {
591+
KQP_EFFECT,
592+
KQP_SINK,
593+
EXTERNAL_SINK,
594+
};
595+
596+
EType Type;
597+
THashSet<TStringBuf> TablesPathIds;
598+
TVector<TExprNode::TPtr> Exprs;
599+
};
600+
TVector<TEffectsInfo> effectsInfos;
601+
602+
for (const auto& expr : list) {
603+
if (auto sinkEffect = expr.Maybe<TKqpSinkEffect>()) {
604+
const size_t sinkIndex = FromString(TStringBuf(sinkEffect.Cast().SinkIndex()));
605+
const auto stage = sinkEffect.Cast().Stage().Maybe<TDqStageBase>();
606+
YQL_ENSURE(stage);
607+
YQL_ENSURE(stage.Cast().Outputs());
608+
const auto outputs = stage.Cast().Outputs().Cast();
609+
YQL_ENSURE(sinkIndex < outputs.Size());
610+
const auto sink = outputs.Item(sinkIndex).Maybe<TDqSink>();
611+
YQL_ENSURE(sink);
612+
613+
const auto sinkSettings = sink.Cast().Settings().Maybe<TKqpTableSinkSettings>();
614+
if (!sinkSettings) {
615+
// External writes always use their own physical transaction.
616+
effectsInfos.emplace_back();
617+
effectsInfos.back().Type = TEffectsInfo::EType::EXTERNAL_SINK;
618+
effectsInfos.back().Exprs.push_back(expr.Ptr());
619+
} else {
620+
// Two table sinks can't be executed in one physical transaction if they write into one table.
621+
const TStringBuf tablePathId = sinkSettings.Cast().Table().PathId().Value();
622+
623+
auto it = std::find_if(
624+
std::begin(effectsInfos),
625+
std::end(effectsInfos),
626+
[&tablePathId](const auto& effectsInfo) {
627+
return effectsInfo.Type == TEffectsInfo::EType::KQP_SINK
628+
&& !effectsInfo.TablesPathIds.contains(tablePathId);
629+
});
630+
if (it == std::end(effectsInfos)) {
631+
effectsInfos.emplace_back();
632+
it = std::prev(std::end(effectsInfos));
633+
it->Type = TEffectsInfo::EType::KQP_SINK;
634+
}
635+
it->TablesPathIds.insert(tablePathId);
636+
it->Exprs.push_back(expr.Ptr());
637+
}
638+
} else {
639+
// Table effects are executed all in one physical transaction.
640+
auto it = std::find_if(
641+
std::begin(effectsInfos),
642+
std::end(effectsInfos),
643+
[](const auto& effectsInfo) { return effectsInfo.Type == TEffectsInfo::EType::KQP_EFFECT; });
644+
if (it == std::end(effectsInfos)) {
645+
effectsInfos.emplace_back();
646+
it = std::prev(std::end(effectsInfos));
647+
it->Type = TEffectsInfo::EType::KQP_EFFECT;
648+
}
649+
it->Exprs.push_back(expr.Ptr());
650+
}
651+
}
652+
653+
TVector<TExprList> results;
654+
655+
for (const auto& effects : effectsInfos) {
656+
auto builder = Build<TExprList>(ctx, list.Pos());
657+
for (const auto& expr : effects.Exprs) {
658+
builder.Add(expr);
659+
}
660+
results.push_back(builder.Done());
661+
}
662+
663+
return results;
664+
}
665+
666+
bool HasTableEffects(const TExprList& effectsList) const {
667+
for (const TExprBase& effect : effectsList) {
586668
if (auto maybeSinkEffect = effect.Maybe<TKqpSinkEffect>()) {
587669
// (KqpSinkEffect (DqStage (... ((DqSink '0 (DataSink '"kikimr") ...)))) '0)
588670
auto sinkEffect = maybeSinkEffect.Cast();
@@ -608,7 +690,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
608690
return false;
609691
}
610692

611-
bool CheckEffectsTx(TKqpPhysicalTx tx, const TKqlQuery& query, TExprContext& ctx) const {
693+
bool CheckEffectsTx(TKqpPhysicalTx tx, const TExprList& effectsList, TExprContext& ctx) const {
612694
TMaybeNode<TExprBase> blackistedNode;
613695
VisitExpr(tx.Ptr(), [&blackistedNode](const TExprNode::TPtr& exprNode) {
614696
if (blackistedNode) {
@@ -630,7 +712,7 @@ class TKqpBuildTxsTransformer : public TSyncTransformerBase {
630712
return true;
631713
});
632714

633-
if (blackistedNode && HasTableEffects(query)) {
715+
if (blackistedNode && HasTableEffects(effectsList)) {
634716
ctx.AddError(TIssue(ctx.GetPosition(blackistedNode.Cast().Pos()), TStringBuilder()
635717
<< "Callable not expected in effects tx: " << blackistedNode.Cast<TCallable>().CallableName()));
636718
return false;

ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,8 +1224,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
12241224
auto db = kikimr->GetQueryClient();
12251225
auto resultFuture = db.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx());
12261226
resultFuture.Wait();
1227-
UNIT_ASSERT_C(!resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
1228-
UNIT_ASSERT_STRING_CONTAINS(resultFuture.GetValueSync().GetIssues().ToString(), "Callable not expected in effects tx: Unwrap");
1227+
UNIT_ASSERT_C(resultFuture.GetValueSync().IsSuccess(), resultFuture.GetValueSync().GetIssues().ToString());
12291228
}
12301229
}
12311230

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3174,7 +3174,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
31743174
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1)));
31753175
}
31763176

3177-
/*{
3177+
{
31783178
auto result = client.ExecuteQuery(R"(
31793179
DELETE FROM `/Root/ColumnShard` ON SELECT * FROM `/Root/DataShard` WHERE Col1 > 9;
31803180
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
@@ -3223,7 +3223,127 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
32233223
DELETE FROM `/Root/ColumnShard` WHERE Col2 = "not found";
32243224
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
32253225
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3226-
}*/
3226+
}
3227+
}
3228+
3229+
Y_UNIT_TEST_TWIN(TableSink_HtapComplex, withOltpSink) {
3230+
NKikimrConfig::TAppConfig appConfig;
3231+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
3232+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withOltpSink);
3233+
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true);
3234+
auto settings = TKikimrSettings()
3235+
.SetAppConfig(appConfig)
3236+
.SetWithSampleTables(false);
3237+
TKikimrRunner kikimr(settings);
3238+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
3239+
3240+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
3241+
3242+
const TString query = R"(
3243+
CREATE TABLE `/Root/ColumnSrc` (
3244+
Col1 Uint64 NOT NULL,
3245+
Col2 String NOT NULL,
3246+
Col3 Int32 NOT NULL,
3247+
PRIMARY KEY (Col1)
3248+
)
3249+
PARTITION BY HASH(Col1)
3250+
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
3251+
3252+
CREATE TABLE `/Root/RowSrc` (
3253+
Col1 Uint64 NOT NULL,
3254+
Col2 String NOT NULL,
3255+
Col3 Int32 NOT NULL,
3256+
PRIMARY KEY (Col1)
3257+
)
3258+
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
3259+
3260+
CREATE TABLE `/Root/ColumnDst` (
3261+
Col1 Uint64 NOT NULL,
3262+
Col2 String,
3263+
Col3 Int32,
3264+
PRIMARY KEY (Col1)
3265+
)
3266+
PARTITION BY HASH(Col1)
3267+
WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
3268+
3269+
CREATE TABLE `/Root/RowDst` (
3270+
Col1 Uint64 NOT NULL,
3271+
Col2 String,
3272+
Col3 Int32,
3273+
PRIMARY KEY (Col1)
3274+
)
3275+
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
3276+
)";
3277+
3278+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
3279+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
3280+
3281+
auto client = kikimr.GetQueryClient();
3282+
3283+
{
3284+
auto result = client.ExecuteQuery(R"(
3285+
UPSERT INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES
3286+
(1u, "test1", 10), (2u, "test2", 11);
3287+
REPLACE INTO `/Root/ColumnSrc` (Col1, Col2, Col3) VALUES
3288+
(3u, "test3", 12), (4u, "test", 13);
3289+
UPSERT INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES
3290+
(10u, "test1", 10), (20u, "test2", 11);
3291+
REPLACE INTO `/Root/RowSrc` (Col1, Col2, Col3) VALUES
3292+
(30u, "test3", 12), (40u, "test", 13);
3293+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3294+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3295+
}
3296+
3297+
{
3298+
auto result = client.ExecuteQuery(R"(
3299+
$data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3
3300+
FROM `/Root/ColumnSrc`as c
3301+
JOIN `/Root/RowSrc` as r
3302+
ON c.Col1 + 10 = r.Col3;
3303+
UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data;
3304+
REPLACE INTO `/Root/RowDst` SELECT * FROM $data;
3305+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3306+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3307+
}
3308+
3309+
{
3310+
auto result = client.ExecuteQuery(R"(
3311+
SELECT COUNT(*) FROM `/Root/ColumnDst`;
3312+
SELECT COUNT(*) FROM `/Root/RowDst`;
3313+
DELETE FROM `/Root/ColumnDst` WHERE 1=1;
3314+
DELETE FROM `/Root/RowDst` WHERE 1=1;
3315+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3316+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3317+
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(0)));
3318+
CompareYson(R"([[3u]])", FormatResultSetYson(result.GetResultSet(1)));
3319+
}
3320+
3321+
{
3322+
auto result = client.ExecuteQuery(R"(
3323+
$prepare = SELECT *
3324+
FROM `/Root/ColumnSrc`
3325+
WHERE Col2 LIKE '%test%test%';
3326+
$data = SELECT c.Col1 as Col1, c.Col2 As Col2, r.Col3 AS Col3
3327+
FROM `/Root/RowSrc`as c
3328+
LEFT OUTER JOIN $prepare as r
3329+
ON c.Col1 + 10 = r.Col3;
3330+
UPSERT INTO `/Root/ColumnDst` SELECT * FROM $data;
3331+
REPLACE INTO `/Root/RowDst` SELECT * FROM $data;
3332+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3333+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3334+
}
3335+
3336+
{
3337+
auto result = client.ExecuteQuery(R"(
3338+
SELECT COUNT(*) FROM `/Root/ColumnDst`;
3339+
SELECT COUNT(*) FROM `/Root/RowDst`;
3340+
DELETE FROM `/Root/ColumnDst` WHERE 1=1;
3341+
DELETE FROM `/Root/RowDst` WHERE 1=1;
3342+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
3343+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3344+
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(0)));
3345+
CompareYson(R"([[4u]])", FormatResultSetYson(result.GetResultSet(1)));
3346+
}
32273347
}
32283348

32293349
Y_UNIT_TEST_TWIN(TableSink_HtapInteractive, withOltpSink) {

0 commit comments

Comments
 (0)