Skip to content

Commit a2c3448

Browse files
fixes and speed up (#16436)
1 parent 7778718 commit a2c3448

File tree

7 files changed

+46
-27
lines changed

7 files changed

+46
-27
lines changed

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,10 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource, publi
154154
virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns, const bool sequential) = 0;
155155

156156
NEvLog::TLogsThread Events;
157+
std::unique_ptr<TFetchedData> StageData;
157158

158159
protected:
159160
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> ResourceGuards;
160-
std::unique_ptr<TFetchedData> StageData;
161161
std::unique_ptr<TFetchedResult> StageResult;
162162

163163
public:
@@ -315,15 +315,31 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource, publi
315315
return false;
316316
}
317317

318-
bool HasStageData() const {
319-
return !!StageData;
318+
void InitStageData(std::unique_ptr<TFetchedData>&& data) {
319+
AFL_VERIFY(!StageData);
320+
StageData = std::move(data);
321+
}
322+
323+
// Hands ownership of the current stage data to the caller.
// AFL_VERIFY checks that stage data is present before it is released.
// After the call StageData is empty: moving from a std::unique_ptr is
// guaranteed to leave the source holding nullptr, so no explicit reset()
// is needed.
// Returns the previously held TFetchedData.
std::unique_ptr<TFetchedData> ExtractStageData() {
    AFL_VERIFY(StageData);
    // std::move is required here because StageData is a member (an lvalue);
    // moving straight into the return value avoids the extra local and the
    // pessimizing `return std::move(local)`.
    return std::move(StageData);
}
329+
330+
// Drops the held stage data, if any. Safe to call when StageData is
// already empty.
void ClearStageData() {
    StageData = nullptr;
}
321333

322334
// Read-only access to the stage data.
// AFL_VERIFY checks that stage data is present before it is dereferenced.
const TFetchedData& GetStageData() const {
    AFL_VERIFY(StageData);
    const TFetchedData& current = *StageData;
    return current;
}
326338

339+
bool HasStageData() const {
340+
return !!StageData;
341+
}
342+
327343
TFetchedData& MutableStageData() {
328344
AFL_VERIFY(StageData);
329345
return *StageData;

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ void IDataSource::RegisterInterval(TFetchingInterval& interval, const std::share
2828
if (AtomicCas(&SourceStartedFlag, 1, 0)) {
2929
SetMemoryGroupId(interval.GetIntervalId());
3030
AFL_VERIFY(FetchingPlan);
31-
StageData = std::make_unique<TFetchedData>(GetExclusiveIntervalOnly(), GetRecordsCount());
31+
InitStageData(std::make_unique<TFetchedData>(GetExclusiveIntervalOnly(), GetRecordsCount()));
3232
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx());
3333
NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", GetSourceIdx())("method", "InitFetchingPlan"));
3434
if (GetContext()->IsAborted()) {
@@ -64,15 +64,15 @@ void IDataSource::DoOnEmptyStageData(const std::shared_ptr<NCommon::IDataSource>
6464
StageResult = TFetchedResult::BuildEmpty();
6565
} else {
6666
StageResult = std::make_unique<TFetchedResult>(
67-
std::move(StageData), GetContext()->GetMergeColumns()->GetColumnIds(), *GetContext()->GetCommonContext()->GetResolver());
67+
ExtractStageData(), GetContext()->GetMergeColumns()->GetColumnIds(), *GetContext()->GetCommonContext()->GetResolver());
6868
}
69-
StageData.reset();
69+
ClearStageData();
7070
}
7171

7272
void IDataSource::DoBuildStageResult(const std::shared_ptr<NCommon::IDataSource>& /*sourcePtr*/) {
7373
TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
74-
StageResult = std::make_unique<TFetchedResult>(std::move(StageData), *GetContext()->GetCommonContext()->GetResolver());
75-
StageData.reset();
74+
StageResult = std::make_unique<TFetchedResult>(ExtractStageData(), *GetContext()->GetCommonContext()->GetResolver());
75+
ClearStageData();
7676
}
7777

7878
void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, TBlobsAction& blobsAction,
@@ -111,15 +111,15 @@ bool TPortionDataSource::DoStartFetchingColumns(
111111
const std::shared_ptr<NCommon::IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) {
112112
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName());
113113
AFL_VERIFY(columns.GetColumnsCount());
114-
AFL_VERIFY(!StageData->GetAppliedFilter() || !StageData->GetAppliedFilter()->IsTotalDenyFilter());
114+
AFL_VERIFY(!GetStageData().GetAppliedFilter() || !GetStageData().GetAppliedFilter()->IsTotalDenyFilter());
115115
auto& columnIds = columns.GetColumnIds();
116116
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());
117117

118118
TBlobsAction action(GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
119119
{
120120
THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo> nullBlocks;
121-
NeedFetchColumns(columnIds, action, nullBlocks, StageData->GetAppliedFilter());
122-
StageData->AddDefaults(std::move(nullBlocks));
121+
NeedFetchColumns(columnIds, action, nullBlocks, GetStageData().GetAppliedFilter());
122+
MutableStageData().AddDefaults(std::move(nullBlocks));
123123
}
124124

125125
auto readActions = action.GetReadingActions();
@@ -184,7 +184,7 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
184184
} // namespace
185185

186186
bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
187-
AFL_VERIFY(!StageData->HasPortionAccessor());
187+
AFL_VERIFY(!GetStageData().HasPortionAccessor());
188188
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());
189189

