@@ -31,14 +31,18 @@ static const ui32 MAX_INLINE_SIZE = 1000;
31
31
32
32
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_IS_FULL;
33
33
34
- void TPartition::ReplyOwnerOk (const TActorContext& ctx, const ui64 dst, const TString& cookie) {
34
+ void TPartition::ReplyOwnerOk (const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo ) {
35
35
LOG_DEBUG_S (ctx, NKikimrServices::PERSQUEUE, " TPartition::ReplyOwnerOk. Partition: " << Partition);
36
36
37
37
THolder<TEvPQ::TEvProxyResponse> response = MakeHolder<TEvPQ::TEvProxyResponse>(dst);
38
38
NKikimrClient::TResponse& resp = *response->Response ;
39
39
resp.SetStatus (NMsgBusProxy::MSTATUS_OK);
40
40
resp.SetErrorCode (NPersQueue::NErrorCode::OK);
41
- resp.MutablePartitionResponse ()->MutableCmdGetOwnershipResult ()->SetOwnerCookie (cookie);
41
+ auto * r = resp.MutablePartitionResponse ()->MutableCmdGetOwnershipResult ();
42
+ r->SetOwnerCookie (cookie);
43
+ r->SetStatus (PartitionConfig ? PartitionConfig->GetStatus () : NKikimrPQ::ETopicPartitionStatus::Active);
44
+ r->SetSeqNo (seqNo);
45
+
42
46
ctx.Send (Tablet, response.Release ());
43
47
}
44
48
@@ -146,8 +150,12 @@ void TPartition::ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, c
146
150
auto &owner = ev->Owner ;
147
151
auto it = Owners.find (owner);
148
152
if (it == Owners.end ()) {
149
- Owners[owner];
150
- it = Owners.find (owner);
153
+ if (ev->RegisterIfNotExists ) {
154
+ Owners[owner];
155
+ it = Owners.find (owner);
156
+ } else {
157
+ return ReplyError (ctx, ev->Cookie , NPersQueue::NErrorCode::SOURCEID_DELETED, " SourceId isn't registered" );
158
+ }
151
159
}
152
160
if (it->second .NeedResetOwner || ev->Force ) { // change owner
153
161
Y_ABORT_UNLESS (ReservedSize >= it->second .ReservedSize );
@@ -346,10 +354,13 @@ void TPartition::AnswerCurrentWrites(const TActorContext& ctx) {
346
354
if (!already && partNo + 1 == totalParts && !writeResponse.Msg .HeartbeatVersion )
347
355
++offset;
348
356
} else if (response.IsOwnership ()) {
349
- const TString& ownerCookie = response.GetOwnership ().OwnerCookie ;
357
+ const auto & r = response.GetOwnership ();
358
+ const TString& ownerCookie = r.OwnerCookie ;
350
359
auto it = Owners.find (TOwnerInfo::GetOwnerFromOwnerCookie (ownerCookie));
351
360
if (it != Owners.end () && it->second .OwnerCookie == ownerCookie) {
352
- ReplyOwnerOk (ctx, response.GetCookie (), ownerCookie);
361
+ auto sit = SourceIdStorage.GetInMemorySourceIds ().find (NSourceIdEncoding::EncodeSimple (it->first ));
362
+ auto seqNo = sit == SourceIdStorage.GetInMemorySourceIds ().end () ? 0 : sit->second .SeqNo ;
363
+ ReplyOwnerOk (ctx, response.GetCookie (), ownerCookie, seqNo);
353
364
} else {
354
365
ReplyError (ctx, response.GetCookie (), NPersQueue::NErrorCode::WRONG_COOKIE, " new GetOwnership request is dropped already" );
355
366
}
0 commit comments