11#include " columnshard.h"
22#include " columnshard_impl.h"
3+
34#include " ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"
45
56#include < ydb/core/protos/kqp.pb.h>
7+ #include < ydb/core/tx/columnshard/blobs_reader/actor.h>
68#include < ydb/core/tx/columnshard/engines/column_engine_logs.h>
79
810#include < yql/essentials/core/minsketch/count_min_sketch.h>
911
10-
1112namespace NKikimr ::NColumnShard {
1213
1314void TColumnShard::Handle (NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) {
1415 auto & requestRecord = ev->Get ()->Record ;
1516 // TODO Start a potentially long analysis process.
1617 // ...
1718
18-
19-
2019 // Return the response when the analysis is completed
2120 auto response = std::make_unique<NStat::TEvStatistics::TEvAnalyzeTableResponse>();
2221 auto & responseRecord = response->Record ;
@@ -64,8 +63,7 @@ class TResultAccumulator {
6463 std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
6564 : RequestSenderActorId(requestSenderActorId)
6665 , Cookie(cookie)
67- , Response(std::move(response))
68- {
66+ , Response(std::move(response)) {
6967 for (auto && i : tags) {
7068 AFL_VERIFY (Calculated.emplace (i, nullptr ).second );
7169 }
@@ -104,11 +102,11 @@ class TResultAccumulator {
104102 OnResultReady ();
105103 }
106104 }
107-
108105};
109106
110107class TColumnPortionsAccumulator {
111108private:
109+ const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
112110 const std::set<ui32> ColumnTagsRequested;
113111 std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
114112 const ui32 PortionsCountLimit = 10000 ;
@@ -117,19 +115,68 @@ class TColumnPortionsAccumulator {
117115 const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
118116
119117public:
120- TColumnPortionsAccumulator (const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
121- const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
118+ TColumnPortionsAccumulator (const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
119+ const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit, const std::set<ui32>& originalColumnTags,
120+ const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
122121 const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
123- : ColumnTagsRequested(originalColumnTags)
122+ : StoragesManager(storagesManager)
123+ , ColumnTagsRequested(originalColumnTags)
124124 , PortionsCountLimit(portionsCountLimit)
125125 , DataAccessors(dataAccessorsManager)
126126 , Result(result)
127- , VersionedIndex(vIndex)
128- {
127+ , VersionedIndex(vIndex) {
129128 }
130129
130+ class TIndexReadTask : public NOlap ::NBlobOperations::NRead::ITask {
131+ private:
132+ using TBase = NOlap::NBlobOperations::NRead::ITask;
133+ const std::shared_ptr<TResultAccumulator> Result;
134+ THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> RangesByColumn;
135+ THashMap<ui32, std::unique_ptr<TCountMinSketch>> SketchesByColumns;
136+
137+ protected:
138+ virtual void DoOnDataReady (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& /* resourcesGuard*/ ) override {
139+ NOlap::NBlobOperations::NRead::TCompositeReadBlobs blobsData = ExtractBlobsData ();
140+ for (auto && [columnId, data] : RangesByColumn) {
141+ for (auto && [storageId, blobs] : data) {
142+ for (auto && b : blobs) {
143+ const TString blob = blobsData.Extract (storageId, b);
144+ auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (blob.data (), blob.size ()));
145+ auto it = SketchesByColumns.find (columnId);
146+ AFL_VERIFY (it != SketchesByColumns.end ());
147+ *it->second += *sketch;
148+ }
149+ }
150+ }
151+ Result->AddResult (std::move (SketchesByColumns));
152+ }
153+
154+ virtual bool DoOnError (
155+ const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
156+ AFL_ERROR (NKikimrServices::TX_COLUMNSHARD)(" event" , " DoOnError" )(" storage_id" , storageId)(" blob_id" , range)(
157+ " status" , status.GetErrorMessage ())(" status_code" , status.GetStatus ());
158+ AFL_VERIFY (status.GetStatus () != NKikimrProto::EReplyStatus::NODATA)(" blob_id" , range)(" status" , status.GetStatus ())(
159+ " error" , status.GetErrorMessage ())(" type" , " STATISTICS" );
160+ return false ;
161+ }
162+
163+ public:
164+ TIndexReadTask (const std::shared_ptr<TResultAccumulator>& result,
165+ std::vector<std::shared_ptr<NOlap::IBlobsReadingAction>>&& readingActions,
166+ THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>>&& rangesByColumn,
167+ THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& readySketches)
168+ : TBase(std::move(readingActions), " STATISTICS" , " STATISTICS" )
169+ , Result(result)
170+ , RangesByColumn(std::move(rangesByColumn))
171+ , SketchesByColumns(std::move(readySketches)) {
172+ AFL_VERIFY (!!Result);
173+ AFL_VERIFY (RangesByColumn.size ());
174+ }
175+ };
176+
131177 class TMetadataSubscriber : public NOlap ::IDataAccessorRequestsSubscriber {
132178 private:
179+ const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
133180 const std::shared_ptr<TResultAccumulator> Result;
134181 std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
135182 const std::set<ui32> ColumnTagsRequested;
@@ -143,6 +190,9 @@ class TColumnPortionsAccumulator {
143190 sketchesByColumns.emplace (id, TCountMinSketch::Create ());
144191 }
145192
193+ THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> rangesByColumn;
194+ THashMap<ui32, ui32> indexIdToColumnId;
195+
146196 for (const auto & [id, portionInfo] : result.GetPortions ()) {
147197 std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo ().GetSchema (*VersionedIndex);
148198 for (const ui32 columnId : ColumnTagsRequested) {
@@ -153,27 +203,48 @@ class TColumnPortionsAccumulator {
153203 continue ;
154204 }
155205 AFL_VERIFY (indexMeta->GetColumnIds ().size () == 1 );
156-
157- const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified (indexMeta->GetIndexId ());
158-
159- for (const auto & sketchAsString : data) {
160- auto sketch =
161- std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (sketchAsString.data (), sketchAsString.size ()));
162- *sketchesByColumns[columnId] += *sketch;
206+ indexIdToColumnId.emplace (indexMeta->GetIndexId (), columnId);
207+ if (!indexMeta->IsInplaceData ()) {
208+ portionInfo.FillBlobRangesByStorage (rangesByColumn, portionSchema->GetIndexInfo (), { indexMeta->GetIndexId () });
209+ } else {
210+ const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified (indexMeta->GetIndexId ());
211+
212+ for (const auto & sketchAsString : data) {
213+ auto sketch =
214+ std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (sketchAsString.data (), sketchAsString.size ()));
215+ *sketchesByColumns[columnId] += *sketch;
216+ }
163217 }
164218 }
165219 }
166- Result->AddResult (std::move (sketchesByColumns));
220+ if (rangesByColumn.size ()) {
221+ NOlap::TBlobsAction blobsAction (StoragesManager, NOlap::NBlobOperations::EConsumer::STATISTICS);
222+ THashMap<ui32, THashMap<TString, THashSet<NOlap::TBlobRange>>> rangesByColumnLocal;
223+ for (auto && i : rangesByColumn) {
224+ for (auto && [storageId, ranges] : i.second ) {
225+ auto reader = blobsAction.GetReading (storageId);
226+ for (auto && i : ranges) {
227+ reader->AddRange (i);
228+ }
229+ }
230+ auto it = indexIdToColumnId.find (i.first );
231+ AFL_VERIFY (it != indexIdToColumnId.end ());
232+ rangesByColumnLocal.emplace (it->second , std::move (i.second ));
233+ }
234+ TActorContext::AsActorContext ().Register (new NOlap::NBlobOperations::NRead::TActor (std::make_shared<TIndexReadTask>(
235+ Result, blobsAction.GetReadingActions (), std::move (rangesByColumnLocal), std::move (sketchesByColumns))));
236+ } else {
237+ Result->AddResult (std::move (sketchesByColumns));
238+ }
167239 }
168240
169241 public:
170- TMetadataSubscriber (
171- const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
172- : Result(result)
242+ TMetadataSubscriber (const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const std::shared_ptr<TResultAccumulator>& result,
243+ const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
244+ : StoragesManager(storagesManager)
245+ , Result(result)
173246 , VersionedIndex(vIndex)
174- , ColumnTagsRequested(tags)
175- {
176-
247+ , ColumnTagsRequested(tags) {
177248 }
178249 };
179250
@@ -186,7 +257,7 @@ class TColumnPortionsAccumulator {
186257 for (auto && i : Portions) {
187258 request->AddPortion (i);
188259 }
189- request->RegisterSubscriber (std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
260+ request->RegisterSubscriber (std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
190261 Portions.clear ();
191262 DataAccessors->AskData (request);
192263 }
@@ -234,7 +305,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
234305 std::shared_ptr<TResultAccumulator> resultAccumulator =
235306 std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender , ev->Cookie , std::move (response));
236307 auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex ());
237- TColumnPortionsAccumulator portionsPack (resultAccumulator, 1000 , columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified ());
308+ TColumnPortionsAccumulator portionsPack (
309+ StoragesManager, resultAccumulator, 1000 , columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified ());
238310
239311 for (const auto & [_, portionInfo] : spg->GetPortions ()) {
240312 if (!portionInfo->IsVisible (GetMaxReadVersion ())) {
@@ -246,4 +318,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
246318 resultAccumulator->Start ();
247319}
248320
249- }
321+ } // namespace NKikimr::NColumnShard
0 commit comments