Skip to content

Commit f6b9297

Browse files
authored
ydb_persqueue_public: fix flapping test (#4723)
1 parent 22bd429 commit f6b9297

File tree

3 files changed

+21
-8
lines changed

3 files changed

+21
-8
lines changed

ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/retry_policy_ut.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -379,11 +379,11 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
379379
}
380380
Y_UNIT_TEST(RetryWithBatching) {
381381
auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
382-
auto settings = setup->GetWriteSessionSettings();
383382
auto retryPolicy = std::make_shared<TYdbPqTestRetryPolicy>();
384-
settings.BatchFlushInterval(TDuration::Seconds(1000)); // Batch on size, not on time.
385-
settings.BatchFlushSizeBytes(100);
386-
settings.RetryPolicy(retryPolicy);
383+
auto settings = setup->GetWriteSessionSettings()
384+
.BatchFlushInterval(TDuration::Seconds(1000)) // Batch on size, not on time.
385+
.BatchFlushSizeBytes(100)
386+
.RetryPolicy(retryPolicy);
387387
auto& client = setup->GetPersQueueClient();
388388
auto writer = client.CreateWriteSession(settings);
389389
auto event = *writer->GetEvent(true);
@@ -393,6 +393,8 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
393393
TString message = "1234567890";
394394
ui64 seqNo = 0;
395395
setup->KickTablets();
396+
setup->WaitForTabletsDown();
397+
396398
writer->Write(std::move(continueToken), message, ++seqNo);
397399
retryPolicy->ExpectBreakDown();
398400
retryPolicy->WaitForRetriesSync(3);

ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/sdk_test_setup.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,18 @@ class SDKTestSetup {
224224
Server.AnnoyingClient->KickNodeInHive(Server.CleverServer->GetRuntime(), i);
225225
}
226226
}
227-
227+
228+
void WaitForTabletsDown() {
229+
// After calling KickTablets wait until the tablets are in fact dead.
230+
231+
auto describeResult = Server.AnnoyingClient->Ls(GetTestTopicPath());
232+
UNIT_ASSERT_C(describeResult->Record.GetPathDescription().HasPersQueueGroup(), describeResult->Record);
233+
auto persQueueGroup = describeResult->Record.GetPathDescription().GetPersQueueGroup();
234+
for (const auto& p : persQueueGroup.GetPartitions()) {
235+
Server.AnnoyingClient->WaitForTabletDown(Server.CleverServer->GetRuntime(), p.GetTabletId(), true, TDuration::Max());
236+
}
237+
}
238+
228239
void AllowTablets() {
229240
for (ui32 i = 0; i < Server.CleverServer->StaticNodes() + Server.CleverServer->DynamicNodes(); i++) {
230241
Server.AnnoyingClient->MarkNodeInHive(Server.CleverServer->GetRuntime(), i, true);

ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/test_server.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,12 @@ class TTestServer {
137137
if (killPq)
138138
{
139139
THashSet<ui64> restartedTablets;
140-
for (const auto& p : persQueueGroup.GetPartitions())
141-
if (restartedTablets.insert(p.GetTabletId()).second)
142-
{
140+
for (const auto& p : persQueueGroup.GetPartitions()) {
141+
if (restartedTablets.insert(p.GetTabletId()).second) {
143142
Log << TLOG_INFO << "Kill PQ tablet " << p.GetTabletId();
144143
AnnoyingClient->KillTablet(*CleverServer, p.GetTabletId());
145144
}
145+
}
146146
}
147147

148148
CleverServer->GetRuntime()->DispatchEvents();

0 commit comments

Comments
 (0)