190190
std::shared_ptr<TDataAccessorsRequest> request = std::make_shared<TDataAccessorsRequest>("PLAIN::" + step.GetName());

ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ class TPortionDataSource: public IDataSource {
239239
}
240240

241241
virtual bool NeedAccessorsFetching() const override {
242-
return !StageData || !StageData->HasPortionAccessor();
242+
return !HasStageData() || !GetStageData().HasPortionAccessor();
243243
}
244244

245245
virtual bool DoAddTxConflict() override {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
1111
const bool needSnapshots = GetReadMetadata()->GetRequestSnapshot() < source->GetRecordSnapshotMax();
1212
const bool dontNeedColumns = !needSnapshots && GetFFColumns()->GetColumnIds().size() == 1 &&
1313
GetFFColumns()->GetColumnIds().contains(NOlap::NPortion::TSpecialColumns::SPEC_COL_PLAN_STEP_INDEX);
14-
if (!dontNeedColumns && !source->GetStageData().HasPortionAccessor()) {
14+
if (!dontNeedColumns && !source->HasStageData()) {
1515
if (!AskAccumulatorsScript) {
1616
NCommon::TFetchingScriptBuilder acc(*this);
1717
acc.AddStep(std::make_shared<NCommon::TAllocateMemoryStep>

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetch
2727

2828
void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr) {
2929
AFL_VERIFY(!ProcessingStarted);
30+
InitStageData(std::make_unique<TFetchedData>(true, sourcePtr->GetRecordsCount()));
3031
AFL_VERIFY(FetchingPlan);
3132
ProcessingStarted = true;
3233
SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
@@ -58,7 +59,7 @@ void IDataSource::DoOnEmptyStageData(const std::shared_ptr<NCommon::IDataSource>
5859
ResourceGuards.clear();
5960
StageResult = TFetchedResult::BuildEmpty();
6061
StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) });
61-
StageData.reset();
62+
ClearStageData();
6263
}
6364

6465
void IDataSource::DoBuildStageResult(const std::shared_ptr<NCommon::IDataSource>& /*sourcePtr*/) {
@@ -68,14 +69,14 @@ void IDataSource::DoBuildStageResult(const std::shared_ptr<NCommon::IDataSource>
6869
void IDataSource::Finalize(const std::optional<ui64> memoryLimit) {
6970
TMemoryProfileGuard mpg("SCAN_PROFILE::STAGE_RESULT", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
7071
if (memoryLimit && !IsSourceInMemory()) {
71-
const auto accessor = StageData->GetPortionAccessor();
72-
StageResult = std::make_unique<TFetchedResult>(std::move(StageData), *GetContext()->GetCommonContext()->GetResolver());
72+
const auto accessor = GetStageData().GetPortionAccessor();
73+
StageResult = std::make_unique<TFetchedResult>(ExtractStageData(), *GetContext()->GetCommonContext()->GetResolver());
7374
StageResult->SetPages(accessor.BuildReadPages(*memoryLimit, GetContext()->GetProgramInputColumns()->GetColumnIds()));
7475
} else {
75-
StageResult = std::make_unique<TFetchedResult>(std::move(StageData), *GetContext()->GetCommonContext()->GetResolver());
76+
StageResult = std::make_unique<TFetchedResult>(ExtractStageData(), *GetContext()->GetCommonContext()->GetResolver());
7677
StageResult->SetPages({ TPortionDataAccessor::TReadPage(0, GetRecordsCount(), 0) });
7778
}
78-
StageData.reset();
79+
ClearStageData();
7980
}
8081

8182
void TPortionDataSource::NeedFetchColumns(const std::set<ui32>& columnIds, TBlobsAction& blobsAction,
@@ -114,15 +115,15 @@ bool TPortionDataSource::DoStartFetchingColumns(
114115
const std::shared_ptr<NCommon::IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const TColumnsSetIds& columns) {
115116
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName());
116117
AFL_VERIFY(columns.GetColumnsCount());
117-
AFL_VERIFY(!StageData->GetAppliedFilter() || !StageData->GetAppliedFilter()->IsTotalDenyFilter());
118+
AFL_VERIFY(!GetStageData().GetAppliedFilter() || !GetStageData().GetAppliedFilter()->IsTotalDenyFilter());
118119
auto& columnIds = columns.GetColumnIds();
119120
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());
120121

121122
TBlobsAction action(GetContext()->GetCommonContext()->GetStoragesManager(), NBlobOperations::EConsumer::SCAN);
122123
{
123124
THashMap<TChunkAddress, TPortionDataAccessor::TAssembleBlobInfo> nullBlocks;
124-
NeedFetchColumns(columnIds, action, nullBlocks, StageData->GetAppliedFilter());
125-
StageData->AddDefaults(std::move(nullBlocks));
125+
NeedFetchColumns(columnIds, action, nullBlocks, GetStageData().GetAppliedFilter());
126+
MutableStageData().AddDefaults(std::move(nullBlocks));
126127
}
127128

128129
auto readActions = action.GetReadingActions();
@@ -387,7 +388,7 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
387388
} // namespace
388389

389390
bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
390-
AFL_VERIFY(!StageData->HasPortionAccessor());
391+
AFL_VERIFY(!GetStageData().HasPortionAccessor());
391392
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());
392393

393394
std::shared_ptr<TDataAccessorsRequest> request = std::make_shared<TDataAccessorsRequest>("SIMPLE::" + step.GetName());

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ class IDataSource: public NCommon::IDataSource {
157157
}
158158

159159
virtual void ClearResult() {
160-
StageData.reset();
160+
ClearStageData();
161161
StageResult.reset();
162162
ResourceGuards.clear();
163163
SourceGroupGuard = nullptr;
@@ -264,12 +264,11 @@ class IDataSource: public NCommon::IDataSource {
264264
: TBase(sourceId, sourceIdx, context, recordSnapshotMin, recordSnapshotMax, recordsCount, shardingVersion, hasDeletions)
265265
, Start(context->GetReadMetadata()->IsDescSorted() ? finish : start, context->GetReadMetadata()->IsDescSorted())
266266
, Finish(context->GetReadMetadata()->IsDescSorted() ? start : finish, context->GetReadMetadata()->IsDescSorted()) {
267-
StageData = std::make_unique<TFetchedData>(true, recordsCount);
268267
UsageClass = GetContext()->GetReadMetadata()->GetPKRangesFilter().GetUsageClass(start, finish);
269268
AFL_VERIFY(UsageClass != TPKRangeFilter::EUsageClass::NoUsage);
270269
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portions_for_merge")("start", Start.DebugString())(
271270
"finish", Finish.DebugString());
272-
Y_ABORT_UNLESS(Start.Compare(Finish) != std::partial_ordering::greater);
271+
AFL_VERIFY_DEBUG(Start.Compare(Finish) != std::partial_ordering::greater);
273272
}
274273

275274
virtual ~IDataSource() = default;
@@ -286,7 +285,7 @@ class TPortionDataSource: public IDataSource {
286285

287286
virtual void InitUsedRawBytes() override {
288287
AFL_VERIFY(!UsedRawBytes);
289-
UsedRawBytes = StageData->GetPortionAccessor().GetColumnRawBytes(GetContext()->GetAllUsageColumns()->GetColumnIds(), false);
288+
UsedRawBytes = GetStageData().GetPortionAccessor().GetColumnRawBytes(GetContext()->GetAllUsageColumns()->GetColumnIds(), false);
290289
}
291290

292291
virtual bool DoStartFetchingColumns(

ydb/core/tx/columnshard/engines/storage/indexes/skip_index/meta.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ class TSkipBitmapIndex: public TSkipIndex {
4747

4848
virtual bool DoCheckValue(const TString& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value,
4949
const EOperation op) const override final {
50+
if (data.empty()) {
51+
return false;
52+
}
5053
auto storageConclusion = BitsStorageConstructor->Build(data);
5154
return DoCheckValueImpl(*storageConclusion.GetResult(), cat, value, op);
5255
}

0 commit comments

Comments
 (0)