Skip to content

Commit 88deb58

Browse files
authored
Merge c125e84 into d32315a
2 parents d32315a + c125e84 commit 88deb58

File tree

13 files changed

+519
-142
lines changed

13 files changed

+519
-142
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ struct TEvPQ {
183183
EvGetWriteInfoRequest,
184184
EvGetWriteInfoResponse,
185185
EvGetWriteInfoError,
186+
EvReadingPartitionStatusRequest,
186187
EvEnd
187188
};
188189

@@ -1076,6 +1077,15 @@ struct TEvPQ {
10761077
{
10771078
}
10781079
};
1080+
1081+
struct TEvReadingPartitionStatusRequest : public TEventPB<TEvReadingPartitionStatusRequest, NKikimrPQ::TEvReadingPartitionStatusRequest, EvReadingPartitionStatusRequest> {
1082+
TEvReadingPartitionStatusRequest() = default;
1083+
1084+
TEvReadingPartitionStatusRequest(const TString& consumer, ui32 partitionId) {
1085+
Record.SetConsumer(consumer);
1086+
Record.SetPartitionId(partitionId);
1087+
}
1088+
};
10791089
};
10801090

10811091
} //NKikimr

ydb/core/persqueue/partition.cpp

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ auto GetStepAndTxId(const E& event)
4141
return GetStepAndTxId(event.Step, event.TxId);
4242
}
4343

44+
bool TPartition::LastOffsetHasBeenCommited(const TUserInfo& userInfo) const {
45+
return !IsActive() && static_cast<ui64>(std::max<i64>(userInfo.Offset, 0)) == EndOffset;
46+
}
47+
4448
struct TMirrorerInfo {
4549
TMirrorerInfo(const TActorId& actor, const TTabletCountersBase& baseline)
4650
: Actor(actor) {
@@ -85,6 +89,10 @@ TString TPartition::LogPrefix() const {
8589
return TStringBuilder() << "" << SelfId() << " " << state << " Partition: " << Partition << " ";
8690
}
8791

92+
bool TPartition::IsActive() const {
93+
return PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
94+
}
95+
8896
bool TPartition::CanWrite() const {
8997
if (PartitionConfig == nullptr) {
9098
// Old format without AllPartitions configuration field.
@@ -99,16 +107,20 @@ bool TPartition::CanWrite() const {
99107
// Pending configuration tx inactivate this partition.
100108
return false;
101109
}
102-
if (ClosedInternalPartition)
110+
111+
if (ClosedInternalPartition) {
103112
return false;
113+
}
104114

105-
return PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
115+
return IsActive();
106116
}
107117

108118
bool TPartition::CanEnqueue() const {
109-
if (ClosedInternalPartition)
119+
if (ClosedInternalPartition) {
110120
return false;
111-
return PartitionConfig == nullptr || PartitionConfig->GetStatus() == NKikimrPQ::ETopicPartitionStatus::Active;
121+
}
122+
123+
return IsActive();
112124
}
113125

114126
ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 offset) {
@@ -708,6 +720,8 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
708720
clientInfo->SetAvgReadSpeedPerMin(userInfo.AvgReadBytes[1].GetValue());
709721
clientInfo->SetAvgReadSpeedPerHour(userInfo.AvgReadBytes[2].GetValue());
710722
clientInfo->SetAvgReadSpeedPerDay(userInfo.AvgReadBytes[3].GetValue());
723+
724+
clientInfo->SetReadingFinished(LastOffsetHasBeenCommited(userInfo));
711725
}
712726

713727
}
@@ -1831,6 +1845,10 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
18311845
} else {
18321846
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
18331847
}
1848+
1849+
if (LastOffsetHasBeenCommited(userInfo)) {
1850+
SendReadingFinished(user);
1851+
}
18341852
} else {
18351853
auto ui = UsersInfoStorage->GetIfExists(user);
18361854
if (ui && ui->LabeledCounters) {

ydb/core/persqueue/partition.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
9292
struct THasDataReq;
9393
struct THasDataDeadline;
9494

95+
bool IsActive() const;
9596
bool CanWrite() const;
9697
bool CanEnqueue() const;
9798

99+
bool LastOffsetHasBeenCommited(const TUserInfo& userInfo) const;
100+
98101
void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error);
99102
void ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
100103
void ReplyErrorForStoredWrites(const TActorContext& ctx);
@@ -104,6 +107,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
104107
void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie, ui64 seqNo);
105108

106109
void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);
110+
void SendReadingFinished(const TString& consumer);
107111

108112
void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
109113
void AnswerCurrentWrites(const TActorContext& ctx);

