Skip to content

Commit 38281eb

Browse files
the race between TEvLockStatus and TEvProposeTransaction (#4758)
1 parent 6149afe commit 38281eb

File tree

18 files changed

+438
-35
lines changed

18 files changed

+438
-35
lines changed

ydb/core/kqp/session_actor/kqp_query_state.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,14 @@ void TKqpQueryState::AddOffsetsToTransaction() {
323323
const auto& operations = GetTopicOperations();
324324

325325
TMaybe<TString> consumer;
326-
if (operations.HasConsumer())
326+
if (operations.HasConsumer()) {
327327
consumer = operations.GetConsumer();
328+
}
329+
330+
TMaybe<ui32> supportivePartition;
331+
if (operations.HasSupportivePartition()) {
332+
supportivePartition = operations.GetSupportivePartition();
333+
}
328334

329335
TopicOperations = NTopic::TTopicOperations();
330336

@@ -334,7 +340,7 @@ void TKqpQueryState::AddOffsetsToTransaction() {
334340

335341
for (auto& partition : topic.partitions()) {
336342
if (partition.partition_offsets().empty()) {
337-
TopicOperations.AddOperation(path, partition.partition_id());
343+
TopicOperations.AddOperation(path, partition.partition_id(), supportivePartition);
338344
} else {
339345
for (auto& range : partition.partition_offsets()) {
340346
YQL_ENSURE(consumer.Defined());

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,17 @@
88

99
namespace NKikimr::NKqp::NTopic {
1010

11+
// Folds the supportive-partition id `rhs` into the accumulated value `lhs`.
//  - If `lhs` is empty, it simply takes `rhs` (which may itself be empty).
//  - If `lhs` already holds a value and `rhs` holds a different one,
//    `lhs` is overwritten with Max<ui32>().
//  - Otherwise (`rhs` empty or equal to `lhs`) `lhs` is left unchanged.
// NOTE(review): Max<ui32>() presumably acts as a "conflicting ids" marker that
// cannot match a real supportive partition -- confirm against the consumer side.
static void UpdateSupportivePartition(TMaybe<ui32>& lhs, const TMaybe<ui32>& rhs)
12+
{
13+
if (lhs) {
14+
if ((rhs != Nothing()) && (rhs != lhs)) {
15+
lhs = Max<ui32>();
16+
}
17+
} else {
18+
lhs = rhs;
19+
}
20+
}
21+
1122
//
1223
// TConsumerOperations
1324
//
@@ -78,7 +89,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
7889
Operations_[consumer].AddOperation(consumer, range);
7990
}
8091

81-
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition)
92+
void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partition,
93+
TMaybe<ui32> supportivePartition)
8294
{
8395
Y_ABORT_UNLESS(Topic_.Empty() || Topic_ == topic);
8496
Y_ABORT_UNLESS(Partition_.Empty() || Partition_ == partition);
@@ -88,6 +100,8 @@ void TTopicPartitionOperations::AddOperation(const TString& topic, ui32 partitio
88100
Partition_ = partition;
89101
}
90102

103+
UpdateSupportivePartition(SupportivePartition_, supportivePartition);
104+
91105
HasWriteOperations_ = true;
92106
}
93107

@@ -112,6 +126,9 @@ void TTopicPartitionOperations::BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTra
112126
NKikimrPQ::TPartitionOperation* o = tx.MutableOperations()->Add();
113127
o->SetPartitionId(*Partition_);
114128
o->SetPath(*Topic_);
129+
if (SupportivePartition_.Defined()) {
130+
o->SetSupportivePartition(*SupportivePartition_);
131+
}
115132
}
116133
}
117134

@@ -127,6 +144,8 @@ void TTopicPartitionOperations::Merge(const TTopicPartitionOperations& rhs)
127144
TabletId_ = rhs.TabletId_;
128145
}
129146

147+
UpdateSupportivePartition(SupportivePartition_, rhs.SupportivePartition_);
148+
130149
for (auto& [key, value] : rhs.Operations_) {
131150
Operations_[key].Merge(value);
132151
}
@@ -240,10 +259,11 @@ void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
240259
HasReadOperations_ = true;
241260
}
242261

