Skip to content

Commit d227d31

Browse files
duplicated code in tests (#3149)
1 parent b2f7f35 commit d227d31

File tree

1 file changed

+67
-115
lines changed

1 file changed

+67
-115
lines changed

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 67 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
263263
void SendGetWriteInfo(ui32 internalPartitionId);
264264
void ShadowPartitionCountersTest(bool isFirstClass);
265265

266+
void TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline);
267+
266268
TMaybe<TTestContext> Ctx;
267269
TMaybe<TFinalizer> Finalizer;
268270

@@ -1042,6 +1044,69 @@ void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) {
10421044

10431045
}
10441046

1047+
void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline)
1048+
{
1049+
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
1050+
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(quotaWaitDuration.MilliSeconds());
1051+
Ctx->Runtime->SetLogPriority( NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG);
1052+
1053+
CreatePartition({
1054+
.Partition=TPartitionId{1},
1055+
.Begin=0, .End=10,
1056+
//
1057+
// partition configuration
1058+
//
1059+
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
1060+
},
1061+
//
1062+
// tablet configuration
1063+
//
1064+
{.Version=2, .Consumers={{.Consumer="client-1"}}});
1065+
1066+
SendSubDomainStatus(true);
1067+
1068+
ui64 cookie = 1;
1069+
ui64 messageNo = 0;
1070+
1071+
SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
1072+
auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
1073+
UNIT_ASSERT(ownerEvent != nullptr);
1074+
auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
1075+
1076+
TAutoPtr<IEventHandle> handle;
1077+
std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) {
1078+
return cookie == e.Cookie;
1079+
};
1080+
1081+
TString data = "data for write";
1082+
1083+
// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
1084+
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline);
1085+
messageNo++;
1086+
1087+
SendDiskStatusResponse();
1088+
{
1089+
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1090+
UNIT_ASSERT(event != nullptr);
1091+
}
1092+
1093+
// Second message will not be processed because the limit is exceeded.
1094+
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline);
1095+
messageNo++;
1096+
1097+
SendDiskStatusResponse();
1098+
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1099+
UNIT_ASSERT(event == nullptr);
1100+
1101+
// SudDomain quota available - second message will be processed..
1102+
SendSubDomainStatus(false);
1103+
SendDiskStatusResponse();
1104+
1105+
event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1106+
UNIT_ASSERT(event != nullptr);
1107+
UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus());
1108+
}
1109+
10451110
Y_UNIT_TEST_F(Batching, TPartitionFixture)
10461111
{
10471112
CreatePartition();
@@ -1666,125 +1731,12 @@ Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture)
16661731

16671732
Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture)
16681733
{
1669-
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
1670-
// disable write request expiration while thes wait quota
1671-
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(0);
1672-
Ctx->Runtime->SetLogPriority( NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG);
1673-
1674-
CreatePartition({
1675-
.Partition=TPartitionId{1},
1676-
.Begin=0, .End=10,
1677-
//
1678-
// partition configuration
1679-
//
1680-
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
1681-
},
1682-
//
1683-
// tablet configuration
1684-
//
1685-
{.Version=2, .Consumers={{.Consumer="client-1"}}});
1686-
1687-
SendSubDomainStatus(true);
1688-
1689-
ui64 cookie = 1;
1690-
ui64 messageNo = 0;
1691-
1692-
SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
1693-
auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
1694-
UNIT_ASSERT(ownerEvent != nullptr);
1695-
auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
1696-
1697-
TAutoPtr<IEventHandle> handle;
1698-
std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) {
1699-
return cookie == e.Cookie;
1700-
};
1701-
1702-
TString data = "data for write";
1703-
1704-
// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
1705-
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
1706-
messageNo++;
1707-
1708-
SendDiskStatusResponse();
1709-
{
1710-
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1711-
UNIT_ASSERT(event != nullptr);
1712-
}
1713-
1714-
// Second message will not be processed because the limit is exceeded.
1715-
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data);
1716-
messageNo++;
1717-
1718-
SendDiskStatusResponse();
1719-
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1720-
UNIT_ASSERT(event == nullptr);
1721-
1722-
// SudDomain quota available - second message will be processed..
1723-
SendSubDomainStatus(false);
1724-
SendDiskStatusResponse();
1725-
1726-
event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1727-
UNIT_ASSERT(event != nullptr);
1728-
UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus());
1734+
TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(0), false);
17291735
}
17301736

17311737
Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_IgnoreQuotaDeadline, TPartitionFixture)
17321738
{
1733-
Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true);
1734-
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
1735-
1736-
CreatePartition({
1737-
.Partition=TPartitionId{1},
1738-
.Begin=0, .End=10,
1739-
//
1740-
// partition configuration
1741-
//
1742-
.Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}}
1743-
},
1744-
//
1745-
// tablet configuration
1746-
//
1747-
{.Version=2, .Consumers={{.Consumer="client-1"}}});
1748-
1749-
SendSubDomainStatus(true);
1750-
1751-
ui64 cookie = 1;
1752-
ui64 messageNo = 0;
1753-
1754-
SendChangeOwner(cookie, "owner1", Ctx->Edge, true);
1755-
auto ownerEvent = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvProxyResponse>(TDuration::Seconds(1));
1756-
UNIT_ASSERT(ownerEvent != nullptr);
1757-
auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie();
1758-
1759-
TAutoPtr<IEventHandle> handle;
1760-
std::function<bool(const TEvPQ::TEvProxyResponse&)> truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; };
1761-
1762-
TString data = "data for write";
1763-
1764-
// First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded.
1765-
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true);
1766-
messageNo++;
1767-
SendDiskStatusResponse();
1768-
{
1769-
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1770-
UNIT_ASSERT(event != nullptr);
1771-
}
1772-
1773-
// Second message will not be processed because the limit is exceeded.
1774-
SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true);
1775-
messageNo++;
1776-
1777-
SendDiskStatusResponse();
1778-
auto event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1779-
UNIT_ASSERT(event == nullptr);
1780-
1781-
// SudDomain quota available - second message will be processed..
1782-
SendSubDomainStatus(false);
1783-
SendDiskStatusResponse();
1784-
1785-
event = Ctx->Runtime->GrabEdgeEventIf<TEvPQ::TEvProxyResponse>(handle, truth, TDuration::Seconds(1));
1786-
UNIT_ASSERT(event != nullptr);
1787-
UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus());
1739+
TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(300), true);
17881740
}
17891741

17901742
Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {

0 commit comments

Comments
 (0)