Skip to content

Commit 4d0d91a

Browse files
committed
fix initialization
1 parent b040b2f commit 4d0d91a

File tree

4 files changed

+15
-4
lines changed

4 files changed

+15
-4
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1781,10 +1781,12 @@ void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext&
17811781
}
17821782

17831783
ui32 cnt = 0;
1784-
for (auto& p : Partitions) {
1785-
cnt += p.second.InitDone;
1784+
for (auto& [_, partitionInfo] : Partitions) {
1785+
cnt += partitionInfo.InitDone;
17861786
}
17871787

1788+
Cerr << ">>>>> InitDone=" << cnt << Endl;
1789+
17881790
TActorId ans = CreateStatusProxyActor(TabletID(), ev->Sender, cnt, ev->Cookie, ctx);
17891791
for (auto& p : Partitions) {
17901792
if (!p.second.InitDone)

ydb/core/persqueue/read_balancer.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,8 @@ void TPersQueueReadBalancer::RequestTabletIfNeeded(const ui64 tabletId, const TA
768768
AggregatedStats.Cookies[tabletId] = cookie;
769769
}
770770

771+
LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE_READ_BALANCER,
772+
TStringBuilder() << "Send TEvPersQueue::TEvStatus TabletId: " << tabletId << " Cookie: " << cookie);
771773
NTabletPipe::SendData(ctx, pipeClient, new TEvPersQueue::TEvStatus("", true), cookie);
772774
}
773775
NTabletPipe::SendData(ctx, pipeClient, new TEvPQ::TEvSubDomainStatus(SubDomainOutOfSpace));
@@ -780,7 +782,6 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvStatusResponse::TPtr& ev, c
780782
ui64 tabletId = record.GetTabletId();
781783
ui64 cookie = ev->Cookie;
782784

783-
784785
if ((0 != cookie && cookie != AggregatedStats.Cookies[tabletId]) || (0 == cookie && !AggregatedStats.Cookies.contains(tabletId))) {
785786
return;
786787
}

ydb/core/persqueue/read_balancer.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,14 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
257257
}
258258
RegisterEvents.clear();
259259

260-
ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
260+
for (auto& [_, partitionInfo] : PartitionsInfo) {
261+
RequestTabletIfNeeded(partitionInfo.TabletId, ctx, true);
262+
}
263+
264+
auto wakeupInterval = AppData(ctx)->PQConfig.GetBalancerWakeupIntervalSec();
265+
Y_ABORT_UNLESS(0 < wakeupInterval);
266+
ctx.Schedule(TDuration::Seconds(wakeupInterval), new TEvents::TEvWakeup());
267+
261268
ctx.Send(ctx.SelfID, new TEvPersQueue::TEvUpdateACL());
262269
}
263270

ydb/core/tx/schemeshard/ut_stats/ut_stats.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardStatsBatchingTest) {
303303
TTestEnv env(runtime);
304304

305305
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
306+
runtime.SetLogPriority(NKikimrServices::PERSQUEUE_READ_BALANCER, NActors::NLog::PRI_TRACE);
306307

307308
auto& appData = runtime.GetAppData();
308309

0 commit comments

Comments
 (0)