Skip to content

Commit 83c2452

Browse files
authored
Do not invalidate request with INTERNAL_ERROR in case of cancelation tx on the follower using depended reads (#8761)
The data query on the table with READ_REPLICA_SETTINGS set and STALE_RO one shard tx in case of depended reads (for example joint table with itself) can perform via legacy propose transaction routine on the follower. It can cause YQL_ENSURE if the query was cancelled. Actually no need to send TEvCancelTransactionProposal event on the folower datashard, we can just skip such shards. Changelog category Bugfix
1 parent cea9d83 commit 83c2452

File tree

2 files changed

+102
-12
lines changed

2 files changed

+102
-12
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -616,10 +616,11 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
616616
state.State = TShardState::EState::Finished;
617617

618618
YQL_ENSURE(state.DatashardState.Defined());
619-
YQL_ENSURE(!state.DatashardState->Follower);
620-
621-
Send(MakePipePerNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward(
622-
new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false));
619+
//nothing to cancel on follower
620+
if (!state.DatashardState->Follower) {
621+
Send(MakePipePerNodeCacheID(/* allowFollowers */ false), new TEvPipeCache::TEvForward(
622+
new TEvDataShard::TEvCancelTransactionProposal(TxId), shardId, /* subscribe */ false));
623+
}
623624
}
624625
}
625626
}

ydb/core/kqp/ut/query/kqp_limits_ut.cpp

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -761,8 +761,16 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
761761
WaitForZeroSessions(counters);
762762
}
763763

764-
Y_UNIT_TEST(CancelAfterRoTx) {
765-
TKikimrRunner kikimr;
764+
void DoCancelAfterRo(bool follower, bool streamLookup, bool dependedRead) {
765+
NKikimrConfig::TAppConfig appConfig;
766+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
767+
768+
auto setting = NKikimrKqp::TKqpSetting();
769+
auto serverSettings = TKikimrSettings()
770+
.SetAppConfig(appConfig)
771+
.SetKqpSettings({setting});
772+
773+
TKikimrRunner kikimr(serverSettings);
766774
NKqp::TKqpCounters counters(kikimr.GetTestServer().GetRuntime()->GetAppData().Counters);
767775

768776
{
@@ -772,13 +780,73 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
772780
int maxTimeoutMs = 500;
773781
bool wasCanceled = false;
774782

775-
for (int i = 1; i <= maxTimeoutMs; i++) {
776-
auto result = session.ExecuteDataQuery(R"(
783+
if (follower) {
784+
AssertSuccessResult(session.ExecuteSchemeQuery(R"(
785+
--!syntax_v1
786+
CREATE TABLE `/Root/OneShardWithFolower` (
787+
Key Uint64,
788+
Text String,
789+
Data Int32,
790+
PRIMARY KEY (Key)
791+
)
792+
WITH (
793+
READ_REPLICAS_SETTINGS = "ANY_AZ:1"
794+
);
795+
)").GetValueSync());
796+
797+
AssertSuccessResult(session.ExecuteDataQuery(R"(
798+
--!syntax_v1
799+
REPLACE INTO `/Root/OneShardWithFolower` (Key, Text, Data) VALUES
800+
(101u, "Value1", 1),
801+
(201u, "Value1", 2),
802+
(301u, "Value1", 3),
803+
(401u, "Value1", 1),
804+
(501u, "Value1", 2),
805+
(601u, "Value1", 3),
806+
(701u, "Value1", 1),
807+
(801u, "Value1", 2),
808+
(102u, "Value2", 3),
809+
(202u, "Value2", 1),
810+
(302u, "Value2", 2),
811+
(402u, "Value2", 3),
812+
(502u, "Value2", 1),
813+
(602u, "Value2", 2),
814+
(702u, "Value2", 3),
815+
(802u, "Value2", 1),
816+
(103u, "Value3", 2),
817+
(203u, "Value3", 3),
818+
(303u, "Value3", 1),
819+
(403u, "Value3", 2),
820+
(503u, "Value3", 3),
821+
(603u, "Value3", 1),
822+
(703u, "Value3", 2),
823+
(803u, "Value3", 3);
824+
)", TTxControl::BeginTx().CommitTx()).GetValueSync());
825+
}
826+
827+
const TString q = follower ?
828+
(dependedRead ?
829+
TString(R"(
830+
DECLARE $id AS Uint64;
831+
--JOIN with same table to make depended read
832+
SELECT t1.Data as Data, t1.Key as Key, t1.Text as Text FROM `/Root/OneShardWithFolower` as t1
833+
INNER JOIN OneShardWithFolower as t2 ON t1.Key = t2.Key WHERE t1.Text = "Value1" ORDER BY t1.Key;
834+
)"):
835+
TString(R"(
836+
DECLARE $id AS Uint64;
837+
SELECT * FROM `/Root/OneShardWithFolower` WHERE Text = "Value1" ORDER BY Key
838+
)")
839+
):
840+
TString(R"(
777841
DECLARE $id AS Uint64;
778-
SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key;
779-
)",
780-
TTxControl::BeginTx(
781-
TTxSettings::SerializableRW()).CommitTx(),
842+
SELECT * FROM `/Root/EightShard` WHERE Text = "Value1" ORDER BY Key
843+
)");
844+
845+
const auto txCtrl = follower ? TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx() :
846+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx();
847+
848+
for (int i = 1; i <= maxTimeoutMs; i++) {
849+
auto result = session.ExecuteDataQuery(q, txCtrl,
782850
TExecDataQuerySettings().CancelAfter(TDuration::MilliSeconds(i))
783851
).GetValueSync();
784852

@@ -803,6 +871,27 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
803871
WaitForZeroSessions(counters);
804872
}
805873

874+
Y_UNIT_TEST(CancelAfterRoTx) {
875+
// false, false has no sense since we use TEvRead to read without followers
876+
DoCancelAfterRo(false, true, false);
877+
}
878+
879+
Y_UNIT_TEST(CancelAfterRoTxWithFollowerLegacy) {
880+
DoCancelAfterRo(true, false, false);
881+
}
882+
883+
Y_UNIT_TEST(CancelAfterRoTxWithFollowerLegacyDependedRead) {
884+
DoCancelAfterRo(true, false, true);
885+
}
886+
887+
Y_UNIT_TEST(CancelAfterRoTxWithFollowerStreamLookup) {
888+
DoCancelAfterRo(true, true, false);
889+
}
890+
891+
Y_UNIT_TEST(CancelAfterRoTxWithFollowerStreamLookupDepededRead) {
892+
DoCancelAfterRo(true, true, true);
893+
}
894+
806895
Y_UNIT_TEST(QueryExecTimeout) {
807896
NKikimrConfig::TAppConfig appConfig;
808897
appConfig.MutableTableServiceConfig()->MutableResourceManager()->SetMkqlLightProgramMemoryLimit(10'000'000'000);

0 commit comments

Comments
 (0)