Skip to content

Commit 53bd587

Browse files
authored
Merge fce9335 into 1cd7468
2 parents 1cd7468 + fce9335 commit 53bd587

File tree

12 files changed

+485
-125
lines changed

12 files changed

+485
-125
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ struct TEvPQ {
183183
EvGetWriteInfoRequest,
184184
EvGetWriteInfoResponse,
185185
EvGetWriteInfoError,
186+
EvReadingPartitionFinishedRequest,
186187
EvEnd
187188
};
188189

@@ -1076,6 +1077,15 @@ struct TEvPQ {
10761077
{
10771078
}
10781079
};
1080+
1081+
struct TEvReadingPartitionFinishedRequest : public TEventPB<TEvReadingPartitionFinishedRequest, NKikimrPQ::TEvReadingPartitionFinishedRequest, EvReadingPartitionFinishedRequest> {
1082+
TEvReadingPartitionFinishedRequest() = default;
1083+
1084+
TEvReadingPartitionFinishedRequest(const TString& consumer, ui32 partitionId) {
1085+
Record.SetConsumer(consumer);
1086+
Record.SetPartitionId(partitionId);
1087+
}
1088+
};
10791089
};
10801090

10811091
} //NKikimr

ydb/core/persqueue/partition.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ auto GetStepAndTxId(const E& event)
4141
return GetStepAndTxId(event.Step, event.TxId);
4242
}
4343

44+
bool LastOffsetHasBeenCommited(const TUserInfo& userInfo, ui64 EndOffset) {
45+
return static_cast<ui64>(std::max<i64>(userInfo.Offset, 0)) == EndOffset;
46+
}
47+
4448
struct TMirrorerInfo {
4549
TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
4650
: Actor(actor) {
@@ -85,6 +89,10 @@ TString TPartition::LogPrefix() const {
8589
return TStringBuilder() << "" << SelfId() << " " << state << " Partition: " << Partition << " ";
8690
}
8791

92+
bool TPartition::IsActive() const {
93+
return PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
94+
}
95+
8896
bool TPartition::CanWrite() const {
8997
if (PartitionConfig == nullptr) {
9098
// Old format without AllPartitions configuration field.
@@ -99,16 +107,20 @@ bool TPartition::CanWrite() const {
99107
// Pending configuration tx inactivate this partition.
100108
return false;
101109
}
102-
if (ClosedInternalPartition)
110+
111+
if (ClosedInternalPartition) {
103112
return false;
113+
}
104114

105-
return PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
115+
return IsActive();
106116
}
107117

108118
bool TPartition::CanEnqueue() const {
109-
if (ClosedInternalPartition)
119+
if (ClosedInternalPartition) {
110120
return false;
111-
return PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
121+
}
122+
123+
return IsActive();
112124
}
113125

114126
ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) {
@@ -643,6 +655,8 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
643655

644656
result.SetWriteBytesQuota(TotalPartitionWriteSpeed);
645657

658+
bool inactivePartition = !IsActive();
659+
646660
TVector<ui64> resSpeed;
647661
resSpeed.resize(4);
648662
ui64 maxQuota = 0;
@@ -708,6 +722,8 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
708722
clientInfo->SetAvgReadSpeedPerMin(userInfo.AvgReadBytes[1].GetValue());
709723
clientInfo->SetAvgReadSpeedPerHour(userInfo.AvgReadBytes[2].GetValue());
710724
clientInfo->SetAvgReadSpeedPerDay(userInfo.AvgReadBytes[3].GetValue());
725+
726+
clientInfo->SetReadingFinished(inactivePartition && LastOffsetHasBeenCommited(userInfo, EndOffset));
711727
}
712728

713729
}
@@ -1831,6 +1847,10 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
18311847
} else {
18321848
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
18331849
}
1850+
1851+
if (!IsActive() && LastOffsetHasBeenCommited(userInfo, EndOffset)) {
1852+
SendReadingFinished(user);
1853+
}
18341854
} else {
18351855
auto ui = UsersInfoStorage->GetIfExists(user);
18361856
if (ui && ui->LabeledCounters) {

ydb/core/persqueue/partition.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
9292
struct THasDataReq;
9393
struct THasDataDeadline;
9494

95+
bool IsActive() const;
9596
bool CanWrite() const;
9697
bool CanEnqueue() const;
9798

@@ -104,6 +105,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
104105
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);
105106

106107
void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);
108+
void SendReadingFinished(const TString& consumer);
107109

108110
void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
109111
void AnswerCurrentWrites(const TActorContext& ctx);

