Skip to content

Commit 3d44e54

Browse files
authored
Fix overloaded EvWrite Prepare (#27964)
1 parent 46dbad3 commit 3d44e54

File tree

2 files changed

+49
-25
lines changed

2 files changed

+49
-25
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
668668
const auto metadata = ShardedWriteController->GetMessageMetadata(shardId);
669669
if (metadata && seqNo + 1 == metadata->NextOverloadSeqNo) {
670670
CA_LOG_D("Retry Overloaded ShardID=" << shardId);
671+
ResetShardRetries(shardId, metadata->Cookie);
671672
SendDataToShard(shardId);
672673
}
673674
}
@@ -1077,6 +1078,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
10771078

10781079
const auto metadata = ShardedWriteController->GetMessageMetadata(shardId);
10791080
YQL_ENSURE(metadata);
1081+
YQL_ENSURE(metadata->SendAttempts == 0 || InconsistentTx);
10801082
if (metadata->SendAttempts >= MessageSettings.MaxWriteAttempts) {
10811083
CA_LOG_W("ShardId=" << shardId
10821084
<< " for table '" << TablePath

ydb/core/kqp/ut/effects/kqp_overload_ut.cpp

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22
#include <ydb/core/tx/data_events/events.h>
33
#include <ydb/core/tx/datashard/datashard.h>
4+
#include <ydb/core/testlib/tablet_helpers.h>
45

56
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>
67

@@ -15,7 +16,7 @@ using namespace NYdb::NQuery;
1516

1617
Y_UNIT_TEST_SUITE(KqpOverload) {
1718

18-
Y_UNIT_TEST(OltpOverloaded) {
19+
Y_UNIT_TEST_TWIN(OltpOverloaded, Distributed) {
1920
TKikimrSettings settings;
2021
settings.SetUseRealThreads(false);
2122
settings.AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
@@ -27,50 +28,67 @@ Y_UNIT_TEST_SUITE(KqpOverload) {
2728
auto& runtime = *kikimr.GetTestServer().GetRuntime();
2829
Y_UNUSED(runtime);
2930

31+
auto edgeActor = runtime.AllocateEdgeActor();
32+
33+
const auto& shards = GetTableShards(
34+
&kikimr.GetTestServer(),
35+
edgeActor,
36+
Distributed ? "/Root/TwoShard" : "/Root/KeyValue");
37+
UNIT_ASSERT(Distributed ? shards.size() == 2 : shards.size() == 1);
38+
const auto overloadedShard = shards[0];
39+
const auto overloadedShardActor = ResolveTablet(runtime, overloadedShard);
40+
3041
{
31-
const TString query(Q1_(R"(
32-
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, 'value');
33-
SELECT * FROM `/Root/KeyValue`;
34-
)"));
42+
const TString query =
43+
Distributed
44+
? R"(
45+
UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES (1, 'value');
46+
SELECT * FROM `/Root/TwoShard`;
47+
)"
48+
: R"(
49+
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, 'value');
50+
SELECT * FROM `/Root/KeyValue`;
51+
)";
3552

3653
std::vector<std::unique_ptr<IEventHandle>> requests;
3754
std::vector<std::unique_ptr<IEventHandle>> responses;
3855
bool blockResults = true;
3956

40-
size_t overloadSeqNo = 0;
57+
size_t overloadSeqNo = 0;
4158

4259
auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
4360
if (blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) {
4461
auto* msg = ev->Get<NEvents::TDataEvents::TEvWriteResult>();
62+
if (msg->Record.GetOrigin() == overloadedShard) {
63+
auto overloadedResult = NEvents::TDataEvents::TEvWriteResult::BuildError(
64+
msg->Record.GetOrigin(),
65+
msg->Record.GetTxId(),
66+
NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED,
67+
"");
4568

46-
auto overloadedResult = NEvents::TDataEvents::TEvWriteResult::BuildError(
47-
msg->Record.GetOrigin(),
48-
msg->Record.GetTxId(),
49-
NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED,
50-
"");
51-
52-
UNIT_ASSERT(overloadSeqNo > 0);
53-
overloadedResult->Record.SetOverloadSubscribed(overloadSeqNo);
69+
UNIT_ASSERT(overloadSeqNo > 0);
70+
overloadedResult->Record.SetOverloadSubscribed(overloadSeqNo);
5471

55-
runtime.Send(ev->Recipient, ev->Sender, overloadedResult.release());
72+
runtime.Send(ev->Recipient, ev->Sender, overloadedResult.release());
5673

57-
auto overloadedReady = std::make_unique<TEvDataShard::TEvOverloadReady>(msg->Record.GetOrigin(), overloadSeqNo);
74+
auto overloadedReady = std::make_unique<TEvDataShard::TEvOverloadReady>(msg->Record.GetOrigin(), overloadSeqNo);
5875

59-
runtime.Send(ev->Recipient, ev->Sender, overloadedReady.release());
76+
runtime.Send(ev->Recipient, ev->Sender, overloadedReady.release());
6077

61-
responses.emplace_back(ev.Release());
78+
responses.emplace_back(ev.Release());
6279

63-
blockResults = false;
80+
blockResults = false;
6481

65-
return TTestActorRuntime::EEventAction::DROP;
66-
} else if (!blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
82+
return TTestActorRuntime::EEventAction::DROP;
83+
}
84+
} else if (!blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType && ev->GetRecipientRewrite() == overloadedShardActor) {
6785
for(auto& ev : responses) {
6886
runtime.Send(ev.release());
6987
}
7088
responses.clear();
7189
requests.emplace_back(ev.Release());
7290
return TTestActorRuntime::EEventAction::DROP;
73-
} else if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
91+
} else if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType && ev->GetRecipientRewrite() == overloadedShardActor) {
7492
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
7593
overloadSeqNo = msg->Record.GetOverloadSubscribe();
7694
}
@@ -93,14 +111,16 @@ Y_UNIT_TEST_SUITE(KqpOverload) {
93111
return requests.size() >= requestsExpected;
94112
});
95113
runtime.DispatchEvents(opts);
96-
AFL_ENSURE(requests.size() == requestsExpected);
114+
UNIT_ASSERT(requests.size() == requestsExpected);
115+
UNIT_ASSERT(!blockResults);
116+
UNIT_ASSERT(overloadSeqNo > 0);
97117

98118
auto result = runtime.WaitFuture(future);
99119
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
100120
auto tx = result.GetTransaction();
101121
UNIT_ASSERT(tx);
102122

103-
123+
overloadSeqNo = 0;
104124
blockResults = true;
105125
++requestsExpected;
106126

@@ -109,7 +129,9 @@ Y_UNIT_TEST_SUITE(KqpOverload) {
109129
});
110130

111131
runtime.DispatchEvents(opts);
112-
AFL_ENSURE(requests.size() == requestsExpected);
132+
UNIT_ASSERT(requests.size() == requestsExpected);
133+
UNIT_ASSERT(!blockResults);
134+
UNIT_ASSERT(overloadSeqNo > 0);
113135

114136
auto commitResult = runtime.WaitFuture(commitFuture);
115137
UNIT_ASSERT_C(commitResult.IsSuccess(), commitResult.GetIssues().ToString());

0 commit comments

Comments
 (0)