Skip to content

Commit 24a3abb

Browse files
authored
Moved sessions tests to C++ SDK repo (#17512)
1 parent e040fdf commit 24a3abb

File tree

8 files changed

+1178
-1290
lines changed

8 files changed

+1178
-1290
lines changed

ydb/public/sdk/cpp/tests/integration/sessions/main.cpp

+802
Large diffs are not rendered by default.

ydb/services/ydb/sdk_sessions_ut/ya.make renamed to ydb/public/sdk/cpp/tests/integration/sessions/ya.make

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
UNITTEST_FOR(ydb/services/ydb)
1+
GTEST()
2+
INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc)
23

34
FORK_SUBTESTS()
45

@@ -11,15 +12,13 @@ ELSE()
1112
ENDIF()
1213

1314
SRCS(
14-
sdk_sessions_ut.cpp
15+
main.cpp
1516
)
1617

1718
PEERDIR(
1819
ydb/public/sdk/cpp/src/library/grpc/client
19-
ydb/core/testlib/default
20-
ydb/core/testlib
2120
ydb/public/sdk/cpp/src/client/table
22-
ydb/public/lib/ut_helpers
21+
ydb/public/sdk/cpp/src/client/query
2322
)
2423

2524
YQL_LAST_ABI_VERSION()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/table/table.h>
2+
3+
#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>
4+
5+
#include <library/cpp/testing/gtest/gtest.h>
6+
7+
#include <random>
8+
#include <thread>
9+
10+
using namespace NYdb;
11+
using namespace NYdb::NTable;
12+
13+
class YdbSdkSessionsPool : public ::testing::TestWithParam<ui32> {
14+
protected:
15+
void SetUp() override {
16+
ui32 maxActiveSessions = GetParam();
17+
Driver = std::make_unique<NYdb::TDriver>(TDriverConfig().SetEndpoint(std::getenv("YDB_ENDPOINT")));
18+
19+
auto clientSettings = TClientSettings().SessionPoolSettings(
20+
TSessionPoolSettings()
21+
.MaxActiveSessions(maxActiveSessions)
22+
.KeepAliveIdleThreshold(TDuration::MilliSeconds(10))
23+
.CloseIdleThreshold(TDuration::MilliSeconds(10)));
24+
Client = std::make_unique<NYdb::NTable::TTableClient>(*Driver, clientSettings);
25+
}
26+
27+
void TearDown() override {
28+
Driver->Stop(true);
29+
}
30+
31+
protected:
32+
std::unique_ptr<NYdb::TDriver> Driver;
33+
std::unique_ptr<NYdb::NTable::TTableClient> Client;
34+
};
35+
36+
class YdbSdkSessionsPool1Session : public YdbSdkSessionsPool {};
37+
38+
enum class EAction: ui8 {
39+
CreateFuture,
40+
ExtractValue,
41+
Return
42+
};
43+
using TPlan = std::vector<std::pair<EAction, ui32>>;
44+
45+
46+
void CheckPlan(TPlan plan) {
47+
std::unordered_map<ui32, EAction> sessions;
48+
for (const auto& [action, sessionId]: plan) {
49+
if (action == EAction::CreateFuture) {
50+
ASSERT_FALSE(sessions.contains(sessionId));
51+
} else {
52+
ASSERT_TRUE(sessions.contains(sessionId));
53+
switch (sessions.at(sessionId)) {
54+
case EAction::CreateFuture: {
55+
ASSERT_EQ(action, EAction::ExtractValue);
56+
break;
57+
}
58+
case EAction::ExtractValue: {
59+
ASSERT_EQ(action, EAction::Return);
60+
break;
61+
}
62+
default: {
63+
ASSERT_TRUE(false);
64+
}
65+
}
66+
}
67+
sessions[sessionId] = action;
68+
}
69+
}
70+
71+
void RunPlan(const TPlan& plan, NYdb::NTable::TTableClient& client) {
72+
std::unordered_map<ui32, NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
73+
std::unordered_map<ui32, NYdb::NTable::TCreateSessionResult> sessions;
74+
75+
ui32 requestedSessions = 0;
76+
77+
for (const auto& [action, sessionId]: plan) {
78+
switch (action) {
79+
case EAction::CreateFuture: {
80+
sessionFutures.emplace(sessionId, client.GetSession());
81+
++requestedSessions;
82+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
83+
if (requestedSessions > client.GetActiveSessionsLimit()) {
84+
ASSERT_EQ(client.GetActiveSessionCount(), client.GetActiveSessionsLimit());
85+
}
86+
ASSERT_FALSE(sessionFutures.at(sessionId).HasValue());
87+
break;
88+
}
89+
case EAction::ExtractValue: {
90+
auto it = sessionFutures.find(sessionId);
91+
auto session = it->second.ExtractValueSync();
92+
sessionFutures.erase(it);
93+
sessions.emplace(sessionId, std::move(session));
94+
break;
95+
}
96+
case EAction::Return: {
97+
sessions.erase(sessionId);
98+
--requestedSessions;
99+
break;
100+
}
101+
}
102+
ASSERT_LE(client.GetActiveSessionCount(), client.GetActiveSessionsLimit());
103+
ASSERT_GE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size()));
104+
ASSERT_LE(client.GetActiveSessionCount(), static_cast<i64>(sessions.size() + sessionFutures.size()));
105+
}
106+
}
107+
108+
int GetRand(std::mt19937& rng, int min, int max) {
109+
std::uniform_int_distribution<std::mt19937::result_type> dist(min, max);
110+
return dist(rng);
111+
}
112+
113+
114+
TPlan GenerateRandomPlan(ui32 numSessions) {
115+
TPlan plan;
116+
std::random_device dev;
117+
std::mt19937 rng(dev());
118+
119+
for (ui32 i = 0; i < numSessions; ++i) {
120+
std::uniform_int_distribution<std::mt19937::result_type> dist(0, plan.size());
121+
ui32 prevPos = 0;
122+
for (EAction action: {EAction::CreateFuture, EAction::ExtractValue, EAction::Return}) {
123+
int pos = GetRand(rng, prevPos, plan.size());
124+
plan.emplace(plan.begin() + pos, std::make_pair(action, i));
125+
prevPos = pos + 1;
126+
}
127+
}
128+
return plan;
129+
}
130+
131+
132+
TEST_P(YdbSdkSessionsPool1Session, GetSession) {
133+
ASSERT_EQ(Client->GetActiveSessionsLimit(), 1);
134+
ASSERT_EQ(Client->GetActiveSessionCount(), 0);
135+
ASSERT_EQ(Client->GetCurrentPoolSize(), 0);
136+
137+
{
138+
auto session = Client->GetSession().ExtractValueSync();
139+
140+
ASSERT_EQ(session.GetStatus(), EStatus::SUCCESS);
141+
ASSERT_EQ(Client->GetActiveSessionCount(), 1);
142+
ASSERT_EQ(Client->GetCurrentPoolSize(), 0);
143+
}
144+
145+
ASSERT_EQ(Client->GetActiveSessionCount(), 0);
146+
ASSERT_EQ(Client->GetCurrentPoolSize(), 1);
147+
}
148+
149+
void TestWaitQueue(NYdb::NTable::TTableClient& client, ui32 activeSessionsLimit) {
150+
std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
151+
std::vector<NYdb::NTable::TCreateSessionResult> sessions;
152+
153+
// exhaust the pool
154+
for (ui32 i = 0; i < activeSessionsLimit; ++i) {
155+
sessions.emplace_back(client.GetSession().ExtractValueSync());
156+
}
157+
ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
158+
159+
// next should be in the wait queue
160+
for (ui32 i = 0; i < activeSessionsLimit * 10; ++i) {
161+
sessionFutures.emplace_back(client.GetSession());
162+
}
163+
ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
164+
165+
// next should be a fake session
166+
{
167+
auto brokenSession = client.GetSession().ExtractValueSync();
168+
ASSERT_FALSE(brokenSession.IsSuccess());
169+
}
170+
171+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
172+
for (auto& sessionFuture: sessionFutures) {
173+
ASSERT_FALSE(sessionFuture.HasValue());
174+
}
175+
176+
for (auto& sessionFuture: sessionFutures) {
177+
sessions.erase(sessions.begin());
178+
sessions.emplace_back(sessionFuture.ExtractValueSync());
179+
}
180+
ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
181+
}
182+
183+
TEST_P(YdbSdkSessionsPool, WaitQueue) {
184+
TestWaitQueue(*Client, GetParam());
185+
}
186+
187+
TEST_P(YdbSdkSessionsPool1Session, RunSmallPlan) {
188+
TPlan plan{
189+
{EAction::CreateFuture, 1},
190+
{EAction::ExtractValue, 1},
191+
{EAction::CreateFuture, 2},
192+
{EAction::Return, 1},
193+
{EAction::ExtractValue, 2},
194+
{EAction::Return, 2}
195+
};
196+
CheckPlan(plan);
197+
RunPlan(plan, *Client);
198+
199+
ASSERT_EQ(Client->GetActiveSessionCount(), 0);
200+
ASSERT_EQ(Client->GetCurrentPoolSize(), 1);
201+
}
202+
203+
TEST_P(YdbSdkSessionsPool1Session, CustomPlan) {
204+
TPlan plan{
205+
{EAction::CreateFuture, 1}
206+
};
207+
CheckPlan(plan);
208+
RunPlan(plan, *Client);
209+
210+
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
211+
212+
ASSERT_EQ(Client->GetActiveSessionCount(), 0);
213+
}
214+
215+
ui32 RunStressTestSync(ui32 n, ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
216+
std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
217+
std::vector<NYdb::NTable::TCreateSessionResult> sessions;
218+
std::mt19937 rng(0);
219+
ui32 successCount = 0;
220+
221+
for (ui32 i = 0; i < activeSessionsLimit * 12; ++i) {
222+
sessionFutures.emplace_back(client.GetSession());
223+
}
224+
225+
for (ui32 i = 0; i < n; ++i) {
226+
switch (static_cast<EAction>(GetRand(rng, 0, 2))) {
227+
case EAction::CreateFuture: {
228+
sessionFutures.emplace_back(client.GetSession());
229+
break;
230+
}
231+
case EAction::ExtractValue: {
232+
if (sessionFutures.empty()) {
233+
break;
234+
}
235+
auto ind = GetRand(rng, 0, sessionFutures.size() - 1);
236+
auto sessionFuture = sessionFutures[ind];
237+
if (sessionFuture.HasValue()) {
238+
auto session = sessionFuture.ExtractValueSync();
239+
if (session.IsSuccess()) {
240+
++successCount;
241+
}
242+
sessions.emplace_back(std::move(session));
243+
sessionFutures.erase(sessionFutures.begin() + ind);
244+
break;
245+
}
246+
break;
247+
}
248+
case EAction::Return: {
249+
if (sessions.empty()) {
250+
break;
251+
}
252+
auto ind = GetRand(rng, 0, sessions.size() - 1);
253+
sessions.erase(sessions.begin() + ind);
254+
break;
255+
}
256+
}
257+
}
258+
return successCount;
259+
}
260+
261+
TEST_P(YdbSdkSessionsPool, StressTestSync) {
262+
ui32 activeSessionsLimit = GetParam();
263+
264+
RunStressTestSync(1000, activeSessionsLimit, *Client);
265+
266+
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
267+
268+
ASSERT_EQ(Client->GetActiveSessionCount(), 0);
269+
ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit);
270+
}
271+
272+
ui32 RunStressTestAsync(ui32 n, ui32 nThreads, NYdb::NTable::TTableClient& client) {
273+
std::atomic<ui32> successCount(0);
274+
std::atomic<ui32> jobIndex(0);
275+
276+
auto job = [&client, &successCount, &jobIndex, n]() mutable {
277+
std::mt19937 rng(++jobIndex);
278+
for (ui32 i = 0; i < n; ++i) {
279+
std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
280+
auto sessionFuture = client.GetSession();
281+
std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
282+
auto session = sessionFuture.ExtractValueSync();
283+
std::this_thread::sleep_for(std::chrono::milliseconds(GetRand(rng, 1, 100)));
284+
successCount += session.IsSuccess();
285+
}
286+
};
287+
288+
std::vector<std::thread> threads;
289+
for (ui32 i = 0; i < nThreads; i++) {
290+
threads.emplace_back(job);
291+
}
292+
for (auto& thread: threads) {
293+
thread.join();
294+
}
295+
296+
return successCount;
297+
}
298+
299+
TEST_P(YdbSdkSessionsPool, StressTestAsync) {
300+
ui32 activeSessionsLimit = GetParam();
301+
ui32 iterations = (activeSessionsLimit == 1) ? 100 : 1000;
302+
303+
RunStressTestAsync(iterations, 10, *Client);
304+
305+
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
306+
307+
ASSERT_EQ(Client->GetActiveSessionCount(), 0);
308+
ASSERT_EQ(Client->GetCurrentPoolSize(), activeSessionsLimit);
309+
}
310+
311+
void TestPeriodicTask(ui32 activeSessionsLimit, NYdb::NTable::TTableClient& client) {
312+
std::vector<NThreading::TFuture<NYdb::NTable::TCreateSessionResult>> sessionFutures;
313+
std::vector<NYdb::NTable::TCreateSessionResult> sessions;
314+
315+
for (ui32 i = 0; i < activeSessionsLimit; ++i) {
316+
sessions.emplace_back(client.GetSession().ExtractValueSync());
317+
ASSERT_TRUE(sessions.back().IsSuccess());
318+
}
319+
320+
for (ui32 i = 0; i < activeSessionsLimit; ++i) {
321+
sessionFutures.emplace_back(client.GetSession());
322+
}
323+
324+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
325+
326+
for (auto& sessionFuture : sessionFutures) {
327+
ASSERT_FALSE(sessionFuture.HasValue());
328+
}
329+
330+
// Wait for wait session timeout
331+
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
332+
333+
for (auto& sessionFuture : sessionFutures) {
334+
ASSERT_TRUE(sessionFuture.HasValue());
335+
ASSERT_FALSE(sessionFuture.ExtractValueSync().IsSuccess());
336+
}
337+
338+
ASSERT_EQ(client.GetActiveSessionCount(), activeSessionsLimit);
339+
340+
sessionFutures.clear();
341+
sessions.clear();
342+
343+
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
344+
ASSERT_EQ(client.GetActiveSessionCount(), 0);
345+
ASSERT_EQ(client.GetCurrentPoolSize(), activeSessionsLimit);
346+
}
347+
348+
TEST_P(YdbSdkSessionsPool, PeriodicTask) {
349+
TestPeriodicTask(GetParam(), *Client);
350+
}
351+
352+
TEST_P(YdbSdkSessionsPool1Session, FailTest) {
353+
// This test reproduces bug from KIKIMR-18063
354+
auto sessionFromPool = Client->GetSession().ExtractValueSync();
355+
auto futureInWaitPool = Client->GetSession();
356+
357+
{
358+
auto standaloneSessionThatWillBeBroken = Client->CreateSession().ExtractValueSync();
359+
auto res = standaloneSessionThatWillBeBroken.GetSession().ExecuteDataQuery("SELECT COUNT(*) FROM `Root/Test`;",
360+
TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(),
361+
NYdb::NTable::TExecDataQuerySettings().ClientTimeout(TDuration::MicroSeconds(10))).GetValueSync();
362+
}
363+
}
364+
365+
INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool, ::testing::Values(1, 10));
366+
367+
INSTANTIATE_TEST_SUITE_P(, YdbSdkSessionsPool1Session, ::testing::Values(1));

0 commit comments

Comments
 (0)