Skip to content

Commit 0d5a97e

Browse files
correction
1 parent e4381e2 commit 0d5a97e

File tree

4 files changed

+129
-22
lines changed

4 files changed

+129
-22
lines changed

ydb/core/tx/columnshard/blobs_action/counters/storage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ enum class EConsumer {
2626
WRITING_BUFFER,
2727
WRITING_OPERATOR,
2828
NORMALIZER,
29+
STATISTICS,
2930

3031
COUNT
3132
};

ydb/core/tx/columnshard/columnshard__statistics.cpp

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,20 @@
11
#include "columnshard.h"
22
#include "columnshard_impl.h"
3+
34
#include "ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"
45

56
#include <ydb/core/protos/kqp.pb.h>
67
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
78

89
#include <yql/essentials/core/minsketch/count_min_sketch.h>
910

10-
1111
namespace NKikimr::NColumnShard {
1212

1313
void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) {
1414
auto& requestRecord = ev->Get()->Record;
1515
// TODO Start a potentially long analysis process.
1616
// ...
1717

18-
19-
2018
// Return the response when the analysis is completed
2119
auto response = std::make_unique<NStat::TEvStatistics::TEvAnalyzeTableResponse>();
2220
auto& responseRecord = response->Record;
@@ -64,8 +62,7 @@ class TResultAccumulator {
6462
std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
6563
: RequestSenderActorId(requestSenderActorId)
6664
, Cookie(cookie)
67-
, Response(std::move(response))
68-
{
65+
, Response(std::move(response)) {
6966
for (auto&& i : tags) {
7067
AFL_VERIFY(Calculated.emplace(i, nullptr).second);
7168
}
@@ -104,11 +101,11 @@ class TResultAccumulator {
104101
OnResultReady();
105102
}
106103
}
107-
108104
};
109105

110106
class TColumnPortionsAccumulator {
111107
private:
108+
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
112109
const std::set<ui32> ColumnTagsRequested;
113110
std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
114111
const ui32 PortionsCountLimit = 10000;
@@ -117,19 +114,66 @@ class TColumnPortionsAccumulator {
117114
const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
118115

119116
public:
120-
TColumnPortionsAccumulator(const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
117+
TColumnPortionsAccumulator(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const std::shared_ptr<TResultAccumulator>& result,
118+
const ui32 portionsCountLimit,
121119
const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
122120
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
123-
: ColumnTagsRequested(originalColumnTags)
121+
: StoragesManager(storagesManager)
122+
, ColumnTagsRequested(originalColumnTags)
124123
, PortionsCountLimit(portionsCountLimit)
125124
, DataAccessors(dataAccessorsManager)
126125
, Result(result)
127-
, VersionedIndex(vIndex)
128-
{
126+
, VersionedIndex(vIndex) {
129127
}
130128

129+
class TIndexReadTask: public NOlap::NBlobOperations::NRead::ITask {
130+
private:
131+
using TBase = NOlap::NBlobOperations::NRead::ITask;
132+
const std::shared_ptr<TResultAccumulator> Result;
133+
THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>> RangesByColumn;
134+
THashMap<ui32, std::unique_ptr<TCountMinSketch>> SketchesByColumns;
135+
136+
protected:
137+
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) override {
138+
TCompositeReadBlobs blobs = ExtractBlobsData();
139+
for (auto&& [columnId, data] : RangesByColumn) {
140+
for (auto&& [storageId, blobs] : data) {
141+
for (auto&& b : blobs) {
142+
const TString blob = blobs.Extract(storageId, b);
143+
auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(blob.data(), blob.size()));
144+
*sketchesByColumns[columnId] += *sketch;
145+
}
146+
}
147+
}
148+
Result->AddResult(std::move(sketchesByColumns));
149+
}
150+
151+
virtual bool DoOnError(
152+
const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
153+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DoOnError")("storage_id", storageId)("blob_id", range)(
154+
"status", status.GetErrorMessage())("status_code", status.GetStatus());
155+
AFL_VERIFY(status.GetStatus() != NKikimrProto::EReplyStatus::NODATA)("blob_id", range)("status", status.GetStatus())(
156+
"error", status.GetErrorMessage())("type", TxEvent->IndexChanges->TypeString())(
157+
"task_id", TxEvent->IndexChanges->GetTaskIdentifier())("debug", TxEvent->IndexChanges->DebugString());
158+
return false;
159+
}
160+
161+
public:
162+
TIndexReadTask(const std::shared_ptr<TResultAccumulator>& result, std::vector<std::shared_ptr<IBlobsReadingAction>>&& readingActions,
163+
THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>>&& rangesByColumn,
164+
THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& readySketches)
165+
: TBase(std::move(readingActions), "STATISTICS", "STATISTICS")
166+
, Result(result)
167+
, RangesByColumn(std::move(RangesByColumn))
168+
, SketchesByColumns(std::move(readySketches)) {
169+
AFL_VERIFY(!!Result);
170+
AFL_VERIFY(RangesByColumn.size());
171+
}
172+
};
173+
131174
class TMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
132175
private:
176+
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
133177
const std::shared_ptr<TResultAccumulator> Result;
134178
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
135179
const std::set<ui32> ColumnTagsRequested;
@@ -143,6 +187,8 @@ class TColumnPortionsAccumulator {
143187
sketchesByColumns.emplace(id, TCountMinSketch::Create());
144188
}
145189

190+
THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>> rangesByColumn;
191+
146192
for (const auto& [id, portionInfo] : result.GetPortions()) {
147193
std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex);
148194
for (const ui32 columnId : ColumnTagsRequested) {
@@ -154,26 +200,42 @@ class TColumnPortionsAccumulator {
154200
}
155201
AFL_VERIFY(indexMeta->GetColumnIds().size() == 1);
156202

157-
const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());
203+
if (!indexMeta->IsInplaceData()) {
204+
portionInfo.FillBlobRangesByStorage(rangesByColumn, portionSchema->GetIndexInfo(), { columnId });
205+
} else {
206+
const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified(indexMeta->GetIndexId());
158207

159-
for (const auto& sketchAsString : data) {
160-
auto sketch =
161-
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
162-
*sketchesByColumns[columnId] += *sketch;
208+
for (const auto& sketchAsString : data) {
209+
auto sketch =
210+
std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(sketchAsString.data(), sketchAsString.size()));
211+
*sketchesByColumns[columnId] += *sketch;
212+
}
163213
}
164214
}
165215
}
166-
Result->AddResult(std::move(sketchesByColumns));
216+
if (rangesByColumn.size()) {
217+
TBlobsAction blobsAction(StoragesManager, NBlobOperations::EConsumer::STATISTICS);
218+
for (auto&& i : rangesByColumn) {
219+
for (auto&& [storageId, ranges]: i.second) {
220+
auto reader = blobsAction.GetReading(storageId);
221+
for (auto&& i : ranges) {
222+
reader->AddRange(i);
223+
}
224+
}
225+
}
226+
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
227+
std::make_shared<TIndexReadTask>(Result, blobsAction.GetReadingActions(), rangesByColumn, sketchesByColumns)));
228+
} else {
229+
Result->AddResult(std::move(sketchesByColumns));
230+
}
167231
}
168232

