@@ -26,6 +26,176 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvAnalyzeTable::TPtr& ev, const
2626 Send (ev->Sender , response.release (), 0 , ev->Cookie );
2727}
2828
29+ class TResultAccumulator {
30+ private:
31+ TMutex Mutex;
32+ THashMap<ui32, std::unique_ptr<TCountMinSketch>> Calculated;
33+ TAtomicCounter ResultsCount = 0 ;
34+ TAtomicCounter WaitingCount = 0 ;
35+ const NActors::TActorId RequestSenderActorId;
36+ bool Started = false ;
37+ const ui64 Cookie;
38+ std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse> Response;
39+ bool Replied = false ;
40+
41+ void OnResultReady () {
42+ AFL_VERIFY (!Replied);
43+ Replied = true ;
44+ auto & respRecord = Response->Record ;
45+ respRecord.SetStatus (NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS);
46+
47+ for (auto && [columnTag, sketch] : Calculated) {
48+ if (!sketch) {
49+ continue ;
50+ }
51+
52+ auto * column = respRecord.AddColumns ();
53+ column->SetTag (columnTag);
54+ auto * statistic = column->AddStatistics ();
55+ statistic->SetType (NStat::COUNT_MIN_SKETCH);
56+ statistic->SetData (TString (sketch->AsStringBuf ()));
57+ }
58+
59+ NActors::TActivationContext::Send (RequestSenderActorId, std::move (Response), 0 , Cookie);
60+ }
61+
62+ public:
63+ TResultAccumulator (const std::set<ui32>& tags, const NActors::TActorId& requestSenderActorId, const ui64 cookie,
64+ std::unique_ptr<NStat::TEvStatistics::TEvStatisticsResponse>&& response)
65+ : RequestSenderActorId(requestSenderActorId)
66+ , Cookie(cookie)
67+ , Response(std::move(response))
68+ {
69+ for (auto && i : tags) {
70+ AFL_VERIFY (Calculated.emplace (i, nullptr ).second );
71+ }
72+ }
73+
74+ void AddResult (THashMap<ui32, std::unique_ptr<TCountMinSketch>>&& sketch) {
75+ {
76+ TGuard<TMutex> g (Mutex);
77+ for (auto && i : sketch) {
78+ auto it = Calculated.find (i.first );
79+ AFL_VERIFY (it != Calculated.end ());
80+ if (!it->second ) {
81+ it->second = std::move (i.second );
82+ } else {
83+ *it->second += *i.second ;
84+ }
85+ }
86+ }
87+ const i64 count = ResultsCount.Inc ();
88+ if (count == WaitingCount.Val ()) {
89+ OnResultReady ();
90+ } else {
91+ AFL_VERIFY (count < WaitingCount.Val ());
92+ }
93+ }
94+
95+ void AddWaitingTask () {
96+ AFL_VERIFY (!Started);
97+ WaitingCount.Inc ();
98+ }
99+
100+ void Start () {
101+ AFL_VERIFY (!Started);
102+ Started = true ;
103+ if (WaitingCount.Val () == ResultsCount.Val ()) {
104+ OnResultReady ();
105+ }
106+ }
107+
108+ };
109+
110+ class TColumnPortionsAccumulator {
111+ private:
112+ const std::set<ui32> ColumnTagsRequested;
113+ std::vector<NOlap::TPortionInfo::TConstPtr> Portions;
114+ const ui32 PortionsCountLimit = 10000 ;
115+ std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessors;
116+ std::shared_ptr<TResultAccumulator> Result;
117+ const std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
118+
119+ public:
120+ TColumnPortionsAccumulator (const std::shared_ptr<TResultAccumulator>& result, const ui32 portionsCountLimit,
121+ const std::set<ui32>& originalColumnTags, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex,
122+ const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
123+ : ColumnTagsRequested(originalColumnTags)
124+ , PortionsCountLimit(portionsCountLimit)
125+ , DataAccessors(dataAccessorsManager)
126+ , Result(result)
127+ , VersionedIndex(vIndex)
128+ {
129+ }
130+
131+ class TMetadataSubscriber : public NOlap ::IDataAccessorRequestsSubscriber {
132+ private:
133+ const std::shared_ptr<TResultAccumulator> Result;
134+ std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
135+ const std::set<ui32> ColumnTagsRequested;
136+
137+ virtual void DoOnRequestsFinished (NOlap::TDataAccessorsResult&& result) override {
138+ THashMap<ui32, std::unique_ptr<TCountMinSketch>> sketchesByColumns;
139+ for (auto id : ColumnTagsRequested) {
140+ sketchesByColumns.emplace (id, TCountMinSketch::Create ());
141+ }
142+
143+ for (const auto & portionInfo : result.GetPortions ()) {
144+ std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo ().GetSchema (*VersionedIndex);
145+ for (const ui32 columnId : ColumnTagsRequested) {
146+ auto indexMeta = portionSchema->GetIndexInfo ().GetIndexMetaCountMinSketch ({ columnId });
147+
148+ if (!indexMeta) {
149+ AFL_WARN (NKikimrServices::TX_COLUMNSHARD)(" error" , " Missing countMinSketch index for columnId " + ToString (columnId));
150+ continue ;
151+ }
152+ AFL_VERIFY (indexMeta->GetColumnIds ().size () == 1 );
153+
154+ const std::vector<TString> data = portionInfo.GetIndexInplaceDataVerified (indexMeta->GetIndexId ());
155+
156+ for (const auto & sketchAsString : data) {
157+ auto sketch =
158+ std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (sketchAsString.data (), sketchAsString.size ()));
159+ *sketchesByColumns[columnId] += *sketch;
160+ }
161+ }
162+ }
163+ Result->AddResult (std::move (sketchesByColumns));
164+ }
165+
166+ public:
167+ TMetadataSubscriber (
168+ const std::shared_ptr<TResultAccumulator>& result, const std::shared_ptr<NOlap::TVersionedIndex>& vIndex, const std::set<ui32>& tags)
169+ : Result(result)
170+ , VersionedIndex(vIndex)
171+ , ColumnTagsRequested(tags)
172+ {
173+
174+ }
175+ };
176+
177+ void Flush () {
178+ if (!Portions.size ()) {
179+ return ;
180+ }
181+ Result->AddWaitingTask ();
182+ std::shared_ptr<NOlap::TDataAccessorsRequest> request = std::make_shared<NOlap::TDataAccessorsRequest>();
183+ for (auto && i : Portions) {
184+ request->AddPortion (i);
185+ }
186+ request->RegisterSubscriber (std::make_shared<TMetadataSubscriber>(Result, VersionedIndex, ColumnTagsRequested));
187+ Portions.clear ();
188+ DataAccessors->AskData (request);
189+ }
190+
191+ void AddTask (const NOlap::TPortionInfo::TConstPtr& portion) {
192+ Portions.emplace_back (portion);
193+ if (Portions.size () >= PortionsCountLimit) {
194+ Flush ();
195+ }
196+ }
197+ };
198+
29199void TColumnShard::Handle (NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev, const TActorContext&) {
30200 const auto & record = ev->Get ()->Record ;
31201
@@ -57,45 +227,20 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
57227 columnTagsRequested = std::set<ui32>(allColumnIds.begin (), allColumnIds.end ());
58228 }
59229
60- std::map<ui32, std::unique_ptr<TCountMinSketch>> sketchesByColumns;
61- for (auto id : columnTagsRequested) {
62- sketchesByColumns.emplace (id, TCountMinSketch::Create ());
63- }
230+ NOlap::TDataAccessorsRequest request;
231+ std::shared_ptr<TResultAccumulator> resultAccumulator =
232+ std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender , ev->Cookie , std::move (response));
233+ auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex ());
234+ TColumnPortionsAccumulator portionsPack (resultAccumulator, 1000 , columnTagsRequested, versionedIndex, DataAccessorsManager.GetObjectPtrVerified ());
64235
65236 for (const auto & [_, portionInfo] : spg->GetPortions ()) {
66- if (portionInfo->IsVisible (GetMaxReadVersion ())) {
67- std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo->GetSchema (index.GetVersionedIndex ());
68- for (ui32 columnId : columnTagsRequested) {
69- auto indexMeta = portionSchema->GetIndexInfo ().GetIndexMetaCountMinSketch ({columnId});
70-
71- if (!indexMeta) {
72- AFL_WARN (NKikimrServices::TX_COLUMNSHARD)(" error" , " Missing countMinSketch index for columnId " + ToString (columnId));
73- continue ;
74- }
75- AFL_VERIFY (indexMeta->GetColumnIds ().size () == 1 );
76-
77- const std::vector<TString> data = portionInfo->GetIndexInplaceDataVerified (indexMeta->GetIndexId ());
78-
79- for (const auto & sketchAsString : data) {
80- auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString (sketchAsString.data (), sketchAsString.size ()));
81- *sketchesByColumns[columnId] += *sketch;
82- }
83- }
237+ if (!portionInfo->IsVisible (GetMaxReadVersion ())) {
238+ continue ;
84239 }
240+ portionsPack.AddTask (portionInfo);
85241 }
86-
87- respRecord.SetStatus (NKikimrStat::TEvStatisticsResponse::STATUS_SUCCESS);
88-
89- for (ui32 columnTag : columnTagsRequested) {
90- auto * column = respRecord.AddColumns ();
91- column->SetTag (columnTag);
92-
93- auto * statistic = column->AddStatistics ();
94- statistic->SetType (NStat::COUNT_MIN_SKETCH);
95- statistic->SetData (TString (sketchesByColumns[columnTag]->AsStringBuf ()));
96- }
97-
98- Send (ev->Sender , response.release (), 0 , ev->Cookie );
242+ portionsPack.Flush ();
243+ resultAccumulator->Start ();
99244}
100245
101246}
0 commit comments