ydb/core/persqueue/partition_read.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ namespace NKikimr::NPQ {
2929

3030
static const ui32 MAX_USER_ACTS = 1000;
3131

32+
void TPartition::SendReadingFinished(const TString& consumer) {
33+
Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId));
34+
}
35+
3236
void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx) {
3337
TSet<TString> hasReadRule;
3438

@@ -87,6 +91,9 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
8791
void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
8892
if (!InitDone)
8993
return;
94+
95+
auto now = ctx.Now();
96+
9097
for (auto it = HasDataRequests.begin(); it != HasDataRequests.end();) {
9198
if (it->Offset < EndOffset) {
9299
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> res(new TEvPersQueue::TEvHasDataInfoResponse());
@@ -98,15 +105,16 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
98105
ctx.Send(it->Sender, res.Release());
99106
if (!it->ClientId.empty()) {
100107
auto& userInfo = UsersInfoStorage->GetOrCreate(it->ClientId, ctx);
101-
userInfo.ForgetSubscription(ctx.Now());
108+
userInfo.ForgetSubscription(now);
102109
}
103110
it = HasDataRequests.erase(it);
104111
} else {
105112
break;
106113
}
107114
}
115+
108116
for (auto it = HasDataDeadlines.begin(); it != HasDataDeadlines.end();) {
109-
if (it->Deadline <= ctx.Now()) {
117+
if (it->Deadline <= now) {
110118
auto jt = HasDataRequests.find(it->Request);
111119
if (jt != HasDataRequests.end()) {
112120
TAutoPtr<TEvPersQueue::TEvHasDataInfoResponse> res(new TEvPersQueue::TEvHasDataInfoResponse());
@@ -118,7 +126,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) {
118126
ctx.Send(it->Request.Sender, res.Release());
119127
if (!it->Request.ClientId.empty()) {
120128
auto& userInfo = UsersInfoStorage->GetOrCreate(it->Request.ClientId, ctx);
121-
userInfo.ForgetSubscription(ctx.Now());
129+
userInfo.ForgetSubscription(now);
122130
}
123131
HasDataRequests.erase(jt);
124132
}

ydb/core/persqueue/pq_impl.cpp

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,6 +1769,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvPartitionClientInfo::TPtr& ev, const TA
17691769

17701770
void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx)
17711771
{
1772+
ReadBalancerActorId = ev->Sender;
1773+
17721774
if (!ConfigInited) {
17731775
THolder<TEvPersQueue::TEvStatusResponse> res = MakeHolder<TEvPersQueue::TEvStatusResponse>();
17741776
auto& resp = res->Record;
@@ -1779,8 +1781,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
17791781
}
17801782

17811783
ui32 cnt = 0;
1782-
for (auto& p : Partitions) {
1783-
cnt += p.second.InitDone;
1784+
for (auto& [_, partitionInfo] : Partitions) {
1785+
cnt += partitionInfo.InitDone;
17841786
}
17851787

17861788
TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx);
@@ -4245,6 +4247,8 @@ NTabletPipe::TClientConfig TPersQueue::GetPipeClientConfig()
42454247

42464248
void TPersQueue::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx)
42474249
{
4250+
ReadBalancerActorId = ev->Sender;
4251+
42484252
const TEvPQ::TEvSubDomainStatus& event = *ev->Get();
42494253
SubDomainOutOfSpace = event.SubDomainOutOfSpace();
42504254

@@ -4275,7 +4279,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, con
42754279
ctx.Send(ev->Sender, new TEvPersQueue::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie);
42764280
}
42774281

4278-
void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
4282+
void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
4283+
{
42794284
auto& record = ev->Get()->Record;
42804285
auto it = Partitions.find(TPartitionId(TPartitionId(record.GetPartition())));
42814286
if (it == Partitions.end()) {
@@ -4295,7 +4300,8 @@ void TPersQueue::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
42954300
}
42964301
}
42974302

4298-
void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId) {
4303+
void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partitionId)
4304+
{
42994305
Y_ABORT_UNLESS(!partitionId.WriteId.Defined());
43004306
ui32 originalPartitionId = partitionId.OriginalPartitionId;
43014307
auto sit = CheckPartitionStatusRequests.find(originalPartitionId);
@@ -4318,6 +4324,13 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
43184324
}
43194325
}
43204326

4327+
void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
4328+
{
4329+
if (ReadBalancerActorId) {
4330+
ctx.Send(ReadBalancerActorId, ev->Release().Release());
4331+
}
4332+
}
4333+
43214334
TString TPersQueue::LogPrefix() const {
43224335
return TStringBuilder() << SelfId() << " ";
43234336
}
@@ -4372,6 +4385,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
43724385
HFuncTraced(TEvMediatorTimecast::TEvRegisterTabletResult, Handle);
43734386
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
43744387
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
4388+
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
43754389
default:
43764390
return false;
43774391
}

ydb/core/persqueue/pq_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
8989
void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx);
9090
void Handle(TEvPersQueue::TEvPartitionClientInfo::TPtr& ev, const TActorContext& ctx);
9191
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
92+
void Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
9293

9394
bool OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) override;
9495

@@ -204,6 +205,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
204205
ui32 NextSupportivePartitionId = 100'000;
205206

206207
TActorId CacheActor;
208+
TActorId ReadBalancerActorId;
207209

208210
TSet<TChangeNotification> ChangeConfigNotification;
209211
NKikimrPQ::TPQTabletConfig NewConfig;

0 commit comments

Comments
 (0)