Skip to content

Commit 07a1a26

Browse files
Merge f6ffdec into 916a423
2 parents 916a423 + f6ffdec commit 07a1a26

File tree

6 files changed

+127
-27
lines changed

6 files changed

+127
-27
lines changed

ydb/core/tx/columnshard/blobs_action/counters/storage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ enum class EConsumer {
2626
WRITING_BUFFER,
2727
WRITING_OPERATOR,
2828
NORMALIZER,
29+
STATISTICS,
2930

3031
COUNT
3132
};

ydb/core/tx/columnshard/columnshard__statistics.cpp

Lines changed: 91 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
#include "columnshard.h"
22
#include "columnshard_impl.h"
3+
34
#include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"
45

56
#include <ydb/core/protos/kqp.pb.h>
7+
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
68
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
79

810
#include <yql/essentials/core/minsketch/count_min_sketch.h>
911

10-
1112
namespace NKikimr::NColumnShard {
1213

1314
void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) {
1415
auto& requestRecord = ev->Get()->Record;
1516
// TODO Start a potentially long analysis process.
1617
// ...
1718

18-
19-
2019
// Return the response when the analysis is completed
2120
auto response = std::make_unique<NStat::TEvStatistics::TEvAnalyzeTableResponse>();
2221
auto& responseRecord = response->Record;
@@ -64,8 +63,7 @@ class TResultAccumulator {
6463
std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
6564
: RequestSenderActorId(requestSenderActorId)
6665
, Cookie(cookie)
67-
, Response(std::move(response))
68-
{
66+
, Response(std::move(response)) {
6967
for (auto&& i : tags) {
7068
AFL_VERIFY(Calculated.emplace(i, nullptr).second);
7169
}
@@ -104,11 +102,11 @@ class TResultAccumulator {
104102
OnResultReady();
105103
}
106104
}
107-
108105
};
109106

110107
class TColumnPortionsAccumulator {
111108
private:
109+
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
112110
const std::set<ui32> ColumnTagsRequested;
113111
std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
114112
const ui32 PortionsCountLimit = 10000;
@@ -117,19 +115,66 @@ class TColumnPortionsAccumulator {
117115
const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
118116

119117
public:
120-
TColumnPortionsAccumulator(const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
121-
const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
118+
TColumnPortionsAccumulator(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
119+
const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit, const std::set<ui32>& originalColumnTags,
120+
const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
122121
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
123-
: ColumnTagsRequested(originalColumnTags)
122+
: StoragesManager(storagesManager)
123+
, ColumnTagsRequested(originalColumnTags)
124124
, PortionsCountLimit(portionsCountLimit)
125125
, DataAccessors(dataAccessorsManager)
126126
, Result(result)
127-
, VersionedIndex(vIndex)
128-
{
127+
, VersionedIndex(vIndex) {
129128
}
130129

130+
class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask {
131+
private:
132+
using TBase = NOlap::NBlobOperations::NRead::ITask;
133+
const std::shared_ptr<TResultAccumulator> Result;
134+
THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> RangesByColumn;
135+
THashMap<ui32, std::unique_ptr<TCountMinSketch>> SketchesByColumns;
136+
137+
protected:
138+
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) override {
139+
NOlap::NBlobOperations::NRead::TCompositeReadBlobs blobsData = ExtractBlobsData();
140+
for (auto&& [columnId, data] : RangesByColumn) {
141+
for (auto&& [storageId, blobs] : data) {
142+
for (auto&& b : blobs) {
143+
const TString blob = blobsData.Extract(storageId, b);
144+
auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(blob.data(), blob.size()));
145+
*SketchesByColumns[columnId] += *sketch;
146+
}
147+
}
148+
}
149+
Result->AddResult(std::move(SketchesByColumns));
150+
}
151+
152+
virtual bool DoOnError(
153+
const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
154+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)(
155+
"status", status.GetErrorMessage())("status_code", status.GetStatus());
156+
AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())(
157+
"error", status.GetErrorMessage())("type", "STATISTICS");
158+
return false;
159+
}
160+
161+
public:
162+
TIndexReadTask(const std::shared_ptr<TResultAccumulator>& result,
163+
std::vector<std::shared_ptr<NOlap::IBlobsReadingAction>>&& readingActions,
164+
THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>>&& rangesByColumn,
165+
THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& readySketches)
166+
: TBase(std::move(readingActions), "STATISTICS", "STATISTICS")
167+
, Result(result)
168+
, RangesByColumn(std::move(rangesByColumn))
169+
, SketchesByColumns(std::move(readySketches)) {
170+
AFL_VERIFY(!!Result);
171+
AFL_VERIFY(RangesByColumn.size());
172+
}
173+
};
174+
131175
class TMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
132176
private:
177+
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
133178
const std::shared_ptr<TResultAccumulator> Result;
134179
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
135180
const std::set<ui32> ColumnTagsRequested;
@@ -143,6 +188,8 @@ class TColumnPortionsAccumulator {
143188
sketchesByColumns.emplace(id, TCountMinSketch::Create());
144189
}
145190

191+
THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> rangesByColumn;
192+
146193
for (const auto& [id, portionInfo] : result.GetPortions()) {
147194
std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex);
148195
for (const ui32 columnId : ColumnTagsRequested) {
@@ -154,26 +201,43 @@ class TColumnPortionsAccumulator {
154201
}
155202
AFL_VERIFY(indexMeta->GetColumnIds().size() == 1);
156203

157-
const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());
204+
if (!indexMeta->IsInplaceData()) {
205+
portionInfo.FillBlobRangesByStorage(rangesByColumn, portionSchema->GetIndexInfo(), { indexMeta->GetIndexId() });
206+
} else {
207+
const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());
158208

159-
for (const auto& sketchAsString : data) {
160-
auto sketch =
161-
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
162-
*sketchesByColumns[columnId] += *sketch;
209+
for (const auto& sketchAsString : data) {
210+
auto sketch =
211+
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
212+
*sketchesByColumns[columnId] += *sketch;
213+
}
163214
}
164215
}
165216
}
166-
Result->AddResult(std::move(sketchesByColumns));
217+
if (rangesByColumn.size()) {
218+
NOlap::TBlobsAction blobsAction(StoragesManager, NOlap::NBlobOperations::EConsumer::STATISTICS);
219+
for (auto&& i : rangesByColumn) {
220+
for (auto&& [storageId, ranges] : i.second) {
221+
auto reader = blobsAction.GetReading(storageId);
222+
for (auto&& i : ranges) {
223+
reader->AddRange(i);
224+
}
225+
}
226+
}
227+
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
228+
std::make_shared<TIndexReadTask>(Result, blobsAction.GetReadingActions(), std::move(rangesByColumn), std::move(sketchesByColumns))));
229+
} else {
230+
Result->AddResult(std::move(sketchesByColumns));
231+
}
167232
}
168233

