Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
const auto metadata = ShardedWriteController->GetMessageMetadata(shardId);
if (metadata && seqNo + 1 == metadata->NextOverloadSeqNo) {
CA_LOG_D("Retry Overloaded ShardID=" << shardId);
ResetShardRetries(shardId, metadata->Cookie);
SendDataToShard(shardId);
}
}
Expand Down Expand Up @@ -1077,6 +1078,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {

const auto metadata = ShardedWriteController->GetMessageMetadata(shardId);
YQL_ENSURE(metadata);
YQL_ENSURE(metadata->SendAttempts == 0 || InconsistentTx);
if (metadata->SendAttempts >= MessageSettings.MaxWriteAttempts) {
CA_LOG_W("ShardId=" << shardId
<< " for table '" << TablePath
Expand Down
72 changes: 47 additions & 25 deletions ydb/core/kqp/ut/effects/kqp_overload_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/testlib/tablet_helpers.h>

#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h>

Expand All @@ -15,7 +16,7 @@ using namespace NYdb::NQuery;

Y_UNIT_TEST_SUITE(KqpOverload) {

Y_UNIT_TEST(OltpOverloaded) {
Y_UNIT_TEST_TWIN(OltpOverloaded, Distributed) {
TKikimrSettings settings;
settings.SetUseRealThreads(false);
settings.AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
Expand All @@ -27,50 +28,67 @@ Y_UNIT_TEST_SUITE(KqpOverload) {
auto& runtime = *kikimr.GetTestServer().GetRuntime();
Y_UNUSED(runtime);

auto edgeActor = runtime.AllocateEdgeActor();

const auto& shards = GetTableShards(
&kikimr.GetTestServer(),
edgeActor,
Distributed ? "/Root/TwoShard" : "/Root/KeyValue");
UNIT_ASSERT(Distributed ? shards.size() == 2 : shards.size() == 1);
const auto overloadedShard = shards[0];
const auto overloadedShardActor = ResolveTablet(runtime, overloadedShard);

{
const TString query(Q1_(R"(
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, 'value');
SELECT * FROM `/Root/KeyValue`;
)"));
const TString query =
Distributed
? R"(
UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES (1, 'value');
SELECT * FROM `/Root/TwoShard`;
)"
: R"(
UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, 'value');
SELECT * FROM `/Root/KeyValue`;
)";

std::vector<std::unique_ptr<IEventHandle>> requests;
std::vector<std::unique_ptr<IEventHandle>> responses;
bool blockResults = true;

size_t overloadSeqNo = 0;
size_t overloadSeqNo = 0;

auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
if (blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) {
auto* msg = ev->Get<NEvents::TDataEvents::TEvWriteResult>();
if (msg->Record.GetOrigin() == overloadedShard) {
auto overloadedResult = NEvents::TDataEvents::TEvWriteResult::BuildError(
msg->Record.GetOrigin(),
msg->Record.GetTxId(),
NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED,
"");

auto overloadedResult = NEvents::TDataEvents::TEvWriteResult::BuildError(
msg->Record.GetOrigin(),
msg->Record.GetTxId(),
NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED,
"");

UNIT_ASSERT(overloadSeqNo > 0);
overloadedResult->Record.SetOverloadSubscribed(overloadSeqNo);
UNIT_ASSERT(overloadSeqNo > 0);
overloadedResult->Record.SetOverloadSubscribed(overloadSeqNo);

runtime.Send(ev->Recipient, ev->Sender, overloadedResult.release());
runtime.Send(ev->Recipient, ev->Sender, overloadedResult.release());

auto overloadedReady = std::make_unique<TEvDataShard::TEvOverloadReady>(msg->Record.GetOrigin(), overloadSeqNo);
auto overloadedReady = std::make_unique<TEvDataShard::TEvOverloadReady>(msg->Record.GetOrigin(), overloadSeqNo);

runtime.Send(ev->Recipient, ev->Sender, overloadedReady.release());
runtime.Send(ev->Recipient, ev->Sender, overloadedReady.release());

responses.emplace_back(ev.Release());
responses.emplace_back(ev.Release());

blockResults = false;
blockResults = false;

return TTestActorRuntime::EEventAction::DROP;
} else if (!blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
return TTestActorRuntime::EEventAction::DROP;
}
} else if (!blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType && ev->GetRecipientRewrite() == overloadedShardActor) {
for(auto& ev : responses) {
runtime.Send(ev.release());
}
responses.clear();
requests.emplace_back(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
} else if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) {
} else if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType && ev->GetRecipientRewrite() == overloadedShardActor) {
auto* msg = ev->Get<NEvents::TDataEvents::TEvWrite>();
overloadSeqNo = msg->Record.GetOverloadSubscribe();
}
Expand All @@ -93,14 +111,16 @@ Y_UNIT_TEST_SUITE(KqpOverload) {
return requests.size() >= requestsExpected;
});
runtime.DispatchEvents(opts);
AFL_ENSURE(requests.size() == requestsExpected);
UNIT_ASSERT(requests.size() == requestsExpected);
UNIT_ASSERT(!blockResults);
UNIT_ASSERT(overloadSeqNo > 0);

auto result = runtime.WaitFuture(future);
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
auto tx = result.GetTransaction();
UNIT_ASSERT(tx);


overloadSeqNo = 0;
blockResults = true;
++requestsExpected;

Expand All @@ -109,7 +129,9 @@ Y_UNIT_TEST_SUITE(KqpOverload) {
});

runtime.DispatchEvents(opts);
AFL_ENSURE(requests.size() == requestsExpected);
UNIT_ASSERT(requests.size() == requestsExpected);
UNIT_ASSERT(!blockResults);
UNIT_ASSERT(overloadSeqNo > 0);

auto commitResult = runtime.WaitFuture(commitFuture);
UNIT_ASSERT_C(commitResult.IsSuccess(), commitResult.GetIssues().ToString());
Expand Down
Loading