Skip to content

Commit a564e8e

Browse files
authored
optimize predicate comparison in CS (#27493)
1 parent b796797 commit a564e8e

File tree

26 files changed

+578
-520
lines changed

26 files changed

+578
-520
lines changed

ydb/core/formats/arrow/arrow_batch_builder.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include <ydb/core/kqp/common/kqp_types.h>
66
#include <ydb/core/kqp/common/result_set_format/kqp_result_set_arrow.h>
77

8+
#include <ydb/library/actors/core/log.h>
9+
810
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/memory.h>
911
#include <contrib/libs/apache/arrow/cpp/src/arrow/ipc/reader.h>
1012

@@ -288,12 +290,15 @@ void TArrowBatchBuilder::AppendValue(const NUdf::TUnboxedValue& value, ui32 colN
288290
}
289291

290292
void TArrowBatchBuilder::AddRow(const TDbTupleRef& key, const TDbTupleRef& value) {
293+
AFL_VERIFY(key.ColumnCount + value.ColumnCount == YdbSchema.size())("key", key.ColumnCount)("value", value.ColumnCount)(
294+
"schema", YdbSchema.size());
291295
++NumRows;
292296

293297
auto fnAppendTuple = [&] (const TDbTupleRef& tuple, size_t offsetInRow) {
294298
for (size_t i = 0; i < tuple.ColumnCount; ++i) {
295299
auto ydbType = tuple.Types[i];
296300
const ui32 colNum = offsetInRow + i;
301+
AFL_VERIFY(colNum < YdbSchema.size())("column", colNum)("schema", YdbSchema.size());
297302
Y_ABORT_UNLESS(ydbType == YdbSchema[colNum].second);
298303
auto& cell = tuple.Columns[i];
299304
AppendCell(cell, colNum);

ydb/core/formats/arrow/arrow_batch_builder.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ class TArrowBatchBuilder : public NKikimr::IBlockBuilder {
191191
size_t Bytes() const override {
192192
return NumBytes;
193193
}
194+
size_t Rows() const {
195+
return NumRows;
196+
}
194197

195198
arrow::Status Start(const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
196199
arrow::Status Start(const std::vector<std::pair<TString, NKikimr::NMiniKQL::TType*>>& columns);

ydb/core/formats/arrow/reader/position.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
9797
}
9898

9999
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(const bool needData, const bool deepCopy) const {
100-
return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort, deepCopy ? Sorting->BuildCopy(Position) : Sorting,
101-
(needData && Data) ? (deepCopy ? Data->BuildCopy(Position) : Data) : nullptr);
100+
return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort, deepCopy ? Sorting->BuildCopy() : Sorting,
101+
(needData && Data) ? (deepCopy ? Data->BuildCopy() : Data) : nullptr);
102102
}
103103

104104
NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(

ydb/core/formats/arrow/reader/position.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ class TSortableScanData {
110110
return std::partial_ordering::equivalent;
111111
}
112112

113+
TSortableScanData(const ui64 start, const ui64 finish, const ui64 lastInit, const ui64 recordsCount,
114+
const std::vector<std::shared_ptr<NAccessor::IChunkedArray>>& columns, const std::vector<std::shared_ptr<arrow::Field>>& fields)
115+
: RecordsCount(recordsCount)
116+
, Columns(columns)
117+
, Fields(fields)
118+
, StartPosition(start)
119+
, FinishPosition(finish)
120+
, LastInit(lastInit)
121+
{
122+
}
123+
113124
public:
114125
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch);
115126
TSortableScanData(const ui64 position, const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns);
@@ -137,10 +148,17 @@ class TSortableScanData {
137148
return PositionAddress[colIdx].GetAddress().GetLocalIndex(pos);
138149
}
139150

140-
std::shared_ptr<TSortableScanData> BuildCopy(const ui64 /*position*/) const {
151+
std::shared_ptr<TSortableScanData> BuildCopy() const {
141152
return std::make_shared<TSortableScanData>(*this);
142153
}
143154

155+
std::shared_ptr<TSortableScanData> Trim(const ui64 numFields) const {
156+
AFL_VERIFY(numFields < Fields.size())("req", numFields)("self", Fields.size());
157+
return std::make_shared<TSortableScanData>(TSortableScanData(StartPosition, FinishPosition, LastInit, RecordsCount,
158+
std::vector<std::shared_ptr<NAccessor::IChunkedArray>>(Columns.begin(), Columns.begin() + numFields),
159+
std::vector<std::shared_ptr<arrow::Field>>(Fields.begin(), Fields.begin() + numFields)));
160+
}
161+
144162
TCursor BuildCursor(const ui64 position) const {
145163
if (Contains(position)) {
146164
return TCursor(position, PositionAddress);
@@ -460,6 +478,10 @@ class TSortableBatchPosition {
460478
return ApplyOptionalReverseForCompareResult(directResult);
461479
}
462480

481+
TSortableBatchPosition TrimSortingKeys(const ui64 numSortingColumns) const {
482+
return TSortableBatchPosition(Position, RecordsCount, ReverseSort, Sorting->Trim(numSortingColumns), Data);
483+
}
484+
463485
std::partial_ordering Compare(const TSortableScanData& data, const ui64 dataPosition) const {
464486
return Sorting->Compare(Position, data, dataPosition);
465487
}
@@ -475,6 +497,11 @@ class TSortableBatchPosition {
475497
bool operator!=(const TSortableBatchPosition& item) const {
476498
return Compare(item) != std::partial_ordering::equivalent;
477499
}
500+
501+
std::shared_ptr<arrow::Scalar> GetScalar(const ui32 colIdx) const {
502+
AFL_VERIFY(colIdx < Sorting->GetColumns().size())("req", colIdx)("size", Sorting->GetColumns().size());
503+
return Sorting->GetColumns()[colIdx]->GetScalar(Sorting->GetPositionInChunk(colIdx, Position));
504+
}
478505
};
479506

480507
class TIntervalPosition {

ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1101,7 +1101,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
11011101

11021102
TLocalHelper(kikimr).CreateTestOlapTable();
11031103
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
1104-
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2000);
1104+
WriteTestData(kikimr, "/Root/olapStore/olapTable", 0, 1000000, 2001);
11051105

11061106
auto tableClient = kikimr.GetTableClient();
11071107
auto selectQuery = TString(R"(

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
394394
}
395395
}
396396

397-
const auto inFlightLocksRangesBytes = NOlap::TPKRangeFilter::GetFiltersTotalMemorySize();
397+
const auto inFlightLocksRangesBytes = NOlap::TPKRangesFilter::GetFiltersTotalMemorySize();
398398
const ui64 inFlightLocksRangesBytesLimit = AppDataVerified().ColumnShardConfig.GetInFlightLocksRangesBytesLimit();
399399
if (behaviour == EOperationBehaviour::WriteWithLock && inFlightLocksRangesBytes > inFlightLocksRangesBytesLimit) {
400400
if (auto lock = OperationsManager->GetLockOptional(record.GetLockTxId()); lock) {

ydb/core/tx/columnshard/engines/predicate/container.cpp

Lines changed: 50 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,7 @@
55

66
namespace NKikimr::NOlap {
77
std::partial_ordering TPredicateContainer::ComparePredicatesSamePrefix(const NOlap::TPredicate& l, const NOlap::TPredicate& r) {
8-
Y_ABORT_UNLESS(l.Batch);
9-
Y_ABORT_UNLESS(r.Batch);
10-
Y_ABORT_UNLESS(l.Batch->num_columns());
11-
Y_ABORT_UNLESS(r.Batch->num_columns());
12-
Y_ABORT_UNLESS(l.Batch->num_rows() == r.Batch->num_rows());
13-
Y_ABORT_UNLESS(l.Batch->num_rows() == 1);
14-
const auto commonPrefixLength = std::min(l.Batch->columns().size(), r.Batch->columns().size());
15-
using NKikimr::NArrow::TRawReplaceKey;
16-
return TRawReplaceKey{&l.Batch->columns(), 0}.ComparePart<false>(TRawReplaceKey{&r.Batch->columns(), 0}, commonPrefixLength);
8+
return l.Batch.ComparePartial(r.Batch);
179
}
1810

1911
TString TPredicateContainer::DebugString() const {
@@ -31,16 +23,15 @@ int TPredicateContainer::MatchScalar(const ui32 columnIdx, const std::shared_ptr
3123
if (!s) {
3224
return 1;
3325
}
34-
if ((int)columnIdx >= Object->Batch->num_columns()) {
26+
if (columnIdx >= NumColumns()) {
3527
return 1;
3628
}
37-
auto c = Object->Batch->column(columnIdx);
38-
Y_ABORT_UNLESS(c);
29+
AFL_VERIFY(columnIdx < Object->Batch.GetSorting()->GetColumns().size());
30+
const auto& c = Object->Batch.GetSorting()->GetColumns()[columnIdx];
3931
auto sPredicate = c->GetScalar(0);
40-
Y_ABORT_UNLESS(sPredicate.ok());
41-
const int cmpResult = NArrow::ScalarCompare(*sPredicate, s);
32+
const int cmpResult = NArrow::ScalarCompare(sPredicate, s);
4233
if (cmpResult == 0) {
43-
switch (CompareType) {
34+
switch (GetCompareType()) {
4435
case NArrow::ECompareType::GREATER:
4536
case NArrow::ECompareType::LESS:
4637
return -1;
@@ -49,7 +40,7 @@ int TPredicateContainer::MatchScalar(const ui32 columnIdx, const std::shared_ptr
4940
return 0;
5041
}
5142
} else if (cmpResult == 1) {
52-
switch (CompareType) {
43+
switch (GetCompareType()) {
5344
case NArrow::ECompareType::GREATER:
5445
case NArrow::ECompareType::GREATER_OR_EQUAL:
5546
return -1;
@@ -59,7 +50,7 @@ int TPredicateContainer::MatchScalar(const ui32 columnIdx, const std::shared_ptr
5950
}
6051

6152
} else if (cmpResult == -1) {
62-
switch (CompareType) {
53+
switch (GetCompareType()) {
6354
case NArrow::ECompareType::GREATER:
6455
case NArrow::ECompareType::GREATER_OR_EQUAL:
6556
return 1;
@@ -72,23 +63,24 @@ int TPredicateContainer::MatchScalar(const ui32 columnIdx, const std::shared_ptr
7263
}
7364
}
7465

75-
const std::vector<TString>& TPredicateContainer::GetColumnNames() const {
76-
if (!ColumnNames) {
77-
if (Object) {
78-
ColumnNames = Object->ColumnNames();
79-
} else {
80-
ColumnNames = std::vector<TString>();
81-
}
66+
std::vector<std::string> TPredicateContainer::GetColumnNames() const {
67+
if (!Object) {
68+
return {};
8269
}
83-
return *ColumnNames;
70+
return Object->Batch.GetSorting()->GetFieldNames();
8471
}
8572

8673
bool TPredicateContainer::IsForwardInterval() const {
87-
return CompareType == NArrow::ECompareType::GREATER_OR_EQUAL || CompareType == NArrow::ECompareType::GREATER;
74+
return IsAll() || Object->IsFrom();
75+
}
76+
77+
bool TPredicateContainer::IsBackwardInterval() const {
78+
return IsAll() || Object->IsTo();
8879
}
8980

9081
bool TPredicateContainer::IsInclude() const {
91-
return CompareType == NArrow::ECompareType::GREATER_OR_EQUAL || CompareType == NArrow::ECompareType::LESS_OR_EQUAL;
82+
AFL_VERIFY(!IsAll());
83+
return GetCompareType() == NArrow::ECompareType::GREATER_OR_EQUAL || GetCompareType() == NArrow::ECompareType::LESS_OR_EQUAL;
9284
}
9385

9486
bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) const {
@@ -101,9 +93,9 @@ bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) const {
10193
return IsForwardInterval();
10294
} else if (result == std::partial_ordering::greater) {
10395
return ext.IsForwardInterval();
104-
} else if (Object->Batch->num_columns() == ext.Object->Batch->num_columns()) {
96+
} else if (NumColumns() == ext.NumColumns()) {
10597
return IsInclude() && ext.IsInclude();
106-
} else if (Object->Batch->num_columns() < ext.Object->Batch->num_columns()) {
98+
} else if (NumColumns() < ext.NumColumns()) {
10799
return IsInclude();
108100
} else {
109101
return ext.IsInclude();
@@ -113,10 +105,9 @@ bool TPredicateContainer::CrossRanges(const TPredicateContainer& ext) const {
113105
}
114106
}
115107

116-
TConclusion<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(
117-
std::shared_ptr<NOlap::TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
118-
if (!object || object->Empty()) {
119-
return TPredicateContainer(NArrow::ECompareType::GREATER_OR_EQUAL);
108+
TConclusion<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredicateFrom(std::optional<TPredicate> object) {
109+
if (!object) {
110+
return TPredicateContainer();
120111
} else {
121112
if (!object->Good()) {
122113
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'from' predicate");
@@ -126,32 +117,13 @@ TConclusion<NKikimr::NOlap::TPredicateContainer> TPredicateContainer::BuildPredi
126117
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'from' predicate not is from");
127118
return TConclusionStatus::Fail("'from' predicate not is from");
128119
}
129-
if (pkSchema) {
130-
auto cNames = object->ColumnNames();
131-
i32 countSortingFields = 0;
132-
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
133-
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
134-
++countSortingFields;
135-
} else {
136-
break;
137-
}
138-
}
139-
if (countSortingFields != object->Batch->num_columns()) {
140-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "incorrect predicate")("count", countSortingFields)(
141-
"object", object->Batch->num_columns())("schema", pkSchema->ToString())(
142-
"object", JoinSeq(",", cNames));
143-
return TConclusionStatus::Fail(
144-
"incorrect predicate (not prefix for pk: " + pkSchema->ToString() + " vs " + JoinSeq(",", cNames) + ")");
145-
}
146-
}
147-
return TPredicateContainer(object, pkSchema ? ExtractKey(*object, pkSchema) : nullptr);
120+
return TPredicateContainer(std::move(object));
148121
}
149122
}
150123

151-
TConclusion<TPredicateContainer> TPredicateContainer::BuildPredicateTo(
152-
std::shared_ptr<TPredicate> object, const std::shared_ptr<arrow::Schema>& pkSchema) {
153-
if (!object || object->Empty()) {
154-
return TPredicateContainer(NArrow::ECompareType::LESS_OR_EQUAL);
124+
TConclusion<TPredicateContainer> TPredicateContainer::BuildPredicateTo(std::optional<TPredicate> object) {
125+
if (!object) {
126+
return TPredicateContainer();
155127
} else {
156128
if (!object->Good()) {
157129
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "not good 'to' predicate");
@@ -161,52 +133,34 @@ TConclusion<TPredicateContainer> TPredicateContainer::BuildPredicateTo(
161133
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "add_range_filter")("problem", "'to' predicate not is to");
162134
return TConclusionStatus::Fail("'to' predicate not is to");
163135
}
164-
if (pkSchema) {
165-
auto cNames = object->ColumnNames();
166-
i32 countSortingFields = 0;
167-
for (i32 i = 0; i < pkSchema->num_fields(); ++i) {
168-
if (i < (int)cNames.size() && cNames[i] == pkSchema->field(i)->name()) {
169-
++countSortingFields;
170-
} else {
171-
break;
172-
}
173-
}
174-
Y_ABORT_UNLESS(countSortingFields == object->Batch->num_columns());
175-
}
176-
return TPredicateContainer(object, pkSchema ? TPredicateContainer::ExtractKey(*object, pkSchema) : nullptr);
136+
return TPredicateContainer(object);
177137
}
178138
}
179139

180-
NArrow::TColumnFilter TPredicateContainer::BuildFilter(const std::shared_ptr<NArrow::TGeneralContainer>& data) const {
140+
std::optional<NArrow::NMerger::TSortableBatchPosition::TFoundPosition> TPredicateContainer::FindFirstIncluded(
141+
NArrow::NMerger::TRWSortableBatchPosition& begin) const {
142+
AFL_VERIFY(IsForwardInterval());
143+
AFL_VERIFY(begin.GetRecordsCount());
144+
181145
if (!Object) {
182-
auto result = NArrow::TColumnFilter::BuildAllowFilter();
183-
result.Add(true, data->GetRecordsCount());
184-
return result;
146+
return NArrow::NMerger::TSortableBatchPosition::TFoundPosition(begin.GetPosition(), std::partial_ordering::equivalent);
185147
}
186-
if (!data->GetRecordsCount()) {
187-
return NArrow::TColumnFilter::BuildAllowFilter();
188-
}
189-
auto sortingFields = Object->Batch->schema()->field_names();
190-
auto position = NArrow::NMerger::TRWSortableBatchPosition(data, 0, sortingFields, {}, false);
191-
const auto border = NArrow::NMerger::TSortableBatchPosition(Object->Batch, 0, sortingFields, {}, false);
192-
const bool needUppedBound = CompareType == NArrow::ECompareType::LESS_OR_EQUAL || CompareType == NArrow::ECompareType::GREATER;
193-
const auto findBound = position.FindBound(position, 0, data->GetRecordsCount() - 1, border, needUppedBound);
194-
const ui64 rowsBeforeBound = findBound ? findBound->GetPosition() : data->GetRecordsCount();
195-
196-
auto filter = NArrow::TColumnFilter::BuildAllowFilter();
197-
switch (CompareType) {
198-
case NArrow::ECompareType::LESS:
199-
case NArrow::ECompareType::LESS_OR_EQUAL:
200-
filter.Add(true, rowsBeforeBound);
201-
filter.Add(false, data->GetRecordsCount() - rowsBeforeBound);
202-
break;
203-
case NArrow::ECompareType::GREATER:
204-
case NArrow::ECompareType::GREATER_OR_EQUAL:
205-
filter.Add(false, rowsBeforeBound);
206-
filter.Add(true, data->GetRecordsCount() - rowsBeforeBound);
207-
break;
148+
149+
return NArrow::NMerger::TSortableBatchPosition::FindBound(
150+
begin, begin.GetPosition(), begin.GetRecordsCount() - 1, Object->Batch, !Object->IsInclusive());
151+
}
152+
153+
std::optional<NArrow::NMerger::TSortableBatchPosition::TFoundPosition> TPredicateContainer::FindFirstExcluded(
154+
NArrow::NMerger::TRWSortableBatchPosition& begin) const {
155+
AFL_VERIFY(IsBackwardInterval());
156+
AFL_VERIFY(begin.GetRecordsCount());
157+
158+
if (!Object) {
159+
return std::nullopt;
208160
}
209-
return filter;
161+
162+
return NArrow::NMerger::TSortableBatchPosition::FindBound(
163+
begin, begin.GetPosition(), begin.GetRecordsCount() - 1, Object->Batch, Object->IsInclusive());
210164
}
211165

212166
} // namespace NKikimr::NOlap

0 commit comments

Comments
 (0)