Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 43 additions & 21 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ void TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
ctx.Send(SelfId(), new NActors::TEvents::TEvWakeup());
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
ctx.Send(SelfId(), new TEvPrivate::TEvPingSnapshotsUsage());
ctx.Send(SelfId(), new TEvPrivate::TEvReportExecutorStatistics());
ctx.Send(SelfId(), new TEvPrivate::TEvReportBaseStatistics());
ctx.Send(SelfId(), new TEvPrivate::TEvBuildStatisticsPipe());
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
AFL_VERIFY(!!StartInstant);
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
Expand Down Expand Up @@ -162,9 +161,12 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TAc
if (clientId == StatsReportPipe) {
if (ev->Get()->Status == NKikimrProto::OK) {
LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID());
ExecutorStatsEvInflight++;
ActorContext().Send(SelfId(), new TEvPrivate::TEvBuildStatisticsPipe());
} else {
LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID());
StatsReportPipe = {};
LastStats = {};
ActorContext().Send(SelfId(), new TEvPrivate::TEvBuildStatisticsPipe());
}
return;
}
Expand All @@ -184,7 +186,6 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TAc
LOG_S_DEBUG("Client pipe reset to " << tabletId << " at tablet " << TabletID());

if (clientId == StatsReportPipe) {
StatsReportPipe = {};
LastStats = {};
return;
}
Expand Down Expand Up @@ -438,20 +439,14 @@ void TColumnShard::FillColumnTableStats(const TActorContext& ctx, std::unique_pt
}

void TColumnShard::SendPeriodicStats(bool withExecutor) {
if (!CurrentSchemeShardId || !TablesManager.GetTabletPathIdOptional()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("Disabled periodic stats at tablet", TabletID());
if (!TablesManager.GetTabletPathIdOptional()) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("Table Manager not ready", TabletID());
return;
}

const auto& tabletSchemeShardLocalPathId = TablesManager.GetTabletPathIdVerified().SchemeShardLocalPathId;
const TActorContext& ctx = ActorContext();

if (!StatsReportPipe) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("reate periodic stats pipe to ", CurrentSchemeShardId)("at tablet", TabletID());
NTabletPipe::TClientConfig clientConfig;
StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig));
}

if (!LastStats) {
LastStats = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), tabletSchemeShardLocalPathId.GetRawValue());
}
Expand All @@ -475,21 +470,48 @@ void TColumnShard::SendPeriodicStats(bool withExecutor) {
}

void TColumnShard::Handle(TEvPrivate::TEvReportBaseStatistics::TPtr& /*ev*/) {
auto statistics = AppDataVerified().ColumnShardConfig.GetStatistics();
auto scheduleDuration = TDuration::MilliSeconds(statistics.GetReportBaseStatisticsPeriodMs() + RandomNumber<ui32>(JitterIntervalMS));
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportBaseStatistics);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportBaseStatistics")("ReportBaseStatisticsPeriodMs", statistics.GetReportBaseStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
BaseStatsEvInflight--;
ScheduleBaseStatistics();
SendPeriodicStats(false);
return;
}

void TColumnShard::Handle(TEvPrivate::TEvReportExecutorStatistics::TPtr& /*ev*/) {
ExecutorStatsEvInflight--;
ScheduleExecutorStatistics();
SendPeriodicStats(true);
}

void TColumnShard::ScheduleBaseStatistics() {
auto statistics = AppDataVerified().ColumnShardConfig.GetStatistics();
auto scheduleDuration = TDuration::MilliSeconds(statistics.GetReportBaseStatisticsPeriodMs() + RandomNumber<ui32>(JitterIntervalMS));
if (!BaseStatsEvInflight) {
BaseStatsEvInflight++;
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportBaseStatistics);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportBaseStatistics")("ReportBaseStatisticsPeriodMs", statistics.GetReportBaseStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
}
}

void TColumnShard::ScheduleExecutorStatistics() {
auto statistics = AppDataVerified().ColumnShardConfig.GetStatistics();
auto scheduleDuration = TDuration::MilliSeconds(statistics.GetReportExecutorStatisticsPeriodMs() + RandomNumber<ui32>(JitterIntervalMS));
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportExecutorStatistics);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportExecutorStatistics")("ReportExecutorStatisticsPeriodMs", statistics.GetReportExecutorStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
SendPeriodicStats(true);
return;
if (!ExecutorStatsEvInflight) {
ExecutorStatsEvInflight++;
ActorContext().Schedule(scheduleDuration, new TEvPrivate::TEvReportExecutorStatistics);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvReportExecutorStatistics")("ReportExecutorStatisticsPeriodMs", statistics.GetReportExecutorStatisticsPeriodMs())("scheduleDuration", scheduleDuration);
}
}


