Skip to content

Commit b62d4c4

Browse files
refactoring the transaction processing cycle (#1196)
1 parent 543b859 commit b62d4c4

File tree

3 files changed

+200
-95
lines changed

3 files changed

+200
-95
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 119 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ struct TMirrorerInfo {
5050
TTabletCountersBase Baseline;
5151
};
5252

53+
template <class T>
54+
T& TPartition::GetUserActionAndTransactionEventsFront()
55+
{
56+
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
57+
auto* ptr = get_if<TSimpleSharedPtr<T>>(&UserActionAndTransactionEvents.front());
58+
Y_ABORT_UNLESS(ptr);
59+
return **ptr;
60+
}
61+
62+
template <class T>
63+
bool TPartition::UserActionAndTransactionEventsFrontIs() const
64+
{
65+
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
66+
return get_if<TSimpleSharedPtr<T>>(&UserActionAndTransactionEvents.front());
67+
}
68+
5369
const TString& TPartition::TopicName() const {
5470
return TopicConverter->GetClientsideName();
5571
}
@@ -192,9 +208,10 @@ TPartition::TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, ui
192208
TabletCounters.Populate(Counters);
193209

194210
if (!distrTxs.empty()) {
195-
std::move(distrTxs.begin(), distrTxs.end(),
196-
std::back_inserter(DistrTxs));
197-
TxInProgress = DistrTxs.front().Predicate.Defined();
211+
for (auto& tx : distrTxs) {
212+
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(tx)));
213+
}
214+
TxInProgress = GetUserActionAndTransactionEventsFront<TTransaction>().Predicate.Defined();
198215
}
199216
}
200217

@@ -566,7 +583,6 @@ void TPartition::Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorC
566583

567584

568585
void TPartition::Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx) {
569-
570586
const TString& owner = ev->Get()->Owner;
571587
const TActorId& pipeClient = ev->Get()->PipeClient;
572588

@@ -854,7 +870,7 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
854870
return;
855871
}
856872

857-
if (ImmediateTxs.size() > MAX_TXS) {
873+
if (ImmediateTxCount >= MAX_TXS) {
858874
ReplyPropose(ctx,
859875
event,
860876
NKikimrPQ::TEvProposeTransactionResult::OVERLOADED);
@@ -1397,55 +1413,61 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
13971413

13981414
void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event)
13991415
{
1400-
DistrTxs.emplace_back(std::move(event));
1416+
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event)));
14011417
}
14021418

14031419
void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
14041420
{
1405-
DistrTxs.emplace_back(std::move(event), true);
1421+
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event), true));
14061422
}
14071423

14081424
void TPartition::PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event)
14091425
{
1410-
DistrTxs.emplace_front(std::move(event), false);
1426+
UserActionAndTransactionEvents.emplace_front(new TTransaction(std::move(event), false));
14111427
}
14121428

14131429
void TPartition::PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> event)
14141430
{
1415-
DistrTxs.emplace_back(std::move(event));
1431+
UserActionAndTransactionEvents.emplace_back(new TTransaction(std::move(event)));
14161432
}
14171433

14181434
void TPartition::AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> tx)
14191435
{
1420-
ImmediateTxs.push_back(std::move(tx));
1436+
UserActionAndTransactionEvents.emplace_back(std::move(tx));
1437+
++ImmediateTxCount;
14211438
}
14221439

14231440
void TPartition::AddUserAct(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo> act)
14241441
{
1425-
UserActs.push_back(std::move(act));
1426-
++UserActCount[UserActs.back()->ClientId];
1442+
TString clientId = act->ClientId;
1443+
UserActionAndTransactionEvents.emplace_back(std::move(act));
1444+
++UserActCount[clientId];
14271445
}
14281446

14291447
void TPartition::RemoveImmediateTx()
14301448
{
1431-
Y_ABORT_UNLESS(!ImmediateTxs.empty());
1449+
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
1450+
Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs<TEvPersQueue::TEvProposeTransaction>());
14321451

1433-
ImmediateTxs.pop_front();
1452+
UserActionAndTransactionEvents.pop_front();
1453+
--ImmediateTxCount;
14341454
}
14351455

14361456
void TPartition::RemoveUserAct()
14371457
{
1438-
Y_ABORT_UNLESS(!UserActs.empty());
1458+
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
1459+
Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs<TEvPQ::TEvSetClientInfo>());
14391460

1440-
auto p = UserActCount.find(UserActs.front()->ClientId);
1461+
TString clientId = GetUserActionAndTransactionEventsFront<TEvPQ::TEvSetClientInfo>().ClientId;
1462+
auto p = UserActCount.find(clientId);
14411463
Y_ABORT_UNLESS(p != UserActCount.end());
14421464

14431465
Y_ABORT_UNLESS(p->second > 0);
14441466
if (!--p->second) {
14451467
UserActCount.erase(p);
14461468
}
14471469

1448-
UserActs.pop_front();
1470+
UserActionAndTransactionEvents.pop_front();
14491471
}
14501472

