Skip to content

Commit a701bdb

Browse files
fix filter processing checker (#16476)
1 parent 61a872c commit a701bdb

File tree

9 files changed

+25
-23
lines changed

9 files changed

+25
-23
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(const std::optional<TAdd
6262
return TFullDataAddress(localAddress.GetArray(), std::move(addressChain));
6363
} else {
6464
auto chunkedArrayAddress = GetArray(chunkCurrent, position, nullptr);
65-
if (chunkCurrent) {
66-
AFL_VERIFY(chunkCurrent->GetSize() == 1 + chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
67-
"chunked", chunkedArrayAddress.GetAddress().GetSize());
68-
}
65+
// if (chunkCurrent) {
66+
// AFL_VERIFY(chunkCurrent->GetSize() == chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
67+
// "chunked", chunkedArrayAddress.GetAddress().GetSize());
68+
// }
6969
auto localAddress = chunkedArrayAddress.GetArray()->GetLocalData(address, chunkedArrayAddress.GetAddress().GetLocalIndex(position));
7070
auto fullAddress = std::move(chunkedArrayAddress.MutableAddress());
7171
fullAddress.Add(localAddress.GetAddress());
@@ -112,7 +112,7 @@ std::shared_ptr<IChunkedArray> IChunkedArray::DoApplyFilter(const TColumnFilter&
112112
auto schema = std::make_shared<arrow::Schema>(fields);
113113
auto table = arrow::Table::Make(schema, { arr }, GetRecordsCount());
114114
AFL_VERIFY(table->num_columns() == 1);
115-
AFL_VERIFY(filter.Apply(table));
115+
filter.Apply(table);
116116
if (table->column(0)->num_chunks() == 1) {
117117
return std::make_shared<TTrivialArray>(table->column(0)->chunk(0));
118118
} else {

ydb/core/formats/arrow/accessor/sub_columns/columns_storage.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ TColumnsData TColumnsData::ApplyFilter(const TColumnFilter& filter) const {
3131
return *this;
3232
}
3333
auto records = Records;
34-
AFL_VERIFY(filter.Apply(records));
34+
filter.Apply(records);
3535
if (records->GetRecordsCount()) {
3636
TDictStats::TBuilder builder;
3737
ui32 idx = 0;

ydb/core/formats/arrow/accessor/sub_columns/others_storage.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ std::shared_ptr<IChunkedArray> TOthersData::GetPathAccessor(const std::string_vi
253253
filter.Add(it.GetKeyIndex() == *idx);
254254
}
255255
auto recordsFiltered = Records;
256-
AFL_VERIFY(filter.Apply(recordsFiltered));
256+
filter.Apply(recordsFiltered);
257257
auto table = recordsFiltered->BuildTableVerified(std::set<std::string>({ "record_idx", "value" }));
258258

259259
TSparsedArray::TBuilder builder(nullptr, arrow::utf8());

ydb/core/formats/arrow/arrow_filter.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,9 @@ NKikimr::NArrow::TColumnFilter TColumnFilter::MakePredicateFilter(
363363
}
364364

365365
template <class TData>
366-
bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const TColumnFilter::TApplyContext& context) {
366+
void ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const TColumnFilter::TApplyContext& context) {
367367
if (!batch || !batch->num_rows()) {
368-
return false;
368+
return;
369369
}
370370
if (!filter.IsEmpty()) {
371371
if (context.HasSlice()) {
@@ -380,10 +380,10 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
380380
}
381381
if (filter.IsTotalDenyFilter()) {
382382
batch = NAdapter::TDataBuilderPolicy<TData>::GetEmptySame(batch);
383-
return true;
383+
return;
384384
}
385385
if (filter.IsTotalAllowFilter()) {
386-
return true;
386+
return;
387387
}
388388
if (context.GetTrySlices() && filter.GetFilter().size() * 10 < filter.GetRecordsCountVerified() &&
389389
filter.GetRecordsCountVerified() < filter.GetFilteredCountVerified() * 50) {
@@ -394,18 +394,17 @@ bool ApplyImpl(const TColumnFilter& filter, std::shared_ptr<TData>& batch, const
394394
} else {
395395
batch = NAdapter::TDataBuilderPolicy<TData>::ApplyArrowFilter(batch, filter);
396396
}
397-
return batch->num_rows();
398397
}
399398

400-
bool TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context) const {
399+
void TColumnFilter::Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context) const {
401400
return ApplyImpl(*this, batch, context);
402401
}
403402

404-
bool TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context) const {
403+
void TColumnFilter::Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context) const {
405404
return ApplyImpl(*this, batch, context);
406405
}
407406

408-
bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context) const {
407+
void TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context) const {
409408
return ApplyImpl(*this, batch, context);
410409
}
411410

ydb/core/formats/arrow/arrow_filter.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,9 +289,9 @@ class TColumnFilter {
289289
TApplyContext& Slice(const ui32 start, const ui32 count);
290290
};
291291

292-
[[nodiscard]] bool Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
293-
[[nodiscard]] bool Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
294-
[[nodiscard]] bool Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
292+
void Apply(std::shared_ptr<TGeneralContainer>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
293+
void Apply(std::shared_ptr<arrow::Table>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
294+
void Apply(std::shared_ptr<arrow::RecordBatch>& batch, const TApplyContext& context = Default<TApplyContext>()) const;
295295
void Apply(const ui32 expectedRecordsCount, std::vector<arrow::Datum*>& datums) const;
296296
[[nodiscard]] std::shared_ptr<NAccessor::IChunkedArray> Apply(
297297
const std::shared_ptr<NAccessor::IChunkedArray>& source, const TApplyContext& context = Default<TApplyContext>()) const;

ydb/core/formats/arrow/reader/merger.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
105105
*lastResultPosition = TCursor(keys, 0, SortSchema->field_names());
106106
}
107107
if (SortHeap.Current().GetFilter()) {
108-
AFL_VERIFY(SortHeap.Current().GetFilter()->Apply(result, TColumnFilter::TApplyContext(pos.GetPosition() + (include ? 0 : 1), resultSize)));
108+
SortHeap.Current().GetFilter()->Apply(result, TColumnFilter::TApplyContext(pos.GetPosition() + (include ? 0 : 1), resultSize));
109109
}
110110
} else {
111111
result = SortHeap.Current().GetKeyColumns().SliceData(startPos, resultSize);
@@ -114,7 +114,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
114114
*lastResultPosition = TCursor(keys, keys->num_rows() - 1, SortSchema->field_names());
115115
}
116116
if (SortHeap.Current().GetFilter()) {
117-
AFL_VERIFY(SortHeap.Current().GetFilter()->Apply(result, TColumnFilter::TApplyContext(startPos, resultSize)));
117+
SortHeap.Current().GetFilter()->Apply(result, TColumnFilter::TApplyContext(startPos, resultSize));
118118
}
119119
}
120120
if (!result || !result->num_rows()) {

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,10 @@ TConclusion<bool> TBuildResultStep::DoExecuteInplace(const std::shared_ptr<IData
159159
if (!source->GetStageResult().IsEmpty()) {
160160
resultBatch = source->GetStageResult().GetBatch()->BuildTableVerified(contextTableConstruct);
161161
if (auto filter = source->GetStageResult().GetNotAppliedFilter()) {
162-
AFL_VERIFY(filter->Apply(resultBatch, NArrow::TColumnFilter::TApplyContext(StartIndex, RecordsCount).SetTrySlices(true)));
162+
filter->Apply(resultBatch, NArrow::TColumnFilter::TApplyContext(StartIndex, RecordsCount).SetTrySlices(true));
163+
if (!resultBatch->num_rows()) {
164+
resultBatch = nullptr;
165+
}
163166
}
164167
}
165168

ydb/core/tx/columnshard/engines/reader/sys_view/abstract/iterator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ TConclusion<std::shared_ptr<TPartialReadResult>> TStatsIteratorBase::GetBatch()
4545

4646
{
4747
NArrow::TColumnFilter filter = ReadMetadata->GetPKRangesFilter().BuildFilter(originalBatch);
48-
AFL_VERIFY(filter.Apply(originalBatch));
48+
filter.Apply(originalBatch);
4949
}
5050

5151
// Leave only requested columns

ydb/core/tx/columnshard/operations/batch_builder/merger.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class TReplaceMerger: public IMerger {
6767

6868
virtual NArrow::TContainerWithIndexes<arrow::RecordBatch> BuildResultBatch() override {
6969
auto result = IncomingData;
70-
AFL_VERIFY(Filter.Apply(result.MutableContainer()));
70+
Filter.Apply(result.MutableContainer());
7171
return result;
7272
}
7373
};

0 commit comments

Comments
 (0)