void TColumnShard::Handle(TEvPrivate::TEvBuildStatisticsPipe::TPtr& /*ev*/) {
if (!CurrentSchemeShardId) {
ActorContext().Schedule(TDuration::Seconds(1), new TEvPrivate::TEvBuildStatisticsPipe);
return;
}

StatsReportPipe = ActorContext().Register(NTabletPipe::CreateClient(ActorContext().SelfID, CurrentSchemeShardId, {}));

ScheduleBaseStatistics();
ScheduleExecutorStatistics();
}

} // namespace NKikimr::NColumnShard
12 changes: 9 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,9 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa

void Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvReportBaseStatistics::TPtr& ev);
void Handle(TEvPrivate::TEvReportExecutorStatistics::TPtr& ev);
void Handle(TEvPrivate::TEvReportBaseStatistics::TPtr&);
void Handle(TEvPrivate::TEvReportExecutorStatistics::TPtr&);
void Handle(TEvPrivate::TEvBuildStatisticsPipe::TPtr&);
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(NActors::TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -441,6 +442,7 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
HFunc(TEvPrivate::TEvPingSnapshotsUsage, Handle);
hFunc(TEvPrivate::TEvReportBaseStatistics, Handle);
hFunc(TEvPrivate::TEvReportExecutorStatistics, Handle);
hFunc(TEvPrivate::TEvBuildStatisticsPipe, Handle);

HFunc(NEvents::TDataEvents::TEvWrite, Handle);
HFunc(TEvPrivate::TEvWriteDraft, Handle);
Expand Down Expand Up @@ -525,7 +527,6 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
NOlap::NDataAccessorControl::TDataAccessorsManagerContainer DataAccessorsManager;
NBackgroundTasks::TControlInterfaceContainer<NOlap::NColumnFetching::TColumnDataManager> ColumnDataManager;

TActorId StatsReportPipe;
std::vector<TActorId> ActorsToStop;

TInFlightReadsTracker InFlightReadsTracker;
Expand All @@ -549,8 +550,11 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
THashMap<TActorId, TActorId> PipeServersInterconnectSessions;
TActorId ScanDiagnosticsActorId;

TActorId StatsReportPipe;
std::unique_ptr<TEvDataShard::TEvPeriodicTableStats> LastStats;
ui32 JitterIntervalMS = 200;
ui32 BaseStatsEvInflight = 0;
ui32 ExecutorStatsEvInflight = 0;
void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();

Expand Down Expand Up @@ -600,6 +604,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
ui64 MemoryUsage() const;

void SendPeriodicStats(bool);
void ScheduleBaseStatistics();
void ScheduleExecutorStatistics();

void FillOlapStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev, IExecutor* executor);
void FillColumnTableStats(const TActorContext& ctx, std::unique_ptr<TEvDataShard::TEvPeriodicTableStats>& ev, IExecutor* executor);
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct TEvPrivate {
EvPeriodicWakeup,
EvReportBaseStatistics,
EvReportExecutorStatistics,
EvBuildStatisticsPipe,
EvEviction,
EvS3Settings,
EvExport,
Expand Down Expand Up @@ -81,7 +82,7 @@ struct TEvPrivate {
EvRequestFilter,
EvFilterRequestResourcesAllocated,
EvFilterConstructionResult,

EvReportScanDiagnostics,
EvReportScanIteratorDiagnostics,

Expand Down Expand Up @@ -294,8 +295,8 @@ struct TEvPrivate {
};

struct TEvReportBaseStatistics: public TEventLocal<TEvReportBaseStatistics, EvReportBaseStatistics> {};

struct TEvReportExecutorStatistics: public TEventLocal<TEvReportExecutorStatistics, EvReportExecutorStatistics> {};
struct TEvBuildStatisticsPipe: public TEventLocal<TEvBuildStatisticsPipe, EvBuildStatisticsPipe> {};

struct TEvPingSnapshotsUsage: public TEventLocal<TEvPingSnapshotsUsage, EvPingSnapshotsUsage> {
TEvPingSnapshotsUsage() = default;
Expand Down Expand Up @@ -354,7 +355,7 @@ struct TEvPrivate {
return WritesBuffer;
}
};

struct TEvReportScanDiagnostics: public TEventLocal<TEvReportScanDiagnostics, EvReportScanDiagnostics> {
TEvReportScanDiagnostics(TString&& requestMessage, TString&& dotGraph, TString&& ssaProgram, TString&& pkRangesFilter, bool isPublicScan)
: RequestMessage(std::move(requestMessage))
Expand Down
Loading