14511473
size_t TPartition::GetUserActCount(const TString& consumer) const
@@ -1459,7 +1481,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const
14591481

14601482
void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
14611483
{
1462-
if (UsersInfoWriteInProgress || (ImmediateTxs.empty() && UserActs.empty() && DistrTxs.empty()) || TxInProgress) {
1484+
if (UsersInfoWriteInProgress || UserActionAndTransactionEvents.empty() || TxInProgress) {
14631485
return;
14641486
}
14651487

@@ -1472,17 +1494,36 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
14721494

14731495
void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
14741496
{
1475-
if (!DistrTxs.empty()) {
1476-
ProcessDistrTxs(ctx);
1497+
Y_ABORT_UNLESS(!UsersInfoWriteInProgress);
1498+
Y_ABORT_UNLESS(!TxInProgress);
1499+
1500+
if (!UserActionAndTransactionEvents.empty()) {
1501+
auto visitor = [this, &ctx](const auto& event) -> bool {
1502+
return this->ProcessUserActionOrTransaction(*event, ctx);
1503+
};
1504+
1505+
size_t index = UserActionAndTransactionEvents.front().index();
1506+
while (!UserActionAndTransactionEvents.empty()) {
1507+
auto& front = UserActionAndTransactionEvents.front();
1508+
1509+
if (index != front.index()) {
1510+
break;
1511+
}
1512+
1513+
if (!std::visit(visitor, front)) {
1514+
break;
1515+
}
1516+
1517+
if (TxInProgress) {
1518+
break;
1519+
}
1520+
}
14771521

14781522
if (TxInProgress) {
14791523
return;
14801524
}
14811525
}
14821526

1483-
ProcessUserActs(ctx);
1484-
ProcessImmediateTxs(ctx);
1485-
14861527
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
14871528
request->Record.SetCookie(SET_OFFSET_COOKIE);
14881529

@@ -1499,19 +1540,52 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
14991540

15001541
void TPartition::RemoveDistrTx()
15011542
{
1502-
Y_ABORT_UNLESS(!DistrTxs.empty());
1543+
Y_ABORT_UNLESS(!UserActionAndTransactionEvents.empty());
1544+
Y_ABORT_UNLESS(UserActionAndTransactionEventsFrontIs<TTransaction>());
15031545

1504-
DistrTxs.pop_front();
1546+
UserActionAndTransactionEvents.pop_front();
15051547
PendingPartitionConfig = nullptr;
15061548
}
15071549

1508-
void TPartition::ProcessDistrTxs(const TActorContext& ctx)
1550+
bool TPartition::ProcessUserActionOrTransaction(TTransaction& t,
1551+
const TActorContext& ctx)
15091552
{
15101553
Y_ABORT_UNLESS(!TxInProgress);
15111554

1512-
while (!TxInProgress && !DistrTxs.empty()) {
1513-
ProcessDistrTx(ctx);
1555+
if (t.Tx) {
1556+
t.Predicate = BeginTransaction(*t.Tx, ctx);
1557+
1558+
ctx.Send(Tablet,
1559+
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step,
1560+
t.Tx->TxId,
1561+
Partition,
1562+
*t.Predicate).Release());
1563+
1564+
TxInProgress = true;
1565+
} else if (t.ProposeConfig) {
1566+
t.Predicate = BeginTransaction(*t.ProposeConfig);
1567+
1568+
PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition);
1569+
//Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found");
1570+
1571+
ctx.Send(Tablet,
1572+
MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step,
1573+
t.ProposeConfig->TxId,
1574+
Partition).Release());
1575+
1576+
TxInProgress = true;
1577+
} else {
1578+
Y_ABORT_UNLESS(!ChangeConfig);
1579+
1580+
ChangeConfig = t.ChangeConfig;
1581+
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
1582+
SendChangeConfigReply = t.SendReply;
1583+
BeginChangePartitionConfig(ChangeConfig->Config, ctx);
1584+
1585+
RemoveDistrTx();
15141586
}
1587+
1588+
return false;
15151589
}
15161590

15171591
bool TPartition::BeginTransaction(const TEvPQ::TEvTxCalcPredicate& tx,
@@ -1602,8 +1676,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxCommit& event,
16021676

16031677
Y_ABORT_UNLESS(TxInProgress);
16041678

1605-
Y_ABORT_UNLESS(!DistrTxs.empty());
1606-
TTransaction& t = DistrTxs.front();
1679+
TTransaction& t = GetUserActionAndTransactionEventsFront<TTransaction>();
16071680

16081681
if (t.Tx) {
16091682
Y_ABORT_UNLESS(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
@@ -1649,8 +1722,7 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event,
16491722

16501723
Y_ABORT_UNLESS(TxInProgress);
16511724

1652-
Y_ABORT_UNLESS(!DistrTxs.empty());
1653-
TTransaction& t = DistrTxs.front();
1725+
TTransaction& t = GetUserActionAndTransactionEventsFront<TTransaction>();
16541726

16551727
if (t.Tx) {
16561728
Y_ABORT_UNLESS(GetStepAndTxId(event) == GetStepAndTxId(*t.Tx));
@@ -1666,7 +1738,6 @@ void TPartition::EndTransaction(const TEvPQ::TEvTxRollback& event,
16661738
Y_ABORT_UNLESS(t.ChangeConfig);
16671739
}
16681740

1669-
16701741
RemoveDistrTx();
16711742
}
16721743

@@ -1717,7 +1788,6 @@ void TPartition::BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& co
17171788
void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx) {
17181789
Y_ABORT_UNLESS(cookie == SET_OFFSET_COOKIE);
17191790

1720-
17211791
if (ChangeConfig) {
17221792
EndChangePartitionConfig(ChangeConfig->Config,
17231793
ChangeConfig->TopicConverter,
@@ -1782,7 +1852,6 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
17821852
PendingPartitionConfig = nullptr;
17831853
}
17841854

1785-
17861855
ProcessTxsAndUserActs(ctx);
17871856

17881857
if (ChangeConfig && CurrentStateFunc() == &TThis::StateIdle) {
@@ -1847,56 +1916,17 @@ void TPartition::ResendPendingEvents(const TActorContext& ctx)
18471916
}
18481917
}
18491918

1850-
void TPartition::ProcessDistrTx(const TActorContext& ctx)
1919+
bool TPartition::ProcessUserActionOrTransaction(const TEvPersQueue::TEvProposeTransaction& event,
1920+
const TActorContext& ctx)
18511921
{
1852-
Y_ABORT_UNLESS(!TxInProgress);
1853-
1854-
Y_ABORT_UNLESS(!DistrTxs.empty());
1855-
TTransaction& t = DistrTxs.front();
1856-
1857-
if (t.Tx) {
1858-
t.Predicate = BeginTransaction(*t.Tx, ctx);
1859-
1860-
ctx.Send(Tablet,
1861-
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(t.Tx->Step,
1862-
t.Tx->TxId,
1863-
Partition,
1864-
*t.Predicate).Release());
1865-
1866-
TxInProgress = true;
1867-
} else if (t.ProposeConfig) {
1868-
t.Predicate = BeginTransaction(*t.ProposeConfig);
1869-
1870-
PendingPartitionConfig = GetPartitionConfig(t.ProposeConfig->Config, Partition);
1871-
//Y_VERIFY_DEBUG_S(PendingPartitionConfig, "Partition " << Partition << " config not found");
1872-
1873-
ctx.Send(Tablet,
1874-
MakeHolder<TEvPQ::TEvProposePartitionConfigResult>(t.ProposeConfig->Step,
1875-
t.ProposeConfig->TxId,
1876-
Partition).Release());
1877-
1878-
TxInProgress = true;
1879-
} else {
1880-
Y_ABORT_UNLESS(!ChangeConfig);
1881-
1882-
ChangeConfig = t.ChangeConfig;
1883-
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config, Partition);
1884-
SendChangeConfigReply = t.SendReply;
1885-
BeginChangePartitionConfig(ChangeConfig->Config, ctx);
1886-
1887-
RemoveDistrTx();
1922+
if (AffectedUsers.size() >= MAX_USERS) {
1923+
return false;
18881924
}
1889-
}
18901925

1891-
void TPartition::ProcessImmediateTxs(const TActorContext& ctx)
1892-
{
1893-
Y_ABORT_UNLESS(!UsersInfoWriteInProgress);
1894-
1895-
while (!ImmediateTxs.empty() && (AffectedUsers.size() < MAX_USERS)) {
1896-
ProcessImmediateTx(ImmediateTxs.front()->Record, ctx);
1926+
ProcessImmediateTx(event.Record, ctx);
1927+
RemoveImmediateTx();
18971928

1898-
RemoveImmediateTx();
1899-
}
1929+
return true;
19001930
}
19011931

19021932
void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
@@ -1948,15 +1978,17 @@ void TPartition::ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
19481978
NKikimrPQ::TEvProposeTransactionResult::COMPLETE);
19491979
}
19501980

1951-
void TPartition::ProcessUserActs(const TActorContext& ctx)
1981+
bool TPartition::ProcessUserActionOrTransaction(TEvPQ::TEvSetClientInfo& act,
1982+
const TActorContext& ctx)
19521983
{
1953-
Y_ABORT_UNLESS(!UsersInfoWriteInProgress);
1984+
if (AffectedUsers.size() >= MAX_USERS) {
1985+
return false;
1986+
}
19541987

1955-
while (!UserActs.empty() && (AffectedUsers.size() < MAX_USERS)) {
1956-
ProcessUserAct(*UserActs.front(), ctx);
1988+
ProcessUserAct(act, ctx);
1989+
RemoveUserAct();
19571990

1958-
RemoveUserAct();
1959-
}
1991+
return true;
19601992
}
19611993

19621994
void TPartition::ProcessUserAct(TEvPQ::TEvSetClientInfo& act,

0 commit comments

Comments
 (0)