Skip to content

Commit a833a43

Browse files
sys view for granules (#3498)
1 parent c97ef92 commit a833a43

File tree

15 files changed

+245
-54
lines changed

15 files changed

+245
-54
lines changed

ydb/core/sys_view/common/schema.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,10 @@ class TSystemViewResolver : public ISystemViewResolver {
217217

218218
RegisterOlapStoreSystemView<Schema::PrimaryIndexStats>(StorePrimaryIndexStatsName);
219219
RegisterOlapStoreSystemView<Schema::PrimaryIndexPortionStats>(StorePrimaryIndexPortionStatsName);
220+
RegisterOlapStoreSystemView<Schema::PrimaryIndexGranuleStats>(StorePrimaryIndexGranuleStatsName);
220221
RegisterColumnTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName);
221222
RegisterColumnTableSystemView<Schema::PrimaryIndexPortionStats>(TablePrimaryIndexPortionStatsName);
223+
RegisterColumnTableSystemView<Schema::PrimaryIndexGranuleStats>(TablePrimaryIndexGranuleStatsName);
222224

223225
RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName);
224226
RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName);

ydb/core/sys_view/common/schema.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ constexpr TStringBuf QueryMetricsName = "query_metrics_one_minute";
3333

3434
constexpr TStringBuf StorePrimaryIndexStatsName = "store_primary_index_stats";
3535
constexpr TStringBuf StorePrimaryIndexPortionStatsName = "store_primary_index_portion_stats";
36+
constexpr TStringBuf StorePrimaryIndexGranuleStatsName = "store_primary_index_granule_stats";
3637
constexpr TStringBuf TablePrimaryIndexStatsName = "primary_index_stats";
3738
constexpr TStringBuf TablePrimaryIndexPortionStatsName = "primary_index_portion_stats";
39+
constexpr TStringBuf TablePrimaryIndexGranuleStatsName = "primary_index_granule_stats";
3840

3941
constexpr TStringBuf TopPartitions1MinuteName = "top_partitions_one_minute";
4042
constexpr TStringBuf TopPartitions1HourName = "top_partitions_one_hour";
@@ -532,6 +534,23 @@ struct Schema : NIceDb::Schema {
532534
>;
533535
};
534536

537+
// Sys-view table #14: granule-level statistics, keyed by (PathId, TabletId).
struct PrimaryIndexGranuleStats: Table<14> {
    // Key part 1: path id of the granule.
    struct PathId: Column<1, NScheme::NTypeIds::Uint64> {};
    // Key part 2: id of the tablet reporting the granule.
    struct TabletId: Column<2, NScheme::NTypeIds::Uint64> {};
    // Number of portions in the granule.
    struct PortionsCount: Column<3, NScheme::NTypeIds::Uint64> {};
    // Host name of the reporting node.
    struct HostName: Column<4, NScheme::NTypeIds::Utf8> {};
    // Id of the reporting node.
    struct NodeId: Column<5, NScheme::NTypeIds::Uint64> {};

    using TKey = TableKey<PathId, TabletId>;
    using TColumns = TableColumns<PathId, TabletId, PortionsCount, HostName, NodeId>;
};
553+
535554
};
536555

537556
bool MaybeSystemViewPath(const TVector<TString>& path);

ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,47 @@
33
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
44
#include <ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h>
55
#include <ydb/core/tx/columnshard/engines/reader/abstract/abstract.h>
6+
#include <ydb/core/tx/columnshard/engines/storage/granule.h>
67