243-
// Registers an operation without consumer/offset range for (topic, partition)
// and marks this set of operations as containing writes.
// Diff view: the lines following the '243-' and '246-' gutter markers are the
// removed pre-change versions; the lines following '262+', '263+' and '266+'
// are the new ones, which additionally forward `supportivePartition` to the
// per-partition entry in Operations_.
void TTopicOperations::AddOperation(const TString& topic, ui32 partition)
262+
void TTopicOperations::AddOperation(const TString& topic, ui32 partition,
263+
TMaybe<ui32> supportivePartition)
244264
{
245265
// Operations_ is indexed by the (topic, partition) pair.
TTopicPartition key{topic, partition};
246-
Operations_[key].AddOperation(topic, partition);
266+
Operations_[key].AddOperation(topic, partition, supportivePartition);
247267
HasWriteOperations_ = true;
248268
}
249269

ydb/core/kqp/topics/kqp_topics.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ class TTopicPartitionOperations {
4949
void AddOperation(const TString& topic, ui32 partition,
5050
const TString& consumer,
5151
const Ydb::Topic::OffsetsRange& range);
52-
void AddOperation(const TString& topic, ui32 partition);
52+
void AddOperation(const TString& topic, ui32 partition,
53+
TMaybe<ui32> supportivePartition);
5354

5455
void BuildTopicTxs(THashMap<ui64, NKikimrPQ::TDataTransaction> &txs);
5556

@@ -67,6 +68,7 @@ class TTopicPartitionOperations {
6768
THashMap<TString, TConsumerOperations> Operations_;
6869
bool HasWriteOperations_ = false;
6970
TMaybe<ui64> TabletId_;
71+
TMaybe<ui32> SupportivePartition_;
7072
};
7173

7274
struct TTopicPartition {
@@ -98,7 +100,8 @@ class TTopicOperations {
98100
void AddOperation(const TString& topic, ui32 partition,
99101
const TString& consumer,
100102
const Ydb::Topic::OffsetsRange& range);
101-
void AddOperation(const TString& topic, ui32 partition);
103+
void AddOperation(const TString& topic, ui32 partition,
104+
TMaybe<ui32> supportivePartition);
102105

103106
void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate,
104107
TMaybe<TString> consumer);

ydb/core/persqueue/partition.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3042,6 +3042,8 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
30423042

30433043
// Handler for TEvDeletePartition in the "OnInit" state (presumably while the
// partition actor is still initializing -- inferred from the name, confirm).
// Aborts the process unless IsSupportive() holds, then takes ownership of the
// event and appends it to PendingEvents instead of processing it immediately.
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
30443044
{
3045+
Y_ABORT_UNLESS(IsSupportive());
3046+
30453047
PendingEvents.emplace_back(ev->ReleaseBase().Release());
30463048
}
30473049

ydb/core/persqueue/partition_id.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ class TPartitionId {
2929

3030
// Returns a hash of this partition id.
// Diff view: the return after the '32-' marker is the removed version, which
// hashed only OriginalPartitionId and WriteId; the return after '32+' is the
// new version, which also mixes in InternalPartitionId.
size_t GetHash() const
3131
{
32-
return MultiHash(OriginalPartitionId, WriteId);
32+
return MultiHash(MultiHash(OriginalPartitionId, WriteId), InternalPartitionId);
3333
}
3434

3535
// Field-wise equality with `rhs`.
// Diff view: the line after the '39-' marker is the removed end of the
// expression (OriginalPartitionId and WriteId only); the lines after '39+' and
// '40+' are the new ending, which additionally requires InternalPartitionId
// to match.
bool IsEqual(const TPartitionId& rhs) const
3636
{
3737
return
3838
(OriginalPartitionId == rhs.OriginalPartitionId) &&
39-
(WriteId == rhs.WriteId);
39+
(WriteId == rhs.WriteId) &&
40+
(InternalPartitionId == rhs.InternalPartitionId);
4041
}
4142

4243
void ToStream(IOutputStream& s) const

ydb/core/persqueue/partition_write.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TS
4141
r->SetOwnerCookie(cookie);
4242
r->SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
4343
r->SetSeqNo(seqNo);
44+
if (IsSupportive()) {
45+
r->SetSupportivePartition(Partition.InternalPartitionId);
46+
}
4447

4548
ctx.Send(Tablet, response.Release());
4649
}

ydb/core/persqueue/pq_impl.cpp

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2001,7 +2001,8 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
20012001
"Tablet " << TabletID() <<
20022002
" Write in transaction." <<
20032003
" Partition: " << req.GetPartition() <<
2004-
", WriteId: " << req.GetWriteId());
2004+
", WriteId: " << req.GetWriteId() <<
2005+
", NeedSupportivePartition: " << req.GetNeedSupportivePartition());
20052006
}
20062007

