Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions ydb/core/protos/statistics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,56 +68,85 @@ message TEvPropagateStatisticsResponse {
message TEvStatisticsIsDisabled {
}

message TEvScanTable {
optional NKikimrProto.TPathID PathId = 1;
enum EColumnStatisticType {
TYPE_UNSPECIFIED = 0;
TYPE_COUNT_MIN_SKETCH = 1;
}

// table to gather statistics from
message TTable {
optional NKikimrProto.TPathID PathId = 1; // table path
repeated uint32 ColumnTags = 2; // list of columns to gather statistics from. Empty means asking for every column.
}

message TEvScanTableAccepted {
optional uint64 OperationId = 1;
// KQP -> SA
message TEvAnalyze {
optional uint64 Cookie = 1; // request cookie to match response item
repeated TTable Tables = 2; // list of analyzed tables and columns
repeated EColumnStatisticType Types = 3; // list of statistics types requested. Empty means asking for all available.
}

message TEvScanTableResponse {
// SA -> KQP
message TEvAnalyzeResponse {
optional uint64 Cookie = 1;
}

message TEvGetScanStatus {
// KQP -> SA
message TEvAnalyzeStatus {
optional NKikimrProto.TPathID PathId = 1;
}

message TEvGetScanStatusResponse {
// SA -> KQP
message TEvAnalyzeStatusResponse {
optional NKikimrProto.TPathID PathId = 1;

enum EStatus {
NO_OPERATION = 0;
ENQUEUED = 1;
IN_PROGRESS = 2;
STATUS_UNSPECIFIED = 0;
STATUS_NO_OPERATION = 1;
STATUS_ENQUEUED = 2;
STATUS_IN_PROGRESS = 3;
}
optional EStatus Status = 1;
optional EStatus Status = 2;
}

// SA -> Shard
message TEvAnalyzeTable {
optional TTable Table = 1; // analyzed table
repeated EColumnStatisticType Types = 2; // list of statistics types requested. Empty means asking for all available.
}

// Shard -> SA
message TEvAnalyzeTableResponse {
optional NKikimrProto.TPathID PathId = 1;
}


message TEvStatisticsRequest {
optional NKikimrDataEvents.TTableId TableId = 1;
optional TTable Table = 1;

optional bytes StartKey = 2;
// list of columns to gather statistics from. Empty means asking for every column.
repeated uint32 ColumnTags = 3;
// list of statistics types requested. Empty means asking for all available.
repeated uint32 Types = 4;

repeated EColumnStatisticType Types = 3;
}

message TStatistic {
optional uint32 Type = 1;
optional bytes Data = 2;
}

message TColumn {
message TColumnStatistics {
optional uint32 Tag = 1;
repeated TStatistic Statistics = 2;
}

message TEvStatisticsResponse {
repeated TColumn Columns = 1;
repeated TColumnStatistics Columns = 1;

enum EStatus {
SUCCESS = 1;
ABORTED = 2;
ERROR = 3;
STATUS_UNSPECIFIED = 0;
STATUS_SUCCESS = 1;
STATUS_ABORTED = 2;
STATUS_ERROR = 3;
}
optional EStatus Status = 2;
optional fixed64 ShardTabletId = 3;
Expand All @@ -144,10 +173,11 @@ message TEvAggregateKeepAliveAck {

message TEvAggregateStatisticsResponse {
optional uint64 Round = 1;
repeated TColumn Columns = 2;
repeated TColumnStatistics Columns = 2;
enum EErrorType {
UnavailableNode = 1;
NonLocalTablet = 2;
TYPE_UNSPECIFIED = 0;
TYPE_UNAVAILABLE_NODE = 1;
TYPE_NON_LOCAL_TABLET = 2;
}
message TFailedTablet {
optional EErrorType Error = 1;
Expand Down
15 changes: 8 additions & 7 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,21 +431,21 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvStatTableCreationResponse::
}
}

void TStatisticsAggregator::Handle(TEvStatistics::TEvGetScanStatus::TPtr& ev) {
void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
auto& inRecord = ev->Get()->Record;
auto pathId = PathIdFromPathId(inRecord.GetPathId());

auto response = std::make_unique<TEvStatistics::TEvGetScanStatusResponse>();
auto response = std::make_unique<TEvStatistics::TEvAnalyzeStatusResponse>();
auto& outRecord = response->Record;

if (ScanTableId.PathId == pathId) {
outRecord.SetStatus(NKikimrStat::TEvGetScanStatusResponse::IN_PROGRESS);
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
} else {
auto it = ScanOperationsByPathId.find(pathId);
if (it != ScanOperationsByPathId.end()) {
outRecord.SetStatus(NKikimrStat::TEvGetScanStatusResponse::ENQUEUED);
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
} else {
outRecord.SetStatus(NKikimrStat::TEvGetScanStatusResponse::NO_OPERATION);
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
}
}
Send(ev->Sender, response.release(), 0, ev->Cookie);
Expand Down Expand Up @@ -518,8 +518,9 @@ void TStatisticsAggregator::NextRange() {
auto& range = ShardRanges.front();
auto request = std::make_unique<NStat::TEvStatistics::TEvStatisticsRequest>();
auto& record = request->Record;
record.MutableTableId()->SetOwnerId(ScanTableId.PathId.OwnerId);
record.MutableTableId()->SetTableId(ScanTableId.PathId.LocalPathId);
auto* path = record.MutableTable()->MutablePathId();
path->SetOwnerId(ScanTableId.PathId.OwnerId);
path->SetLocalId(ScanTableId.PathId.LocalPathId);
record.SetStartKey(StartKey.GetBuffer());

Send(MakePipePerNodeCacheID(false),
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
size_t PropagatePart(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds,
size_t lastSSIndex, bool useSizeLimit);

void Handle(TEvStatistics::TEvScanTable::TPtr& ev);
void Handle(TEvStatistics::TEvAnalyze::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev);
void Handle(NStat::TEvStatistics::TEvStatisticsResponse::TPtr& ev);
Expand All @@ -129,7 +129,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
void Handle(TEvStatistics::TEvSaveStatisticsQueryResponse::TPtr& ev);
void Handle(TEvStatistics::TEvDeleteStatisticsQueryResponse::TPtr& ev);
void Handle(TEvPrivate::TEvScheduleScan::TPtr& ev);
void Handle(TEvStatistics::TEvGetScanStatus::TPtr& ev);
void Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev);
void Handle(TEvHive::TEvResponseTabletDistribution::TPtr& ev);
void Handle(TEvStatistics::TEvAggregateStatisticsResponse::TPtr& ev);
void Handle(TEvPrivate::TEvResolve::TPtr& ev);
Expand Down Expand Up @@ -176,7 +176,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
hFunc(TEvPrivate::TEvProcessUrgent, Handle);
hFunc(TEvPrivate::TEvPropagateTimeout, Handle);

hFunc(TEvStatistics::TEvScanTable, Handle);
hFunc(TEvStatistics::TEvAnalyze, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
hFunc(NStat::TEvStatistics::TEvStatisticsResponse, Handle);
Expand All @@ -185,7 +185,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
hFunc(TEvStatistics::TEvSaveStatisticsQueryResponse, Handle);
hFunc(TEvStatistics::TEvDeleteStatisticsQueryResponse, Handle);
hFunc(TEvPrivate::TEvScheduleScan, Handle);
hFunc(TEvStatistics::TEvGetScanStatus, Handle);
hFunc(TEvStatistics::TEvAnalyzeStatus, Handle);
hFunc(TEvHive::TEvResponseTabletDistribution, Handle);
hFunc(TEvStatistics::TEvAggregateStatisticsResponse, Handle);
hFunc(TEvPrivate::TEvResolve, Handle);
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/statistics/aggregator/tx_aggr_stat_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,16 @@ struct TStatisticsAggregator::TTxAggregateStatisticsResponse : public TTxBase {
for (auto& tablet : Record.GetFailedTablets()) {
auto error = tablet.GetError();
switch (error) {
case NKikimrStat::TEvAggregateStatisticsResponse::UnavailableNode:
case NKikimrStat::TEvAggregateStatisticsResponse::TYPE_UNSPECIFIED:
SA_LOG_CRIT("[" << Self->TabletID() << "] Unspecified TEvAggregateStatisticsResponse status");
return false;

case NKikimrStat::TEvAggregateStatisticsResponse::TYPE_UNAVAILABLE_NODE:
Self->TabletsForReqDistribution.insert(tablet.GetTabletId());
Action = EAction::SendReqDistribution;
break;

case NKikimrStat::TEvAggregateStatisticsResponse::NonLocalTablet:
case NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET:
auto nodeId = tablet.GetNodeId();
if (nodeId == 0) {
// we cannot reach this tablet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct TStatisticsAggregator::TTxDeleteQueryResponse : public TTxBase {
SA_LOG_D("[" << Self->TabletID() << "] TTxDeleteQueryResponse::Complete");

for (auto& id : ReplyToActorIds) {
ctx.Send(id, new TEvStatistics::TEvScanTableResponse);
ctx.Send(id, new TEvStatistics::TEvAnalyzeResponse);
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/statistics/aggregator/tx_save_query_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct TStatisticsAggregator::TTxSaveQueryResponse : public TTxBase {
SA_LOG_D("[" << Self->TabletID() << "] TTxSaveQueryResponse::Complete");

for (auto& id : ReplyToActorIds) {
ctx.Send(id, new TEvStatistics::TEvScanTableResponse);
ctx.Send(id, new TEvStatistics::TEvAnalyzeResponse);
}
}
};
Expand Down
36 changes: 15 additions & 21 deletions ydb/core/statistics/aggregator/tx_scan_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxScanTable : public TTxBase {
NKikimrStat::TEvScanTable Record;
TPathId PathId;
TActorId ReplyToActorId;
ui64 OperationId = 0;

TTxScanTable(TSelf* self, NKikimrStat::TEvScanTable&& record, TActorId replyToActorId)
TTxScanTable(TSelf* self, const TPathId& pathId, TActorId replyToActorId)
: TTxBase(self)
, Record(std::move(record))
, PathId(pathId)
, ReplyToActorId(replyToActorId)
{}

Expand All @@ -24,9 +24,7 @@ struct TStatisticsAggregator::TTxScanTable : public TTxBase {
return true;
}

auto pathId = PathIdFromPathId(Record.GetPathId());

auto itOp = Self->ScanOperationsByPathId.find(pathId);
auto itOp = Self->ScanOperationsByPathId.find(PathId);
if (itOp != Self->ScanOperationsByPathId.end()) {
itOp->second.ReplyToActorIds.insert(ReplyToActorId);
OperationId = itOp->second.OperationId;
Expand All @@ -35,17 +33,17 @@ struct TStatisticsAggregator::TTxScanTable : public TTxBase {

NIceDb::TNiceDb db(txc.DB);

TScanOperation& operation = Self->ScanOperationsByPathId[pathId];
operation.PathId = pathId;
TScanOperation& operation = Self->ScanOperationsByPathId[PathId];
operation.PathId = PathId;
operation.OperationId = ++Self->LastScanOperationId;
operation.ReplyToActorIds.insert(ReplyToActorId);
Self->ScanOperations.PushBack(&operation);

Self->PersistLastScanOperationId(db);

db.Table<Schema::ScanOperations>().Key(operation.OperationId).Update(
NIceDb::TUpdate<Schema::ScanOperations::OwnerId>(pathId.OwnerId),
NIceDb::TUpdate<Schema::ScanOperations::LocalPathId>(pathId.LocalPathId));
NIceDb::TUpdate<Schema::ScanOperations::OwnerId>(PathId.OwnerId),
NIceDb::TUpdate<Schema::ScanOperations::LocalPathId>(PathId.LocalPathId));

OperationId = operation.OperationId;

Expand All @@ -54,21 +52,17 @@ struct TStatisticsAggregator::TTxScanTable : public TTxBase {

void Complete(const TActorContext& ctx) override {
SA_LOG_D("[" << Self->TabletID() << "] TTxScanTable::Complete");
}
};

if (!Self->EnableColumnStatistics) {
return;
}
void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyze::TPtr& ev) {
const auto& record = ev->Get()->Record;

auto accepted = std::make_unique<TEvStatistics::TEvScanTableAccepted>();
accepted->Record.SetOperationId(OperationId);
ctx.Send(ReplyToActorId, accepted.release());
// TODO: replace by queue
for (const auto& table : record.GetTables()) {
Execute(new TTxScanTable(this, PathIdFromPathId(table.GetPathId()), ev->Sender), TActivationContext::AsActorContext());
}
};

void TStatisticsAggregator::Handle(TEvStatistics::TEvScanTable::TPtr& ev) {
auto& record = ev->Get()->Record;
Execute(new TTxScanTable(this, std::move(record), ev->Sender),
TActivationContext::AsActorContext());
}

} // NKikimr::NStat
29 changes: 21 additions & 8 deletions ydb/core/statistics/aggregator/ut/ut_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,18 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
runtime.SimulateSleep(TDuration::Seconds(5));
initThread.join();

ui64 tabletId = 0;
ui64 tabletId;
auto pathId = ResolvePathId(runtime, "/Root/Database/Table", nullptr, &tabletId);

runtime.SimulateSleep(TDuration::Seconds(30));

auto ev = std::make_unique<TEvStatistics::TEvScanTable>();
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
auto& record = ev->Record;
PathIdFromPathId(pathId, record.MutablePathId());
PathIdFromPathId(pathId, record.AddTables()->MutablePathId());

auto sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(tabletId, sender, ev.release());
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvScanTableResponse>(sender);
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);

ValidateCountMin(runtime, pathId);
}
Expand All @@ -198,17 +198,30 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
CreateUniformTable(env, "Database", "Table1");
CreateUniformTable(env, "Database", "Table2");
};
// TODO remove thread
std::thread initThread(init);

auto& runtime = *env.GetServer().GetRuntime();
runtime.SimulateSleep(TDuration::Seconds(5));
initThread.join();

runtime.SimulateSleep(TDuration::Seconds(60));
// TODO remove sleep
runtime.SimulateSleep(TDuration::Seconds(30));

auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1");
ui64 tabletId1;
auto pathId1 = ResolvePathId(runtime, "/Root/Database/Table1", nullptr, &tabletId1);
auto pathId2 = ResolvePathId(runtime, "/Root/Database/Table2");

auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
auto& record = ev->Record;
PathIdFromPathId(pathId1, record.AddTables()->MutablePathId());
PathIdFromPathId(pathId2, record.AddTables()->MutablePathId());

auto sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(tabletId1, sender, ev.release());
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);

ValidateCountMin(runtime, pathId1);
ValidateCountMin(runtime, pathId2);
}
Expand Down Expand Up @@ -333,9 +346,9 @@ Y_UNIT_TEST_SUITE(StatisticsAggregator) {
runtime.SimulateSleep(TDuration::Seconds(5));
init2Thread.join();

auto ev = std::make_unique<TEvStatistics::TEvScanTable>();
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
auto& record = ev->Record;
PathIdFromPathId(pathId, record.MutablePathId());
PathIdFromPathId(pathId, record.AddTables()->MutablePathId());

auto sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(tabletId, sender, ev.release());
Expand Down
Loading