Skip to content

Commit 84a7930

Browse files
authored
Merge 1004f9e into 0004636
2 parents 0004636 + 1004f9e commit 84a7930

File tree

3 files changed

+54
-27
lines changed

3 files changed

+54
-27
lines changed

ydb/core/tx/columnshard/columnshard.cpp

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,7 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
8989
ctx.Send(SelfId(), new NActors::TEvents::TEvWakeup());
9090
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
9191
ctx.Send(SelfId(), new TEvPrivate::TEvPingSnapshotsUsage());
92-
ctx.Send(SelfId(), new TEvPrivate::TEvReportExecutorStatistics());
93-
ctx.Send(SelfId(), new TEvPrivate::TEvReportBaseStatistics());
92+
ctx.Send(SelfId(), new TEvPrivate::TEvBuildStatisticsPipe());
9493
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
9594
AFL_VERIFY(!!StartInstant);
9695
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
@@ -162,9 +161,12 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TAc
162161
if (clientId == StatsReportPipe) {
163162
if (ev->Get()->Status == NKikimrProto::OK) {
164163
LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID());
164+
ExecutorStatsEvInflight++;
165+
ActorContext().Send(SelfId(), new TEvPrivate::TEvBuildStatisticsPipe());
165166
} else {
166167
LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID());
167-
StatsReportPipe = {};
168+
LastStats = {};
169+
ActorContext().Send(SelfId(), new TEvPrivate::TEvBuildStatisticsPipe());
168170
}
169171
return;
170172
}
@@ -184,7 +186,6 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TAc
184186
LOG_S_DEBUG("Client pipe reset to " << tabletId << " at tablet " << TabletID());
185187

186188
if (clientId == StatsReportPipe) {
187-
StatsReportPipe = {};
188189
LastStats = {};
189190
return;
190191
}
@@ -438,20 +439,14 @@ void TColumnShard::FillColumnTableStats(const TActorContext& ctx, std::unique_pt
438439
}
439440

440441
void TColumnShard::SendPeriodicStats(bool withExecutor) {
441-
if (!CurrentSchemeShardId || !TablesManager.GetTabletPathIdOptional()) {
442-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("Disabled periodic stats at tablet", TabletID());
442+
if (!TablesManager.GetTabletPathIdOptional()) {
443+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Table Manager not ready", TabletID());
443444
return;
444445
}
445446

446447
const auto& tabletSchemeShardLocalPathId = TablesManager.GetTabletPathIdVerified().SchemeShardLocalPathId;
447448
const TActorContext& ctx = ActorContext();
448449

449-
if (!StatsReportPipe) {
450-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("reate periodic stats pipe to ", CurrentSchemeShardId)("at tablet", TabletID());
451-
NTabletPipe::TClientConfig clientConfig;
452-
StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig));
453-
}
454-
455450
if (!LastStats) {
456451
LastStats = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), tabletSchemeShardLocalPathId.GetRawValue());
457452
}
@@ -475,21 +470,47 @@ void TColumnShard::SendPeriodicStats(bool withExecutor) {
475470
}
476471

477472
void TColumnShard::Handle(TEvPrivate::TEvReportBaseStatistics::TPtr& /*ev*/) {
478-
auto statistics = AppDataVerified().ColumnShardConfig.GetStatistics();
479-
auto scheduleDuration = TDuration::MilliSeconds(statistics.GetReportBaseStatisticsPeriodMs() + RandomNumber<ui32>(JitterIntervalMS));
480-
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportBaseStatistics);
481-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportBaseStatistics")("ReportBaseStatisticsPeriodMs", statistics.GetReportBaseStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
473+
BaseStatsEvInflight--;
474+
ScheduleBaseStatistics();
482475
SendPeriodicStats(false);
483-
return;
484476
}
485477

486478
void TColumnShard::Handle(TEvPrivate::TEvReportExecutorStatistics::TPtr& /*ev*/) {
479+
ExecutorStatsEvInflight--;
480+
ScheduleExecutorStatistics();
481+
SendPeriodicStats(true);
482+
}
483+
484+
void TColumnShard::ScheduleBaseStatistics() {
485+
auto statistics = AppDataVerified().ColumnShardConfig.GetStatistics();
486+
auto scheduleDuration = TDuration::MilliSeconds(statistics.GetReportBaseStatisticsPeriodMs() + RandomNumber<ui32>(JitterIntervalMS));
487+
if (!BaseStatsEvInflight) {
488+
BaseStatsEvInflight++;
489+
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportBaseStatistics);
490+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportBaseStatistics")("ReportBaseStatisticsPeriodMs", statistics.GetReportBaseStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
491+
}
492+
}
493+
494+
void TColumnShard::ScheduleExecutorStatistics() {
487495
auto statistics = AppDataVerified().ColumnShardConfig.GetStatistics();
488496
auto scheduleDuration = TDuration::MilliSeconds(statistics.GetReportExecutorStatisticsPeriodMs() + RandomNumber<ui32>(JitterIntervalMS));
489-
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportExecutorStatistics);
490-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportExecutorStatistics")("ReportExecutorStatisticsPeriodMs", statistics.GetReportExecutorStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
491-
SendPeriodicStats(true);
492-
return;
497+
if (!ExecutorStatsEvInflight) {
498+
ExecutorStatsEvInflight++;
499+
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportExecutorStatistics);
500+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportExecutorStatistics")("ReportExecutorStatisticsPeriodMs", statistics.GetReportExecutorStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
501+
}
502+
}
503+
504+
505+
// Handles TEvBuildStatisticsPipe: registers a tablet pipe client towards
// CurrentSchemeShardId, stores its actor id in StatsReportPipe, and then calls
// ScheduleBaseStatistics() and ScheduleExecutorStatistics().
//
// If CurrentSchemeShardId is not set yet, the event is re-scheduled and the
// handler returns without creating a pipe or arming the statistics timers.
void TColumnShard::Handle(TEvPrivate::TEvBuildStatisticsPipe::TPtr& /*ev*/) {
    const TActorContext& ctx = ActorContext();

    if (!CurrentSchemeShardId) {
        // NOTE(review): the retry delay is 1000 *seconds* as written; confirm this is
        // intended and not meant to be 1 second (1000 ms).
        ctx.Schedule(TDuration::Seconds(1000), new TEvPrivate::TEvBuildStatisticsPipe);
        // Stop here: falling through would register a pipe client for an unset
        // tablet id and start the periodic statistics timers prematurely.
        return;
    }

    // NOTE(review): a previously stored StatsReportPipe is overwritten here without
    // being closed; verify that callers never trigger this while a pipe is still alive.
    StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, {}));

    ScheduleBaseStatistics();
    ScheduleExecutorStatistics();
}
494515