20072008
for (ui32 i = 0; i < req.CmdWriteSize(); ++i) {
@@ -2198,7 +2199,8 @@ void TPersQueue::HandleReserveBytesRequest(const ui64 responseCookie, const TAct
21982199
"Tablet " << TabletID() <<
21992200
" Reserve bytes in transaction." <<
22002201
" Partition: " << req.GetPartition() <<
2201-
", WriteId: " << req.GetWriteId());
2202+
", WriteId: " << req.GetWriteId() <<
2203+
", NeedSupportivePartition: " << req.GetNeedSupportivePartition());
22022204
}
22032205

22042206
InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_RESERVE_BYTES);
@@ -2574,6 +2576,8 @@ void TPersQueue::HandleWriteRequestForSupportivePartition(const ui64 responseCoo
25742576
const NKikimrClient::TPersQueuePartitionRequest& req,
25752577
const TActorContext& ctx)
25762578
{
2579+
Y_ABORT_UNLESS(req.HasWriteId());
2580+
25772581
const TPartitionInfo& partition = GetPartitionInfo(req);
25782582
const TActorId& actorId = partition.Actor;
25792583

@@ -2648,6 +2652,14 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
26482652
sender);
26492653
}
26502654
} else {
2655+
if (!req.GetNeedSupportivePartition()) {
2656+
ReplyError(ctx,
2657+
responseCookie,
2658+
NPersQueue::NErrorCode::PRECONDITION_FAILED,
2659+
"lost messages");
2660+
return;
2661+
}
2662+
26512663
//
26522664
// этап 1:
26532665
// - создать запись в TxWrites
@@ -2665,6 +2677,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
26652677
TPartitionId partitionId(originalPartitionId, writeId, NextSupportivePartitionId++);
26662678

26672679
writeInfo.Partitions.emplace(originalPartitionId, partitionId);
2680+
TxWritesChanged = true;
26682681
AddSupportivePartition(partitionId);
26692682

26702683
Y_ABORT_UNLESS(Partitions.contains(partitionId));
@@ -3115,6 +3128,38 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
31153128

31163129
}
31173130

3131+
// Returns true if `Partitions` contains the partition identified by the
// operation's partition id, the given `writeId` and the operation's
// supportive-partition id.
// NOTE(review): GetSupportivePartition() is read without a Has...() check, so
// an operation that does not carry the field presumably yields the protobuf
// default value here -- confirm that this is the intended lookup key.
bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
3132+
ui64 writeId) const
3133+
{
3134+
TPartitionId partitionId(operation.GetPartitionId(),
3135+
writeId,
3136+
operation.GetSupportivePartition());
3137+
return Partitions.contains(partitionId);
3138+
}
3139+
3140+
// Validates the write operations of a data transaction body.
// Returns true when the transaction has no WriteId, or when every operation
// classified as a write passes CheckTxWriteOperation(); returns false as soon
// as one write operation fails that check.
bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const
3141+
{
3142+
// Without a WriteId there is nothing to verify.
if (!txBody.HasWriteId()) {
3143+
return true;
3144+
}
3145+
3146+
ui64 writeId = txBody.GetWriteId();
3147+
3148+
for (auto& operation : txBody.GetOperations()) {
3149+
// An operation is treated as a write when it has no Begin field.
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
3150+
return !o.HasBegin();
3151+
};
3152+
3153+
if (isWrite(operation)) {
3154+
if (!CheckTxWriteOperation(operation, writeId)) {
3155+
return false;
3156+
}
3157+
}
3158+
}
3159+
3160+
return true;
3161+
}
3162+
31183163
void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransaction> ev,
31193164
const TActorContext& ctx)
31203165
{
@@ -3123,23 +3168,6 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
31233168
Y_ABORT_UNLESS(event.HasData());
31243169
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
31253170

3126-
for (auto& operation : txBody.GetOperations()) {
3127-
Y_ABORT_UNLESS(!operation.HasPath() || (operation.GetPath() == TopicPath));
3128-
3129-
bool isWriteOperation = !operation.HasBegin();
3130-
3131-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
3132-
"Tablet " << TabletID() <<
3133-
" tx=" << event.GetTxId() <<
3134-
", write_id=" << txBody.GetWriteId() <<
3135-
", path=" << operation.GetPath() <<
3136-
", partition=" << operation.GetPartitionId() <<
3137-
", consumer=" << operation.GetConsumer() <<
3138-
", begin=" << operation.GetBegin() <<
3139-
", end=" << operation.GetEnd() <<
3140-
", is_write=" << isWriteOperation);
3141-
}
3142-
31433171
if (TabletState != NKikimrPQ::ENormal) {
31443172
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
31453173
event.GetTxId(),
@@ -3158,6 +3186,13 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
31583186
return;
31593187
}
31603188

