Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2645,6 +2645,16 @@ void TDataShard::Handle(TEvDataShard::TEvRead::TPtr& ev, const TActorContext& ct
return;
}

if (State == TShardState::PreOffline ||
State == TShardState::Offline)
{
replyWithError(
Ydb::StatusIds::NOT_FOUND,
TStringBuilder() << "Shard " << TabletID() << " finished splitting/merging"
<< " (node# " << SelfId().NodeId() << " state# " << DatashardStateName(State) << ")");
return;
}

if (!IsStateNewReadAllowed()) {
replyWithError(
Ydb::StatusIds::OVERLOADED,
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,14 @@ namespace NKqpHelpers {
return KqpSimpleExec(runtime, query, true, database);
}

inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
inline auto KqpSimpleBeginSend(TTestActorRuntime& runtime, TString& sessionId, const TString& query) {
sessionId = CreateSessionRPC(runtime);
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, /* txId */ {}, false /* commitTx */));
}

inline TString KqpSimpleBegin(TTestActorRuntime& runtime, TString& sessionId, TString& txId, const TString& query) {
txId.clear();
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, false /* commitTx */)));
auto response = AwaitResponse(runtime, KqpSimpleBeginSend(runtime, sessionId, query));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
Expand Down
72 changes: 69 additions & 3 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3970,7 +3970,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
Y_UNIT_TEST(HandleMvccGoneInContinue) {
// TODO
}
};
}

Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {
Y_UNIT_TEST(ShouldRead) {
Expand Down Expand Up @@ -4054,7 +4054,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorSysTables) {

UNIT_ASSERT_VALUES_EQUAL(record.GetStatus().GetCode(), Ydb::StatusIds::UNSUPPORTED);
}
};
}

Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
Y_UNIT_TEST(ShouldCalculateQuota) {
Expand Down Expand Up @@ -4105,7 +4105,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorState) {
UNIT_ASSERT_VALUES_EQUAL(state.Quota.Bytes, 131729);
UNIT_ASSERT(state.State == NDataShard::TReadIteratorState::EState::Executing);
}
};
}

Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
Y_UNIT_TEST(CancelPageFaultedReadThenDropTable) {
Expand Down Expand Up @@ -4755,4 +4755,70 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {

}

Y_UNIT_TEST_SUITE(DataShardReadIteratorLatency) {

Y_UNIT_TEST(ReadSplitLatency) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);
TServer::TPtr server = new TServer(serverSettings);

auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

CreateShardedTable(server, sender, "/Root", "table-1", 1);

// Insert initial data
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");

// Copy table (this will ensure original shards stay alive after split)
{
auto senderCopy = runtime.AllocateEdgeActor();
ui64 txId = AsyncCreateCopyTable(server, senderCopy, "/Root", "table-2", "/Root/table-1");
WaitTxNotification(server, senderCopy, txId);
}

TBlockEvents<TEvDataShard::TEvRead> blockedReads(runtime);

Cerr << "... starting read from table-1" << Endl;
TString readSessionId;
auto readFuture = KqpSimpleBeginSend(runtime, readSessionId, R"(
SELECT * FROM `/Root/table-1` ORDER BY key;
)");

runtime.WaitFor("blocked TEvRead", [&]{ return blockedReads.size() >= 1; });

{
Cerr << "... splitting table-1" << Endl;
SetSplitMergePartCountLimit(server->GetRuntime(), -1);
auto shards1before = GetTableShards(server, sender, "/Root/table-1");
ui64 txId = AsyncSplitTable(server, sender, "/Root/table-1", shards1before.at(0), 5);
Cerr << "... split txId# " << txId << " started" << Endl;
WaitTxNotification(server, sender, txId);
Cerr << "... split txId# " << txId << " finished" << Endl;
}

runtime.SimulateSleep(TDuration::MilliSeconds(1));

auto readStartTs = runtime.GetCurrentTime();
blockedReads.Unblock();
blockedReads.Stop();
auto readResponse = runtime.WaitFuture(std::move(readFuture));
UNIT_ASSERT_VALUES_EQUAL(readResponse.operation().status(), Ydb::StatusIds::SUCCESS);
auto readLatency = runtime.GetCurrentTime() - readStartTs;
Cerr << "... read latency was " << readLatency << Endl;
UNIT_ASSERT_C(readLatency < TDuration::MilliSeconds(100),
"unexpected read latency " << readLatency);
}

}

} // namespace NKikimr
Loading