169234
public:
170-
TMetadataSubscriber(
171-
const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
172-
: Result(result)
235+
TMetadataSubscriber(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const std::shared_ptr<TResultAccumulator>& result,
236+
const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
237+
: StoragesManager(storagesManager)
238+
, Result(result)
173239
, VersionedIndex(vIndex)
174-
, ColumnTagsRequested(tags)
175-
{
176-
240+
, ColumnTagsRequested(tags) {
177241
}
178242
};
179243

@@ -186,7 +250,7 @@ class TColumnPortionsAccumulator {
186250
for (auto&& i : Portions) {
187251
request->AddPortion(i);
188252
}
189-
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
253+
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
190254
Portions.clear();
191255
DataAccessors->AskData(request);
192256
}
@@ -234,7 +298,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
234298
std::shared_ptr<TResultAccumulator> resultAccumulator =
235299
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
236300
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());
237-
TColumnPortionsAccumulator portionsPack(resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
301+
TColumnPortionsAccumulator portionsPack(
302+
StoragesManager, resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
238303

239304
for (const auto& [_, portionInfo] : spg->GetPortions()) {
240305
if (!portionInfo->IsVisible(GetMaxReadVersion())) {
@@ -246,4 +311,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
246311
resultAccumulator->Start();
247312
}
248313

249-
}
314+
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,33 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl
116116
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
117117
}
118118

119+
void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result,
120+
const TVersionedIndex& index, const THashSet<ui32>& entityIds) const {
121+
auto schema = PortionInfo->GetSchema(index);
122+
return FillBlobRangesByStorage(result, schema->GetIndexInfo(), entityIds);
123+
}
124+
125+
void TPortionDataAccessor::FillBlobRangesByStorage(
126+
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const {
127+
for (auto&& i : GetRecordsVerified()) {
128+
if (!entityIds.contains(i.GetEntityId())) {
129+
continue;
130+
}
131+
const TString& storageId = PortionInfo->GetColumnStorageId(i.GetColumnId(), indexInfo);
132+
AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(i.GetBlobRange())).second)(
133+
"blob_id", PortionInfo->RestoreBlobRange(i.GetBlobRange()).ToString());
134+
}
135+
for (auto&& i : GetIndexesVerified()) {
136+
if (!entityIds.contains(i.GetEntityId())) {
137+
continue;
138+
}
139+
const TString& storageId = PortionInfo->GetIndexStorageId(i.GetIndexId(), indexInfo);
140+
auto bRange = i.GetBlobRangeVerified();
141+
AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(bRange)).second)(
142+
"blob_id", PortionInfo->RestoreBlobRange(bRange).ToString());
143+
}
144+
}
145+
119146
void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const {
120147
auto schema = PortionInfo->GetSchema(index);
121148
return FillBlobRangesByStorage(result, schema->GetIndexInfo());

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,9 @@ class TPortionDataAccessor {
229229

230230
void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TIndexInfo& indexInfo) const;
231231
void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const;
232+
void FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const;
233+
void FillBlobRangesByStorage(
234+
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TVersionedIndex& index, const THashSet<ui32>& entityIds) const;
232235
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TIndexInfo& indexInfo) const;
233236
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TVersionedIndex& index) const;
234237

ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ class IIndexMeta {
4545
using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>;
4646
using TProto = NKikimrSchemeOp::TOlapIndexDescription;
4747

48+
bool IsInplaceData() const {
49+
return StorageId == NBlobOperations::TGlobal::LocalMetadataStorageId;
50+
}
51+
4852
IIndexMeta() = default;
4953
IIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId)
5054
: IndexName(indexName)

ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TCountMinSketchConstructor
2020
}
2121
AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
2222
}
23-
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::LocalMetadataStorageId), columnIds);
23+
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds);
2424
}
2525

2626
NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {

0 commit comments

Comments
 (0)