3189+
if (!CheckTxWriteOperations(txBody)) {
3190+
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
3191+
event.GetTxId(),
3192+
ctx);
3193+
return;
3194+
}
3195+
31613196
TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
31623197
if (!partitionId.Defined()) {
31633198
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
@@ -3392,7 +3427,8 @@ void TPersQueue::BeginWriteTxs(const TActorContext& ctx)
33923427
CanProcessPlanStepQueue() ||
33933428
CanProcessWriteTxs() ||
33943429
CanProcessDeleteTxs() ||
3395-
CanProcessTxWrites()
3430+
CanProcessTxWrites() ||
3431+
TxWritesChanged
33963432
;
33973433
if (!canProcess) {
33983434
return;
@@ -3441,6 +3477,8 @@ void TPersQueue::EndWriteTxs(const NKikimrClient::TResponse& resp,
34413477
return;
34423478
}
34433479

3480+
TxWritesChanged = false;
3481+
34443482
SendReplies(ctx);
34453483
CheckChangedTxStates(ctx);
34463484
CreateSupportivePartitionActors(ctx);
@@ -4473,6 +4511,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
44734511
}
44744512
TxWrites.erase(writeId);
44754513
}
4514+
TxWritesChanged = true;
44764515

44774516
TryWriteTxs(ctx);
44784517
}

ydb/core/persqueue/pq_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
205205
};
206206

207207
THashMap<ui64, TTxWriteInfo> TxWrites;
208+
bool TxWritesChanged = false;
208209
ui32 NextSupportivePartitionId = 100'000;
209210

210211
TActorId CacheActor;
@@ -494,6 +495,10 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
494495

495496
void BeginDeleteTx(const TDistributedTransaction& tx);
496497
void BeginDeletePartitions(TTxWriteInfo& writeInfo);
498+
499+
bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
500+
ui64 writeId) const;
501+
bool CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const;
497502
};
498503

499504

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ struct TGetOwnershipRequestParams {
7474
TMaybe<ui32> Partition;
7575
TMaybe<ui64> MsgNo;
7676
TMaybe<ui64> WriteId;
77+
TMaybe<bool> NeedSupportivePartition;
7778
TMaybe<TString> Owner; // o
7879
TMaybe<ui64> Cookie;
7980
};
@@ -550,6 +551,9 @@ std::unique_ptr<TEvPersQueue::TEvRequest> TPQTabletFixture::MakeGetOwnershipRequ
550551
if (params.WriteId.Defined()) {
551552
request->SetWriteId(*params.WriteId);
552553
}
554+
if (params.NeedSupportivePartition.Defined()) {
555+
request->SetNeedSupportivePartition(*params.NeedSupportivePartition);
556+
}
553557
if (params.Cookie.Defined()) {
554558
request->SetCookie(*params.Cookie);
555559
}
@@ -1281,6 +1285,7 @@ Y_UNIT_TEST_F(ProposeTx_Command_After_Propose, TPQTabletFixture)
12811285

12821286
SyncGetOwnership({.Partition=partitionId,
12831287
.WriteId=writeId,
1288+
.NeedSupportivePartition=true,
12841289
.Owner="-=[ 0wn3r ]=-",
12851290
.Cookie=4},
12861291
{.Cookie=4,

0 commit comments

Comments
 (0)