11#include " columnshard.h"
22#include " columnshard_impl.h"
3+
34#include " ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h"
45
56#include < ydb/core/protos/kqp.pb.h>
67#include < ydb/core/tx/columnshard/engines/column_engine_logs.h>
78
89#include < yql/essentials/core/minsketch/count_min_sketch.h>
910
10-
1111namespace NKikimr ::NColumnShard {
1212
1313void TColumnShard::Handle (NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const TActorContext&) {
1414 auto & requestRecord = ev->Get ()->Record ;
1515 // TODO Start a potentially long analysis process.
1616 // ...
1717
18-
19-
2018 // Return the response when the analysis is completed
2119 auto response = std::make_unique<NStat::TEvStatistics::TEvAnalyzeTableResponse>();
2220 auto & responseRecord = response->Record ;
@@ -64,8 +62,7 @@ class TResultAccumulator {
6462 std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
6563 : RequestSenderActorId(requestSenderActorId)
6664 , Cookie(cookie)
67- , Response(std::move(response))
68- {
65+ , Response(std::move(response)) {
6966 for (auto && i : tags) {
7067 AFL_VERIFY (Calculated.emplace (i, nullptr ).second );
7168 }
@@ -104,11 +101,11 @@ class TResultAccumulator {
104101 OnResultReady ();
105102 }
106103 }
107-
108104};
109105
110106class TColumnPortionsAccumulator {
111107private:
108+ const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
112109 const std::set<ui32> ColumnTagsRequested;
113110 std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
114111 const ui32 PortionsCountLimit = 10000 ;
@@ -117,19 +114,66 @@ class TColumnPortionsAccumulator {
117114 const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
118115
119116public:
120- TColumnPortionsAccumulator (const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
117+ TColumnPortionsAccumulator (const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const std::shared_ptr<TResultAccumulator>& result,
118+ const ui32 portionsCountLimit,
121119 const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
122120 const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
123- : ColumnTagsRequested(originalColumnTags)
121+ : StoragesManager(storagesManager)
122+ , ColumnTagsRequested(originalColumnTags)
124123 , PortionsCountLimit(portionsCountLimit)
125124 , DataAccessors(dataAccessorsManager)
126125 , Result(result)
127- , VersionedIndex(vIndex)
128- {
126+ , VersionedIndex(vIndex) {
129127 }
130128
129+ class TIndexReadTask : public NOlap ::NBlobOperations::NRead::ITask {
130+ private:
131+ using TBase = NOlap::NBlobOperations::NRead::ITask;
132+ const std::shared_ptr<TResultAccumulator> Result;
133+ THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>> RangesByColumn;
134+ THashMap<ui32, std::unique_ptr<TCountMinSketch>> SketchesByColumns;
135+
136+ protected:
137+ virtual void DoOnDataReady (const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& /* resourcesGuard*/ ) override {
138+ TCompositeReadBlobs blobs = ExtractBlobsData ();
139+ for (auto && [columnId, data] : RangesByColumn) {
140+ for (auto && [storageId, blobs] : data) {
141+ for (auto && b : blobs) {
142+ const TString blob = blobs.Extract (storageId, b);
143+ auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (blob.data (), blob.size ()));
144+ *sketchesByColumns[columnId] += *sketch;
145+ }
146+ }
147+ }
148+ Result->AddResult (std::move (sketchesByColumns));
149+ }
150+
151+ virtual bool DoOnError (
152+ const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
153+ AFL_ERROR (NKikimrServices::TX_COLUMNSHARD)(" event" , " DoOnError" )(" storage_id" , storageId)(" blob_id" , range)(
154+ " status" , status.GetErrorMessage ())(" status_code" , status.GetStatus ());
155+ AFL_VERIFY (status.GetStatus () != NKikimrProto::EReplyStatus::NODATA)(" blob_id" , range)(" status" , status.GetStatus ())(
156+ " error" , status.GetErrorMessage ())(" type" , TxEvent->IndexChanges ->TypeString ())(
157+ " task_id" , TxEvent->IndexChanges ->GetTaskIdentifier ())(" debug" , TxEvent->IndexChanges ->DebugString ());
158+ return false ;
159+ }
160+
161+ public:
162+ TIndexReadTask (const std::shared_ptr<TResultAccumulator>& result, std::vector<std::shared_ptr<IBlobsReadingAction>>&& readingActions,
163+ THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>>&& rangesByColumn,
164+ THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& readySketches)
165+ : TBase(std::move(readingActions), " STATISTICS" , " STATISTICS" )
166+ , Result(result)
167+ , RangesByColumn(std::move(RangesByColumn))
168+ , SketchesByColumns(std::move(readySketches)) {
169+ AFL_VERIFY (!!Result);
170+ AFL_VERIFY (RangesByColumn.size ());
171+ }
172+ };
173+
131174 class TMetadataSubscriber : public NOlap ::IDataAccessorRequestsSubscriber {
132175 private:
176+ const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
133177 const std::shared_ptr<TResultAccumulator> Result;
134178 std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
135179 const std::set<ui32> ColumnTagsRequested;
@@ -140,6 +184,8 @@ class TColumnPortionsAccumulator {
140184 sketchesByColumns.emplace (id, TCountMinSketch::Create ());
141185 }
142186
187+ THashMap<ui32, THashMap<TString, std::vector<TBlobRange>>> rangesByColumn;
188+
143189 for (const auto & [id, portionInfo] : result.GetPortions ()) {
144190 std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo ().GetSchema (*VersionedIndex);
145191 for (const ui32 columnId : ColumnTagsRequested) {
@@ -151,26 +197,42 @@ class TColumnPortionsAccumulator {
151197 }
152198 AFL_VERIFY (indexMeta->GetColumnIds ().size () == 1 );
153199
154- const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified (indexMeta->GetIndexId ());
200+ if (!indexMeta->IsInplaceData ()) {
201+ portionInfo.FillBlobRangesByStorage (rangesByColumn, portionSchema->GetIndexInfo (), { columnId });
202+ } else {
203+ const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified (indexMeta->GetIndexId ());
155204
156- for (const auto & sketchAsString : data) {
157- auto sketch =
158- std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (sketchAsString.data (), sketchAsString.size ()));
159- *sketchesByColumns[columnId] += *sketch;
205+ for (const auto & sketchAsString : data) {
206+ auto sketch =
207+ std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (sketchAsString.data (), sketchAsString.size ()));
208+ *sketchesByColumns[columnId] += *sketch;
209+ }
160210 }
161211 }
162212 }
163- Result->AddResult (std::move (sketchesByColumns));
213+ if (rangesByColumn.size ()) {
214+ TBlobsAction blobsAction (StoragesManager, NBlobOperations::EConsumer::STATISTICS);
215+ for (auto && i : rangesByColumn) {
216+ for (auto && [storageId, ranges]: i.second ) {
217+ auto reader = blobsAction.GetReading (storageId);
218+ for (auto && i : ranges) {
219+ reader->AddRange (i);
220+ }
221+ }
222+ }
223+ TActorContext::AsActorContext ().Register (new NOlap::NBlobOperations::NRead::TActor (
224+ std::make_shared<TIndexReadTask>(Result, blobsAction.GetReadingActions (), rangesByColumn, sketchesByColumns)));
225+ } else {
226+ Result->AddResult (std::move (sketchesByColumns));
227+ }
164228 }
165229
166230 public:
167231 TMetadataSubscriber (
168232 const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
169233 : Result(result)
170234 , VersionedIndex(vIndex)
171- , ColumnTagsRequested(tags)
172- {
173-
235+ , ColumnTagsRequested(tags) {
174236 }
175237 };
176238
@@ -183,7 +245,7 @@ class TColumnPortionsAccumulator {
183245 for (auto && i : Portions) {
184246 request->AddPortion (i);
185247 }
186- request->RegisterSubscriber (std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
248+ request->RegisterSubscriber (std::make_shared<TMetadataSubscriber>(StoragesManager, Result, VersionedIndex, ColumnTagsRequested));
187249 Portions.clear ();
188250 DataAccessors->AskData (request);
189251 }
@@ -231,7 +293,8 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
231293 std::shared_ptr<TResultAccumulator> resultAccumulator =
232294 std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender , ev->Cookie , std::move (response));
233295 auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex ());
234- TColumnPortionsAccumulator portionsPack (resultAccumulator, 1000 , columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified ());
296+ TColumnPortionsAccumulator portionsPack (StoragesManager,
297+ resultAccumulator, 1000 , columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified ());
235298
236299 for (const auto & [_, portionInfo] : spg->GetPortions ()) {
237300 if (!portionInfo->IsVisible (GetMaxReadVersion ())) {
@@ -243,4 +306,4 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
243306 resultAccumulator->Start ();
244307}
245308
246- }
309+ } // namespace NKikimr::NColumnShard
0 commit comments