169233
public:
170234
TMetadataSubscriber(
171235
const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
172236
: Result(result)
173237
, VersionedIndex(vIndex)
174-
, ColumnTagsRequested(tags)
175-
{
176-
238+
, ColumnTagsRequested(tags) {
177239
}
178240
};
179241

@@ -186,7 +248,7 @@ class TColumnPortionsAccumulator {
186248
for (auto&& i : Portions) {
187249
request->AddPortion(i);
188250
}
189-
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
251+
request->RegisterSubscriber(std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
190252
Portions.clear();
191253
DataAccessors->AskData(request);
192254
}
@@ -234,7 +296,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
234296
std::shared_ptr<TResultAccumulator> resultAccumulator =
235297
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
236298
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());
237-
TColumnPortionsAccumulator portionsPack(resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
299+
TColumnPortionsAccumulator portionsPack(StoragesManager,
300+
resultAccumulator, 1000, columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified());
238301

239302
for (const auto& [_, portionInfo] : spg->GetPortions()) {
240303
if (!portionInfo->IsVisible(GetMaxReadVersion())) {
@@ -246,4 +309,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
246309
resultAccumulator->Start();
247310
}
248311

249-
}
312+
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/engines/portions/data_accessor.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,33 @@ TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssembl
116116
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
117117
}
118118

119+
// Resolves the portion's schema through the versioned index and delegates to the TIndexInfo overload.
// result    - entityId -> storageId -> blob ranges; filled in place.
// entityIds - only records/indexes whose entity id is in this set are collected.
void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result,
    const TVersionedIndex& index, const THashSet<ui32>& entityIds) const {
    auto schema = PortionInfo->GetSchema(index);
    // entityIds must be forwarded: without it the call cannot select the per-entity overload.
    return FillBlobRangesByStorage(result, schema->GetIndexInfo(), entityIds);
}
124+
125+
// Collects the blob ranges of the selected entities, grouped as entityId -> storageId -> ranges.
// Column records are processed first, then index records; an entity is included only when its id is
// present in entityIds. Every range is expected to be inserted exactly once (verified).
void TPortionDataAccessor::FillBlobRangesByStorage(
    THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const {
    // Column chunks.
    for (auto&& i : GetRecordsVerified()) {
        if (entityIds.contains(i.GetEntityId())) {
            const TString& storageId = PortionInfo->GetColumnStorageId(i.GetColumnId(), indexInfo);
            AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(i.GetBlobRange())).second)(
                "blob_id", PortionInfo->RestoreBlobRange(i.GetBlobRange()).ToString());
        }
    }
    // Index chunks.
    for (auto&& i : GetIndexesVerified()) {
        if (entityIds.contains(i.GetEntityId())) {
            const TString& storageId = PortionInfo->GetIndexStorageId(i.GetIndexId(), indexInfo);
            auto bRange = i.GetBlobRangeVerified();
            AFL_VERIFY(result[i.GetEntityId()][storageId].emplace(PortionInfo->RestoreBlobRange(bRange)).second)(
                "blob_id", PortionInfo->RestoreBlobRange(bRange).ToString());
        }
    }
}
145+
119146
void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const {
120147
auto schema = PortionInfo->GetSchema(index);
121148
return FillBlobRangesByStorage(result, schema->GetIndexInfo());

ydb/core/tx/columnshard/engines/portions/data_accessor.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,19 @@ class TPortionDataAccessor {
181181
return result;
182182
}
183183

184+
// Scans the portion's index chunks and gathers data for the entries whose entity id equals indexId.
// Returns an empty map when the portion has no Indexes.
// NOTE(review): the declared result is a map (storage id -> blob ranges), but the body never determines
// a storage id and appends with emplace_back — see the note at the insertion below.
THashMap<TString, std::vector<TBlobRange>> GetIndexRangesVerified(const ui32 indexId) const {
185+
if (!Indexes) {
186+
return {};
187+
}
188+
THashMap<TString, std::vector<TBlobRange>> result;
189+
for (auto&& i : *Indexes) {
190+
if (i.GetEntityId() == indexId) {
191+
// NOTE(review): emplace_back is a sequence-container operation and `result` is a THashMap, so this
// presumably does not compile as written; GetBlobDataVerified() also looks like it yields blob data
// rather than a TBlobRange — confirm the intended key (storage id) and value before using this method.
result.emplace_back(i.GetBlobDataVerified());
192+
}
193+
}
194+
return result;
195+
}
196+
184197
std::set<ui32> GetColumnIds() const {
185198
std::set<ui32> result;
186199
for (auto&& i : GetRecordsVerified()) {
@@ -229,6 +242,9 @@ class TPortionDataAccessor {
229242

230243
void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TIndexInfo& indexInfo) const;
231244
void FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const;
245+
void FillBlobRangesByStorage(THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TIndexInfo& indexInfo, const THashSet<ui32>& entityIds) const;
246+
void FillBlobRangesByStorage(
247+
THashMap<ui32, THashMap<TString, THashSet<TBlobRange>>>& result, const TVersionedIndex& index, const THashSet<ui32>& entityIds) const;
232248
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TIndexInfo& indexInfo) const;
233249
void FillBlobIdsByStorage(THashMap<TString, THashSet<TUnifiedBlobId>>& result, const TVersionedIndex& index) const;
234250

0 commit comments

Comments
 (0)