ydb/core/persqueue/partition_read.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ namespace NKikimr::NPQ {
2929

3030
static const ui32 MAX_USER_ACTS = 1000;
3131

32+
void TPartition::SendReadingFinished(const TString& consumer) {
33+
Send(Tablet, new TEvPQ::TEvReadingPartitionFinishedRequest(consumer, Partition.OriginalPartitionId));
34+
}
35+
3236
void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
3337
TSet<TString> hasReadRule;
3438

@@ -87,6 +91,9 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
8791
void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
8892
if (!InitDone)
8993
return;
94+
95+
auto now = ctx.Now();
96+
9097
for (auto it = HasDataRequests.begin(); it != HasDataRequests.end();) {
9198
if (it->Offset < EndOffset) {
9299
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> res(new TEvPersQueue::TEvHasDataInfoResponse());
@@ -98,15 +105,16 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
98105
ctx.Send(it->Sender, res.Release());
99106
if (!it->ClientId.empty()) {
100107
auto& userInfo = UsersInfoStorage->GetOrCreate(it->ClientId, ctx);
101-
userInfo.ForgetSubscription(ctx.Now());
108+
userInfo.ForgetSubscription(now);
102109
}
103110
it = HasDataRequests.erase(it);
104111
} else {
105112
break;
106113
}
107114
}
115+
108116
for (auto it = HasDataDeadlines.begin(); it != HasDataDeadlines.end();) {
109-
if (it->Deadline <= ctx.Now()) {
117+
if (it->Deadline <= now) {
110118
auto jt = HasDataRequests.find(it->Request);
111119
if (jt != HasDataRequests.end()) {
112120
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> res(new TEvPersQueue::TEvHasDataInfoResponse());
@@ -118,7 +126,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
118126
ctx.Send(it->Request.Sender, res.Release());
119127
if (!it->Request.ClientId.empty()) {
120128
auto& userInfo = UsersInfoStorage->GetOrCreate(it->Request.ClientId, ctx);
121-
userInfo.ForgetSubscription(ctx.Now());
129+
userInfo.ForgetSubscription(now);
122130
}
123131
HasDataRequests.erase(jt);
124132
}

ydb/core/persqueue/pq_impl.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,6 +1769,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvPartitionClientInfo::TPtr& ev, const TA
17691769

17701770
void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx)
17711771
{
1772+
ReadBalancerActorId = ev->Sender;
1773+
17721774
if (!ConfigInited) {
17731775
THolder<TEvPersQueue::TEvStatusResponse> res = MakeHolder<TEvPersQueue::TEvStatusResponse>();
17741776
auto& resp = res->Record;
@@ -4275,7 +4277,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con
42754277
ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie);
42764278
}
42774279

4278-
void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
4280+
void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
4281+
{
42794282
auto& record = ev->Get()->Record;
42804283
auto it = Partitions.find(TPartitionId(TPartitionId(record.GetPartition())));
42814284
if (it == Partitions.end()) {
@@ -4295,7 +4298,8 @@ void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
42954298
}
42964299
}
42974300

4298-
void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId) {
4301+
void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId)
4302+
{
42994303
Y_ABORT_UNLESS(!partitionId.WriteId.Defined());
43004304
ui32 originalPartitionId = partitionId.OriginalPartitionId;
43014305
auto sit = CheckPartitionStatusRequests.find(originalPartitionId);
@@ -4318,6 +4322,13 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
43184322
}
43194323
}
43204324

4325+
void TPersQueue::Handle(TEvPQ::TEvReadingPartitionFinishedRequest::TPtr& ev, const TActorContext& ctx)
4326+
{
4327+
if (ReadBalancerActorId) {
4328+
ctx.Send(ReadBalancerActorId, ev->Release().Release());
4329+
}
4330+
}
4331+
43214332
TString TPersQueue::LogPrefix() const {
43224333
return TStringBuilder() << SelfId() << " ";
43234334
}
@@ -4372,6 +4383,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
43724383
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
43734384
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
43744385
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
4386+
HFuncTraced(TEvPQ::TEvReadingPartitionFinishedRequest, Handle);
43754387
default:
43764388
return false;
43774389
}

ydb/core/persqueue/pq_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
8989
void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx);
9090
void Handle(TEvPersQueue::TEvPartitionClientInfo::TPtr& ev, const TActorContext& ctx);
9191
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
92+
void Handle(TEvPQ::TEvReadingPartitionFinishedRequest::TPtr& ev, const TActorContext& ctx);
9293

9394
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) override;
9495

@@ -204,6 +205,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
204205
ui32 NextSupportivePartitionId = 100'000;
205206

206207
TActorId CacheActor;
208+
TActorId ReadBalancerActorId;
207209

208210
TSet<TChangeNotification> ChangeConfigNotification;
209211
NKikimrPQ::TPQTabletConfig NewConfig;

0 commit comments

Comments
 (0)