Skip to content

Commit 3c6f759

Browse files
authored
Merge 0b57472 into 186c199
2 parents 186c199 + 0b57472 commit 3c6f759

14 files changed

+235
-45
lines changed

ydb/core/protos/counters_statistics_aggregator.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ enum ETxTypes {
1717
TXTYPE_SCAN_RESPONSE = 7 [(TxTypeOpts) = {Name: "TxScanResponse"}];
1818
TXTYPE_SAVE_QUERY_RESPONSE = 8 [(TxTypeOpts) = {Name: "TxSaveQueryResponse"}];
1919
TXTYPE_SCHEDULE_SCAN = 9 [(TxTypeOpts) = {Name: "TxScheduleScan"}];
20+
TXTYPE_DELETE_QUERY_RESPONSE = 10 [(TxTypeOpts) = {Name: "TxDeleteQueryResponse"}];
2021
}

ydb/core/statistics/aggregator/aggregator_impl.cpp

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,10 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvStatTableCreationResponse::
407407
PendingSaveStatistics = false;
408408
SaveStatisticsToTable();
409409
}
410+
if (PendingDeleteStatistics) {
411+
PendingDeleteStatistics = false;
412+
DeleteStatisticsFromTable();
413+
}
410414
}
411415

412416
void TStatisticsAggregator::Initialize() {
@@ -484,6 +488,17 @@ void TStatisticsAggregator::SaveStatisticsToTable() {
484488
std::move(columnNames), std::move(data)));
485489
}
486490