78
namespace NKikimr::NOlap::NReader::NSysView::NAbstract {
89

10+
class TGranuleMetaView {
11+
private:
12+
using TPortions = std::deque<std::shared_ptr<TPortionInfo>>;
13+
YDB_READONLY(ui64, PathId, 0);
14+
YDB_READONLY_DEF(TPortions, Portions);
15+
public:
16+
TGranuleMetaView(const TGranuleMeta& granule, const bool reverse)
17+
: PathId(granule.GetPathId())
18+
{
19+
for (auto&& i : granule.GetPortions()) {
20+
Portions.emplace_back(i.second);
21+
}
22+
23+
const auto predSort = [](const std::shared_ptr<TPortionInfo>& l, const std::shared_ptr<TPortionInfo>& r) {
24+
return l->GetPortionId() < r->GetPortionId();
25+
};
26+
27+
std::sort(Portions.begin(), Portions.end(), predSort);
28+
if (reverse) {
29+
std::reverse(Portions.begin(), Portions.end());
30+
}
31+
}
32+
33+
bool operator<(const TGranuleMetaView& item) const {
34+
return PathId < item.PathId;
35+
}
36+
37+
std::shared_ptr<TPortionInfo> PopFrontPortion() {
38+
if (Portions.empty()) {
39+
return nullptr;
40+
}
41+
auto result = Portions.front();
42+
Portions.pop_front();
43+
return result;
44+
}
45+
};
46+
947
struct TReadStatsMetadata: public TReadMetadataBase {
1048
private:
1149
using TBase = TReadMetadataBase;
@@ -15,9 +53,10 @@ struct TReadStatsMetadata: public TReadMetadataBase {
1553
const ui64 TabletId;
1654
std::vector<ui32> ReadColumnIds;
1755
std::vector<ui32> ResultColumnIds;
18-
std::deque<std::shared_ptr<TPortionInfo>> IndexPortions;
56+
std::deque<TGranuleMetaView> IndexGranules;
1957

20-
explicit TReadStatsMetadata(const std::shared_ptr<TVersionedIndex>& info, ui64 tabletId, const ESorting sorting, const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
58+
explicit TReadStatsMetadata(const std::shared_ptr<TVersionedIndex>& info, ui64 tabletId, const ESorting sorting,
59+
const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
2160
: TBase(info, sorting, ssaProgram, schema, requestSnapshot)
2261
, TabletId(tabletId) {
2362
}
@@ -63,31 +102,27 @@ class TStatsIterator : public TScanIteratorBase {
63102

64103
TStatsIterator(const NAbstract::TReadStatsMetadata::TConstPtr& readMetadata)
65104
: ReadMetadata(readMetadata)
66-
, Reverse(ReadMetadata->IsDescSorted())
67105
, KeySchema(MakeArrowSchema(StatsSchema.Columns, StatsSchema.KeyColumns))
68106
, ResultSchema(MakeArrowSchema(StatsSchema.Columns, ReadMetadata->ResultColumnIds))
69-
, IndexPortions(ReadMetadata->IndexPortions)
107+
, IndexGranules(ReadMetadata->IndexGranules)
70108
{
71109
if (ResultSchema->num_fields() == 0) {
72110
ResultSchema = KeySchema;
73111
}
74-
if (Reverse) {
75-
std::reverse(IndexPortions.begin(), IndexPortions.end());
76-
}
77112
}
78113

79114
bool Finished() const override {
80-
return IndexPortions.empty();
115+
return IndexGranules.empty();
81116
}
82117
protected:
83-
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionInfo& portion) const = 0;
84-
virtual ui32 GetConstructionRecordsCount(const TPortionInfo& portion) const = 0;
118+
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, TGranuleMetaView& granule) const = 0;
119+
virtual ui32 PredictRecordsCount(const TGranuleMetaView& granule) const = 0;
85120
TReadStatsMetadata::TConstPtr ReadMetadata;
86121
const bool Reverse = false;
87122
std::shared_ptr<arrow::Schema> KeySchema;
88123
std::shared_ptr<arrow::Schema> ResultSchema;
89124

90-
std::deque<std::shared_ptr<TPortionInfo>> IndexPortions;
125+
std::deque<TGranuleMetaView> IndexGranules;
91126

92127
virtual TConclusion<std::optional<TPartialReadResult>> GetBatch() override {
93128
// Take next raw batch
@@ -117,31 +152,34 @@ class TStatsIterator : public TScanIteratorBase {
117152
}
118153

119154
std::shared_ptr<arrow::RecordBatch> FillStatsBatch() {
120-
std::vector<std::shared_ptr<TPortionInfo>> portions;
121-
ui32 recordsCount = 0;
122-
while (IndexPortions.size()) {
123-
auto& i = IndexPortions.front();
124-
recordsCount += GetConstructionRecordsCount(*i);
125-
portions.emplace_back(i);
126-
IndexPortions.pop_front();
127-
if (recordsCount > 10000) {
128-
break;
129-
}
130-
}
131155
std::vector<ui32> allColumnIds;
132156
for (const auto& c : StatsSchema.Columns) {
133157
allColumnIds.push_back(c.second.Id);
134158
}
135159
std::sort(allColumnIds.begin(), allColumnIds.end());
136160
auto schema = MakeArrowSchema(StatsSchema.Columns, allColumnIds);
137-
auto builders = NArrow::MakeBuilders(schema, recordsCount);
138161

139-
for (auto&& p : portions) {
140-
AppendStats(builders, *p);
162+
std::vector<std::unique_ptr<arrow::ArrayBuilder>> builders;
163+
if (IndexGranules.size()) {
164+
builders = NArrow::MakeBuilders(schema, PredictRecordsCount(IndexGranules.front()));
165+
AppendStats(builders, IndexGranules.front());
166+
if (IndexGranules.front().GetPortions().empty()) {
167+
IndexGranules.pop_front();
168+
}
169+
} else {
170+
builders = NArrow::MakeBuilders(schema);
141171
}
142-
143172
auto columns = NArrow::Finish(std::move(builders));
144-
return arrow::RecordBatch::Make(schema, recordsCount, columns);
173+
AFL_VERIFY(columns.size());
174+
std::optional<ui32> count;
175+
for (auto&& i : columns) {
176+
if (!count) {
177+
count = i->length();
178+
} else {
179+
AFL_VERIFY(*count == i->length());
180+
}
181+
}
182+
return arrow::RecordBatch::Make(schema, columns.front()->length(), columns);
145183
}
146184

147185
void ApplyRangePredicates(std::shared_ptr<arrow::RecordBatch>& batch) {

ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,26 @@ std::shared_ptr<NKikimr::NOlap::NReader::NSysView::NAbstract::TReadStatsMetadata
8484
read.GetProgram(), index ? index->GetVersionedIndex().GetSchema(read.GetSnapshot()) : nullptr, read.GetSnapshot());
8585
}
8686

87+
// Pops portions from the front of `granule` and appends each one through the per-portion
// AppendStats overload. Stops when the granule has no portions left, or once the running
// total of records + indexes of the appended portions exceeds 10000 (the portion that
// crosses the limit is still appended). Unconsumed portions stay queued in `granule`.
void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
    ui64 appendedCount = 0;
    while (appendedCount <= 10000) {
        const auto nextPortion = granule.PopFrontPortion();
        if (!nextPortion) {
            break;
        }
        appendedCount += nextPortion->GetRecords().size() + nextPortion->GetIndexes().size();
        AppendStats(builders, *nextPortion);
    }
}
97+
98+
// Sums records + indexes counts over the queued portions of `granule`, front to back,
// and stops right after the sum exceeds 10000. The walk and the limit match what the
// granule-level AppendStats consumes in one call; the granule itself is not modified.
ui32 TStatsIterator::PredictRecordsCount(const NAbstract::TGranuleMetaView& granule) const {
    ui32 predicted = 0;
    const auto& portions = granule.GetPortions();
    for (auto it = portions.begin(); it != portions.end() && predicted <= 10000; ++it) {
        predicted += (*it)->GetRecords().size() + (*it)->GetIndexes().size();
    }
    return predicted;
}
108+
87109
}

ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,9 @@ class TReadStatsMetadata: public NAbstract::TReadStatsMetadata, std::enable_shar
2929
class TStatsIterator: public NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexStats> {
3030
private:
3131
using TBase = NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexStats>;
32-
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionInfo& portion) const override;
33-
virtual ui32 GetConstructionRecordsCount(const TPortionInfo& portion) const override {
34-
return portion.GetRecords().size() + portion.GetIndexes().size();
35-
}
36-
32+
virtual void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const override;
33+
virtual ui32 PredictRecordsCount(const NAbstract::TGranuleMetaView& granule) const override;
34+
void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const TPortionInfo& portion) const;
3735
public:
3836
using TBase::TBase;
3937
};

ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,39 +36,39 @@ class TStatScannerConstructor: public IScannerConstructor {
3636
if (!logsIndex) {
3737
return dynamic_pointer_cast<TReadMetadataBase>(out);
3838
}
39-
THashMap<ui64, THashSet<ui64>> portionsInUse;
40-
const auto predStatSchema = [](const std::shared_ptr<TPortionInfo>& l, const std::shared_ptr<TPortionInfo>& r) {
41-
return std::tuple(l->GetPathId(), l->GetPortionId()) < std::tuple(r->GetPathId(), r->GetPortionId());
42-
};
39+
THashSet<ui64> pathIds;
4340
for (auto&& filter : read.PKRangesFilter) {
4441
const ui64 fromPathId = *filter.GetPredicateFrom().Get<arrow::UInt64Array>(0, 0, 1);
4542
const ui64 toPathId = *filter.GetPredicateTo().Get<arrow::UInt64Array>(0, 0, Max<ui64>());
46-
if (read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_STATS_TABLE) || read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_PORTION_STATS_TABLE)) {
47-
if (fromPathId <= read.PathId && toPathId >= read.PathId) {
43+
if (read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_STATS_TABLE)
44+
|| read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_PORTION_STATS_TABLE)
45+
|| read.TableName.EndsWith(IIndexInfo::TABLE_INDEX_GRANULE_STATS_TABLE)
46+
) {
47+
if (fromPathId <= read.PathId && read.PathId <= toPathId) {
4848
auto pathInfo = logsIndex->GetGranuleOptional(read.PathId);
4949
if (!pathInfo) {
5050
continue;
5151
}
52-
for (auto&& p : pathInfo->GetPortions()) {
53-
if (portionsInUse[read.PathId].emplace(p.first).second) {
54-
out->IndexPortions.emplace_back(p.second);
55-
}
52+
if (pathIds.emplace(pathInfo->GetPathId()).second) {
53+
out->IndexGranules.emplace_back(NAbstract::TGranuleMetaView(*pathInfo, out->IsDescSorted()));
5654
}
5755
}
58-
std::sort(out->IndexPortions.begin(), out->IndexPortions.end(), predStatSchema);
59-
} else if (read.TableName.EndsWith(IIndexInfo::STORE_INDEX_STATS_TABLE) || read.TableName.EndsWith(IIndexInfo::STORE_INDEX_PORTION_STATS_TABLE)) {
56+
} else if (read.TableName.EndsWith(IIndexInfo::STORE_INDEX_STATS_TABLE)
57+
|| read.TableName.EndsWith(IIndexInfo::STORE_INDEX_PORTION_STATS_TABLE)
58+
|| read.TableName.EndsWith(IIndexInfo::STORE_INDEX_GRANULE_STATS_TABLE)
59+
) {
6060
auto pathInfos = logsIndex->GetTables(fromPathId, toPathId);
6161
for (auto&& pathInfo : pathInfos) {
62-
for (auto&& p : pathInfo->GetPortions()) {
63-
if (portionsInUse[p.second->GetPathId()].emplace(p.first).second) {
64-
out->IndexPortions.emplace_back(p.second);
65-
}
62+
if (pathIds.emplace(pathInfo->GetPathId()).second) {
63+
out->IndexGranules.emplace_back(NAbstract::TGranuleMetaView(*pathInfo, out->IsDescSorted()));
6664
}
6765
}
68-
std::sort(out->IndexPortions.begin(), out->IndexPortions.end(), predStatSchema);
6966
}
7067
}
71-
68+
std::sort(out->IndexGranules.begin(), out->IndexGranules.end());
69+
if (out->IsDescSorted()) {
70+
std::reverse(out->IndexGranules.begin(), out->IndexGranules.end());
71+
}
7272
return dynamic_pointer_cast<TReadMetadataBase>(out);
7373
}
7474
public:
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
#include "granules.h"
2+
#include <ydb/core/formats/arrow/switch/switch_type.h>
3+
#include <ydb/core/tx/columnshard/blobs_action/common/const.h>
4+
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
5+
#include <util/system/hostname.h>
6+
7+
namespace NKikimr::NOlap::NReader::NSysView::NGranules {
8+
9+
// Appends exactly one row describing `granule`, then empties its portion queue.
//
// Builders are addressed by position: 0 = path id, 1 = tablet id, 2 = portions count,
// 3 = host name, 4 = node id.
void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, NAbstract::TGranuleMetaView& granule) const {
    NArrow::Append<arrow::UInt64Type>(*builders[0], granule.GetPathId());
    NArrow::Append<arrow::UInt64Type>(*builders[1], ReadMetadata->TabletId);
    // Must be read before the queue is drained below.
    NArrow::Append<arrow::UInt64Type>(*builders[2], granule.GetPortions().size());
    NArrow::Append<arrow::StringType>(*builders[3], HostNameField);
    NArrow::Append<arrow::UInt64Type>(*builders[4], NActors::TActivationContext::AsActorContext().SelfID.NodeId());
    // The caller drops a granule only once its portion queue is empty, so drain it fully.
    // Loop on emptiness rather than on the popped pointer: a null entry in the queue would
    // otherwise end the loop early and the same granule would be reported again.
    while (!granule.GetPortions().empty()) {
        granule.PopFrontPortion();
    }
}
18+
19+
std::unique_ptr<TScanIteratorBase> TReadStatsMetadata::StartScan(const std::shared_ptr<TReadContext>& readContext) const {
20+
return std::make_unique<TStatsIterator>(readContext->GetReadMetadataPtrVerifiedAs<TReadStatsMetadata>());
21+
}
22+
23+
// Returns the (name, type) pairs of the key columns of the granule-stats schema.
std::vector<std::pair<TString, NKikimr::NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSchema() const {
    auto& statsSchema = TStatsIterator::StatsSchema;
    return GetColumns(statsSchema, statsSchema.KeyColumns);
}
26+
27+
// Builds the read metadata for a granule-stats scan on shard `self`.
// When the shard has no index yet, both the versioned index and the schema are passed as null.
std::shared_ptr<NAbstract::TReadStatsMetadata> TConstructor::BuildMetadata(const NColumnShard::TColumnShard* self, const TReadDescription& read) const {
    auto* indexPtr = self->GetIndexOptional();
    auto versionedIndex = indexPtr ? indexPtr->CopyVersionedIndexPtr() : nullptr;
    auto snapshotSchema = indexPtr ? indexPtr->GetVersionedIndex().GetSchema(read.GetSnapshot()) : nullptr;
    const auto sorting = IsReverse ? TReadMetadataBase::ESorting::DESC : TReadMetadataBase::ESorting::ASC;
    return std::make_shared<TReadStatsMetadata>(versionedIndex, self->TabletID(), sorting,
        read.GetProgram(), snapshotSchema, read.GetSnapshot());
}
33+
34+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#pragma once
2+
#include <ydb/core/sys_view/common/schema.h>
3+
#include <ydb/core/tx/columnshard/engines/reader/sys_view/abstract/abstract.h>
4+
#include <ydb/core/tx/columnshard/engines/reader/sys_view/constructor/constructor.h>
5+
#include <util/system/hostname.h>
6+
7+
namespace NKikimr::NOlap::NReader::NSysView::NGranules {
8+
9+
// Scanner constructor for the granule-stats sys view; it only supplies the metadata factory,
// everything else comes from TStatScannerConstructor.
class TConstructor: public TStatScannerConstructor<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats> {
private:
    using TBase = TStatScannerConstructor<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats>;

protected:
    // Creates the granule-stats read metadata for the given shard and read description.
    std::shared_ptr<NAbstract::TReadStatsMetadata> BuildMetadata(
        const NColumnShard::TColumnShard* self, const TReadDescription& read) const override;

public:
    using TBase::TBase;
};
18+
19+
struct TReadStatsMetadata: public NAbstract::TReadStatsMetadata {
20+
private:
21+
using TBase = NAbstract::TReadStatsMetadata;
22+
using TSysViewSchema = NKikimr::NSysView::Schema::PrimaryIndexGranuleStats;
23+
public:
24+
using TBase::TBase;
25+
26+
virtual std::unique_ptr<TScanIteratorBase> StartScan(const std::shared_ptr<TReadContext>& readContext) const override;
27+
virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;
28+
};
29+
30+
// Iterator of the granule-stats sys view, built on the generic stats iterator
// instantiated with the PrimaryIndexGranuleStats schema.
class TStatsIterator : public NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats> {
private:
    using TBase = NAbstract::TStatsIterator<NKikimr::NSysView::Schema::PrimaryIndexGranuleStats>;

    // Host name captured once when the iterator is constructed.
    const std::string HostNameField = HostName();

    // Always predicts a single record, regardless of the granule.
    ui32 PredictRecordsCount(const NAbstract::TGranuleMetaView& /*granule*/) const override {
        return 1;
    }

    // Appends the statistics of `granule` to `builders`.
    void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders,
        NAbstract::TGranuleMetaView& granule) const override;

public:
    using TBase::TBase;
};
41+
42+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Library target that compiles granules.cpp; its only declared peer dependency is the
# sys_view/abstract library.
LIBRARY()
2+
3+
PEERDIR(
4+
ydb/core/tx/columnshard/engines/reader/sys_view/abstract
5+
)
6+
7+
SRCS(
8+
granules.cpp
9+
)
10+
11+
END()
12+

0 commit comments

Comments
 (0)