495516
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,9 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
266266

267267
void Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext& ctx);
268268
void Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorContext& ctx);
269-
void Handle(TEvPrivate::TEvReportBaseStatistics::TPtr& ev);
270-
void Handle(TEvPrivate::TEvReportExecutorStatistics::TPtr& ev);
269+
void Handle(TEvPrivate::TEvReportBaseStatistics::TPtr&);
270+
void Handle(TEvPrivate::TEvReportExecutorStatistics::TPtr&);
271+
void Handle(TEvPrivate::TEvBuildStatisticsPipe::TPtr&);
271272
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
272273
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
273274
void Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx);
@@ -525,7 +526,6 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
525526
NOlap::NDataAccessorControl::TDataAccessorsManagerContainer DataAccessorsManager;
526527
NBackgroundTasks::TControlInterfaceContainer<NOlap::NColumnFetching::TColumnDataManager> ColumnDataManager;
527528

528-
TActorId StatsReportPipe;
529529
std::vector<TActorId> ActorsToStop;
530530

531531
TInFlightReadsTracker InFlightReadsTracker;
@@ -549,8 +549,11 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
549549
THashMap<TActorId, TActorId> PipeServersInterconnectSessions;
550550
TActorId ScanDiagnosticsActorId;
551551

552+
TActorId StatsReportPipe;
552553
std::unique_ptr<TEvDataShard::TEvPeriodicTableStats> LastStats;
553554
ui32 JitterIntervalMS = 200;
555+
ui32 BaseStatsEvInflight = 0;
556+
ui32 ExecutorStatsEvInflight = 0;
554557
void TryRegisterMediatorTimeCast();
555558
void UnregisterMediatorTimeCast();
556559

@@ -600,6 +603,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
600603
ui64 MemoryUsage() const;
601604

602605
void SendPeriodicStats(bool);
606+
void ScheduleBaseStatistics();
607+
void ScheduleExecutorStatistics();
603608

604609
void FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev, IExecutor* executor);
605610
void FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev, IExecutor* executor);

ydb/core/tx/columnshard/columnshard_private_events.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ struct TEvPrivate {
4242
EvPeriodicWakeup,
4343
EvReportBaseStatistics,
4444
EvReportExecutorStatistics,
45+
EvBuildStatisticsPipe,
4546
EvEviction,
4647
EvS3Settings,
4748
EvExport,
@@ -81,7 +82,7 @@ struct TEvPrivate {
8182
EvRequestFilter,
8283
EvFilterRequestResourcesAllocated,
8384
EvFilterConstructionResult,
84-
85+
8586
EvReportScanDiagnostics,
8687
EvReportScanIteratorDiagnostics,
8788

@@ -294,8 +295,8 @@ struct TEvPrivate {
294295
};
295296

296297
struct TEvReportBaseStatistics: public TEventLocal<TEvReportBaseStatistics, EvReportBaseStatistics> {};
297-
298298
struct TEvReportExecutorStatistics: public TEventLocal<TEvReportExecutorStatistics, EvReportExecutorStatistics> {};
299+
struct TEvBuildStatisticsPipe: public TEventLocal<TEvBuildStatisticsPipe, EvBuildStatisticsPipe> {};
299300

300301
struct TEvPingSnapshotsUsage: public TEventLocal<TEvPingSnapshotsUsage, EvPingSnapshotsUsage> {
301302
TEvPingSnapshotsUsage() = default;
@@ -354,7 +355,7 @@ struct TEvPrivate {
354355
return WritesBuffer;
355356
}
356357
};
357-
358+
358359
struct TEvReportScanDiagnostics: public TEventLocal<TEvReportScanDiagnostics, EvReportScanDiagnostics> {
359360
TEvReportScanDiagnostics(TString&& requestMessage, TString&& dotGraph, TString&& ssaProgram, TString&& pkRangesFilter, bool isPublicScan)
360361
: RequestMessage(std::move(requestMessage))

0 commit comments

Comments
 (0)