491+
void TStatisticsAggregator::DeleteStatisticsFromTable() {
492+
if (!IsStatisticsTableCreated) {
493+
PendingDeleteStatistics = true;
494+
return;
495+
}
496+
497+
PendingDeleteStatistics = false;
498+
499+
Register(CreateDeleteStatisticsQuery(ScanTableId.PathId));
500+
}
501+
487502
void TStatisticsAggregator::ScheduleNextScan() {
488503
while (!ScanTablesByTime.empty()) {
489504
auto& topTable = ScanTablesByTime.top();
@@ -539,6 +554,38 @@ void TStatisticsAggregator::ResetScanState(NIceDb::TNiceDb& db) {
539554
ColumnNames.clear();
540555
}
541556

557+
void TStatisticsAggregator::RescheduleScanTable(NIceDb::TNiceDb& db) {
558+
if (ScanTablesByTime.empty()) {
559+
return;
560+
}
561+
auto& topTable = ScanTablesByTime.top();
562+
auto pathId = topTable.PathId;
563+
if (pathId == ScanTableId.PathId) {
564+
TScanTable scanTable;
565+
scanTable.PathId = pathId;
566+
scanTable.SchemeShardId = topTable.SchemeShardId;
567+
scanTable.LastUpdateTime = ScanStartTime;
568+
569+
ScanTablesByTime.pop();
570+
ScanTablesByTime.push(scanTable);
571+
572+
db.Table<Schema::ScanTables>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
573+
NIceDb::TUpdate<Schema::ScanTables::LastUpdateTime>(ScanStartTime.MicroSeconds()));
574+
}
575+
}
576+
577+
void TStatisticsAggregator::DropScanTable(NIceDb::TNiceDb& db) {
578+
if (ScanTablesByTime.empty()) {
579+
return;
580+
}
581+
auto& topTable = ScanTablesByTime.top();
582+
auto pathId = topTable.PathId;
583+
if (pathId == ScanTableId.PathId) {
584+
ScanTablesByTime.pop();
585+
db.Table<Schema::ScanTables>().Key(pathId.OwnerId, pathId.LocalPathId).Delete();
586+
}
587+
}
588+
542589
template <typename T, typename S>
543590
void PrintContainerStart(const T& container, size_t count, TStringStream& str,
544591
std::function<S(const typename T::value_type&)> extractor)

ydb/core/statistics/aggregator/aggregator_impl.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
4747
struct TTxStatisticsScanResponse;
4848
struct TTxSaveQueryResponse;
4949
struct TTxScheduleScan;
50+
struct TTxDeleteQueryResponse;
5051

5152
struct TEvPrivate {
5253
enum EEv {
@@ -107,20 +108,24 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
107108
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
108109
void Handle(TEvStatistics::TEvStatTableCreationResponse::TPtr& ev);
109110
void Handle(TEvStatistics::TEvSaveStatisticsQueryResponse::TPtr& ev);
111+
void Handle(TEvStatistics::TEvDeleteStatisticsQueryResponse::TPtr& ev);
110112
void Handle(TEvPrivate::TEvScheduleScan::TPtr& ev);
111113

112114
void Initialize();
113115
void Navigate();
114116
void Resolve();
115117
void NextRange();
116118
void SaveStatisticsToTable();
119+
void DeleteStatisticsFromTable();
117120
void ScheduleNextScan();
118121

119122
void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
120123
void PersistScanTableId(NIceDb::TNiceDb& db);
121124
void PersistScanStartTime(NIceDb::TNiceDb& db);
122125

123126
void ResetScanState(NIceDb::TNiceDb& db);
127+
void RescheduleScanTable(NIceDb::TNiceDb& db);
128+
void DropScanTable(NIceDb::TNiceDb& db);
124129

125130
STFUNC(StateInit) {
126131
StateInitImpl(ev, SelfId());
@@ -203,6 +208,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
203208

204209
bool IsStatisticsTableCreated = false;
205210
bool PendingSaveStatistics = false;
211+
bool PendingDeleteStatistics = false;
206212

207213
std::vector<NScheme::TTypeInfo> KeyColumnTypes;
208214
TVector<TKeyDesc::TColumnOp> Columns;
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#include "aggregator_impl.h"
2+
3+
#include <ydb/core/tx/datashard/datashard.h>
4+
5+
namespace NKikimr::NStat {
6+
7+
struct TStatisticsAggregator::TTxDeleteQueryResponse : public TTxBase {
8+
9+
TTxDeleteQueryResponse(TSelf* self)
10+
: TTxBase(self)
11+
{}
12+
13+
TTxType GetTxType() const override { return TXTYPE_DELETE_QUERY_RESPONSE; }
14+
15+
bool Execute(TTransactionContext& txc, const TActorContext&) override {
16+
SA_LOG_D("[" << Self->TabletID() << "] TTxDeleteQueryResponse::Execute");
17+
18+
NIceDb::TNiceDb db(txc.DB);
19+
20+
Self->ResetScanState(db);
21+
22+
return true;
23+
}
24+
25+
void Complete(const TActorContext&) override {
26+
SA_LOG_D("[" << Self->TabletID() << "] TTxDeleteQueryResponse::Complete");
27+
28+
Self->ScheduleNextScan();
29+
}
30+
};
31+
void TStatisticsAggregator::Handle(TEvStatistics::TEvDeleteStatisticsQueryResponse::TPtr&) {
32+
Execute(new TTxDeleteQueryResponse(this), TActivationContext::AsActorContext());
33+
}
34+
35+
} // NKikimr::NStat

ydb/core/statistics/aggregator/tx_navigate.cpp

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@ struct TStatisticsAggregator::TTxNavigate : public TTxBase {
2626
if (entry.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
2727
Cancelled = true;
2828

29-
if (!Self->ScanTablesByTime.empty()) {
30-
auto& topTable = Self->ScanTablesByTime.top();
31-
auto pathId = topTable.PathId;
32-
if (pathId == Self->ScanTableId.PathId) {
33-
Self->ScanTablesByTime.pop();
34-
db.Table<Schema::ScanTables>().Key(pathId.OwnerId, pathId.LocalPathId).Delete();
35-
}
29+
if (entry.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown) {
30+
Self->DropScanTable(db);
31+
Self->DeleteStatisticsFromTable();
32+
} else {
33+
Self->RescheduleScanTable(db);
34+
Self->ScheduleNextScan();
3635
}
3736

3837
Self->ResetScanState(db);
@@ -70,7 +69,6 @@ struct TStatisticsAggregator::TTxNavigate : public TTxBase {
7069
SA_LOG_D("[" << Self->TabletID() << "] TTxNavigate::Complete");
7170

7271
if (Cancelled) {
73-
Self->ScheduleNextScan();
7472
return;
7573
}
7674

ydb/core/statistics/aggregator/tx_resolve.cpp

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase {
2626
if (entry.Status != NSchemeCache::TSchemeCacheRequest::EStatus::OkData) {
2727
Cancelled = true;
2828

29-
if (!Self->ScanTablesByTime.empty()) {
30-
auto& topTable = Self->ScanTablesByTime.top();
31-
auto pathId = topTable.PathId;
32-
if (pathId == Self->ScanTableId.PathId) {
33-
Self->ScanTablesByTime.pop();
34-
db.Table<Schema::ScanTables>().Key(pathId.OwnerId, pathId.LocalPathId).Delete();
35-
}
29+
if (entry.Status == NSchemeCache::TSchemeCacheRequest::EStatus::PathErrorNotExist) {
30+
Self->DropScanTable(db);
31+
Self->DeleteStatisticsFromTable();
32+
} else {
33+
Self->RescheduleScanTable(db);
34+
Self->ScheduleNextScan();
3635
}
3736

3837
Self->ResetScanState(db);
@@ -58,7 +57,6 @@ struct TStatisticsAggregator::TTxResolve : public TTxBase {
5857
SA_LOG_D("[" << Self->TabletID() << "] TTxResolve::Complete");
5958

6059
if (Cancelled) {
61-
Self->ScheduleNextScan();
6260
return;
6361
}
6462

ydb/core/statistics/aggregator/tx_save_query_response.cpp

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,8 @@ struct TStatisticsAggregator::TTxSaveQueryResponse : public TTxBase {
1717

1818
NIceDb::TNiceDb db(txc.DB);
1919

20-
if (!Self->ScanTablesByTime.empty()) {
21-
auto& topTable = Self->ScanTablesByTime.top();
22-
auto pathId = topTable.PathId;
23-
if (pathId == Self->ScanTableId.PathId) {
24-
TScanTable scanTable;
25-
scanTable.PathId = pathId;
26-
scanTable.SchemeShardId = topTable.SchemeShardId;
27-
scanTable.LastUpdateTime = Self->ScanStartTime;
28-
29-
Self->ScanTablesByTime.pop();
30-
Self->ScanTablesByTime.push(scanTable);
31-
32-
db.Table<Schema::ScanTables>().Key(pathId.OwnerId, pathId.LocalPathId).Update(
33-
NIceDb::TUpdate<Schema::ScanTables::LastUpdateTime>(Self->ScanStartTime.MicroSeconds()));
34-
}
35-
}
36-
37-
Self->ScanTableId.PathId = TPathId();
38-
Self->PersistScanTableId(db);
39-
40-
for (auto& [tag, _] : Self->CountMinSketches) {
41-
db.Table<Schema::Statistics>().Key(tag).Delete();
42-
}
43-
Self->CountMinSketches.clear();
20+
Self->RescheduleScanTable(db);
21+
Self->ResetScanState(db);
4422

4523
return true;
4624
}

ydb/core/statistics/aggregator/tx_schemeshard_stats.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ struct TStatisticsAggregator::TTxSchemeShardStats : public TTxBase {
6262
void Complete(const TActorContext&) override {
6363
SA_LOG_D("[" << Self->TabletID() << "] TTxSchemeShardStats::Complete");
6464

65-
Self->ScheduleNextScan();
65+
if (!Self->ScanTableId.PathId) {
66+
Self->ScheduleNextScan();
67+
}
6668
}
6769
};
6870

ydb/core/statistics/aggregator/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ SRCS(
1313
schema.h
1414
schema.cpp
1515
tx_configure.cpp
16+
tx_delete_query_response.cpp
1617
tx_init.cpp
1718
tx_init_schema.cpp
1819
tx_navigate.cpp

ydb/core/statistics/events.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ struct TEvStatistics {
6969
EvScanTable,
7070
EvScanTableResponse,
7171

72+
EvDeleteStatisticsQueryResponse,
73+
7274
EvEnd
7375
};
7476

@@ -159,6 +161,13 @@ struct TEvStatistics {
159161
std::optional<TString> Data;
160162
};
161163

164+
struct TEvDeleteStatisticsQueryResponse : public TEventLocal<
165+
TEvDeleteStatisticsQueryResponse,
166+
EvDeleteStatisticsQueryResponse>
167+
{
168+
bool Success = true;
169+
};
170+
162171
struct TEvScanTable : public TEventPB<
163172
TEvScanTable,
164173
NKikimrStat::TEvScanTable,

0 commit comments

Comments
 (0)