Skip to content

Commit defb4b2

Browse files
Merge 7d89479 into 1a1e14a
2 parents 1a1e14a + 7d89479 commit defb4b2

File tree

6 files changed

+167
-59
lines changed

6 files changed

+167
-59
lines changed

ydb/core/formats/arrow/accessor/abstract/accessor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class IChunkedArray {
7676
}
7777

7878
ui32 GetLocalIndex(const ui32 position) const {
79-
AFL_VERIFY(Contains(position));
79+
AFL_VERIFY(Contains(position))("pos", position)("start", GlobalStartPosition);
8080
return position - GlobalStartPosition;
8181
}
8282

ydb/core/formats/arrow/accessor/sparsed/accessor.h

+10-10
Original file line numberDiff line numberDiff line change
@@ -137,16 +137,16 @@ class TSparsedArray: public IChunkedArray {
137137
return chunk.GetScalar(index - chunk.GetStartPosition());
138138
}
139139

140-
TSparsedArrayChunk GetSparsedChunk(const ui64 position) const {
141-
ui32 currentIdx = 0;
142-
for (ui32 i = 0; i < Records.size(); ++i) {
143-
if (currentIdx <= position && position < currentIdx + Records[i].GetRecordsCount()) {
144-
return Records[i];
145-
}
146-
currentIdx += Records[i].GetRecordsCount();
147-
}
148-
AFL_VERIFY(false);
149-
return Records.back();
140+
const TSparsedArrayChunk& GetSparsedChunk(const ui64 position) const {
141+
const auto pred = [](const ui64 position, const TSparsedArrayChunk& item) {
142+
return position < item.GetStartPosition();
143+
};
144+
auto it = std::upper_bound(Records.begin(), Records.end(), position, pred);
145+
AFL_VERIFY(it != Records.begin());
146+
--it;
147+
AFL_VERIFY(position < it->GetStartPosition() + it->GetRecordsCount());
148+
AFL_VERIFY(it->GetStartPosition() <= position);
149+
return *it;
150150
}
151151

152152
class TBuilder {

ydb/core/formats/arrow/arrow_helpers.cpp

+32
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,38 @@ bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y) {
589589
return ScalarCompare(x, y) < 0;
590590
}
591591

592+
bool ColumnEqualsScalar(
593+
const std::shared_ptr<arrow::Array>& c, const ui32 position, const std::shared_ptr<arrow::Scalar>& s) {
594+
AFL_VERIFY(c);
595+
if (!s) {
596+
return c->IsNull(position) ;
597+
}
598+
AFL_VERIFY(c->type()->Equals(s->type))("s", s->type->ToString())("c", c->type()->ToString());
599+
600+
return SwitchTypeImpl<bool, 0>(c->type()->id(), [&](const auto& type) {
601+
using TWrap = std::decay_t<decltype(type)>;
602+
using TScalar = typename arrow::TypeTraits<typename TWrap::T>::ScalarType;
603+
using TArrayType = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
604+
using TValue = std::decay_t<decltype(static_cast<const TScalar&>(*s).value)>;
605+
606+
if constexpr (arrow::has_string_view<typename TWrap::T>()) {
607+
const auto& cval = static_cast<const TArrayType&>(*c).GetView(position);
608+
const auto& sval = static_cast<const TScalar&>(*s).value;
609+
AFL_VERIFY(sval);
610+
TStringBuf cBuf(reinterpret_cast<const char*>(cval.data()), cval.size());
611+
TStringBuf sBuf(reinterpret_cast<const char*>(sval->data()), sval->size());
612+
return cBuf == sBuf;
613+
}
614+
if constexpr (std::is_arithmetic_v<TValue>) {
615+
const auto cval = static_cast<const TArrayType&>(*c).GetView(position);
616+
const auto sval = static_cast<const TScalar&>(*s).value;
617+
return (cval == sval);
618+
}
619+
Y_ABORT_UNLESS(false); // TODO: non primitive types
620+
return false;
621+
});
622+
}
623+
592624
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y) {
593625
Y_VERIFY_S(x.type->Equals(y.type), x.type->ToString() + " vs " + y.type->ToString());
594626

ydb/core/formats/arrow/arrow_helpers.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ bool IsGoodScalar(const std::shared_ptr<arrow::Scalar>& x);
9999
int ScalarCompare(const arrow::Scalar& x, const arrow::Scalar& y);
100100
int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
101101
int ScalarCompareNullable(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
102-
std::partial_ordering ColumnsCompare(const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
102+
std::partial_ordering ColumnsCompare(
103+
const std::vector<std::shared_ptr<arrow::Array>>& x, const ui32 xRow, const std::vector<std::shared_ptr<arrow::Array>>& y, const ui32 yRow);
104+
bool ColumnEqualsScalar(
105+
const std::shared_ptr<arrow::Array>& c, const ui32 position, const std::shared_ptr<arrow::Scalar>& s);
103106
bool ScalarLess(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr<arrow::Scalar>& y);
104107
bool ScalarLess(const arrow::Scalar& x, const arrow::Scalar& y);
105108
std::shared_ptr<arrow::RecordBatch> ReallocateBatch(std::shared_ptr<arrow::RecordBatch> original);

ydb/core/tx/columnshard/engines/changes/compaction/sparsed/logic.cpp

+54-42
Original file line numberDiff line numberDiff line change
@@ -63,64 +63,76 @@ TSparsedMerger::TWriter::TWriter(const TColumnMergeContext& context)
6363
IndexBuilderImpl = (arrow::UInt32Builder*)(IndexBuilder.get());
6464
}
6565

66-
bool TSparsedMerger::TCursor::AddIndexTo(const ui32 index, TWriter& writer) {
66+
bool TSparsedMerger::TPlainChunkCursor::AddIndexTo(const ui32 index, TWriter& writer, const TColumnMergeContext& context) {
67+
if (ChunkFinishPosition <= index) {
68+
InitArrays(index);
69+
}
70+
AFL_VERIFY(ChunkStartPosition <= index);
71+
if (NArrow::ColumnEqualsScalar(ChunkAddress->GetArray(), index - ChunkStartPosition, context.GetLoader()->GetDefaultValue())) {
72+
return false;
73+
} else {
74+
writer.AddRealData(ChunkAddress->GetArray(), index - ChunkStartPosition);
75+
return true;
76+
}
77+
}
78+
79+
bool TSparsedMerger::TSparsedChunkCursor::AddIndexTo(const ui32 index, TWriter& writer, const TColumnMergeContext& /*context*/) {
80+
AFL_VERIFY(ChunkStartGlobalPosition <= index);
6781
if (index < NextGlobalPosition) {
6882
return false;
69-
} else if (index == NextGlobalPosition) {
70-
if (index == CommonShift + Chunk->GetRecordsCount()) {
83+
} else {
84+
if (FinishGlobalPosition <= index) {
7185
InitArrays(index);
72-
if (index != NextGlobalPosition) {
86+
}
87+
if (index == NextGlobalPosition) {
88+
writer.AddRealData(Chunk->GetColValue(), NextLocalPosition);
89+
if (++NextLocalPosition < Chunk->GetNotDefaultRecordsCount()) {
90+
NextGlobalPosition = ChunkStartGlobalPosition + Chunk->GetIndexUnsafeFast(NextLocalPosition);
91+
return true;
92+
} else {
93+
NextGlobalPosition = ChunkStartGlobalPosition + Chunk->GetRecordsCount();
7394
return false;
7495
}
75-
}
76-
writer.AddRealData(Chunk->GetColValue(), NextLocalPosition);
77-
if (++NextLocalPosition < Chunk->GetNotDefaultRecordsCount()) {
78-
NextGlobalPosition = CommonShift + Chunk->GetIndexUnsafeFast(NextLocalPosition);
79-
return true;
8096
} else {
81-
NextGlobalPosition = CommonShift + Chunk->GetRecordsCount();
97+
bool found = false;
98+
for (; NextLocalPosition < Chunk->GetNotDefaultRecordsCount(); ++NextLocalPosition) {
99+
NextGlobalPosition = ChunkStartGlobalPosition + Chunk->GetIndexUnsafeFast(NextLocalPosition);
100+
if (NextGlobalPosition == index) {
101+
writer.AddRealData(Chunk->GetColValue(), NextLocalPosition);
102+
found = true;
103+
} else if (index < NextGlobalPosition) {
104+
return found;
105+
}
106+
}
107+
NextGlobalPosition = ChunkStartGlobalPosition + Chunk->GetRecordsCount();
82108
return false;
83109
}
84110
}
85-
AFL_VERIFY(Chunk->GetStartPosition() <= index);
86-
if (CommonShift + Chunk->GetRecordsCount() <= index) {
111+
}
112+
113+
bool TSparsedMerger::TCursor::AddIndexTo(const ui32 index, TWriter& writer) {
114+
if (FinishGlobalPosition <= index) {
87115
InitArrays(index);
88116
}
89-
bool found = false;
90-
for (; NextLocalPosition < Chunk->GetNotDefaultRecordsCount(); ++NextLocalPosition) {
91-
NextGlobalPosition = CommonShift + Chunk->GetIndexUnsafeFast(NextLocalPosition);
92-
if (NextGlobalPosition == index) {
93-
writer.AddRealData(Chunk->GetColValue(), NextLocalPosition);
94-
found = true;
95-
} else if (index < NextGlobalPosition) {
96-
return found;
97-
}
117+
if (SparsedCursor) {
118+
return SparsedCursor->AddIndexTo(index, writer, Context);
119+
} else {
120+
return PlainCursor->AddIndexTo(index, writer, Context);
98121
}
99-
NextGlobalPosition = CommonShift + Chunk->GetRecordsCount();
100-
return false;
101122
}
102123

103124
void TSparsedMerger::TCursor::InitArrays(const ui32 position) {
104-
if (!CurrentOwnedArray || !CurrentOwnedArray->GetAddress().Contains(position)) {
105-
CurrentOwnedArray = Array->GetArray(CurrentOwnedArray, position, Array);
106-
if (CurrentOwnedArray->GetArray()->GetType() == NArrow::NAccessor::IChunkedArray::EType::SparsedArray) {
107-
CurrentSparsedArray = static_pointer_cast<NArrow::NAccessor::TSparsedArray>(CurrentOwnedArray->GetArray());
108-
} else {
109-
CurrentSparsedArray = make_shared<NArrow::NAccessor::TSparsedArray>(*CurrentOwnedArray->GetArray(), Context.GetDefaultValue());
110-
}
111-
Chunk.reset();
112-
}
113-
if (!Chunk || Chunk->GetFinishPosition() <= position) {
114-
Chunk = CurrentSparsedArray->GetSparsedChunk(CurrentOwnedArray->GetAddress().GetLocalIndex(position));
115-
AFL_VERIFY(Chunk->GetRecordsCount());
116-
AFL_VERIFY(CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetStartPosition() <= position &&
117-
position < CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetFinishPosition())
118-
("pos", position)("start", Chunk->GetStartPosition())("finish", Chunk->GetFinishPosition())(
119-
"shift", CurrentOwnedArray->GetAddress().GetGlobalStartPosition());
125+
AFL_VERIFY(!CurrentOwnedArray || !CurrentOwnedArray->GetAddress().Contains(position));
126+
CurrentOwnedArray = Array->GetArray(CurrentOwnedArray, position, Array);
127+
if (CurrentOwnedArray->GetArray()->GetType() == NArrow::NAccessor::IChunkedArray::EType::SparsedArray) {
128+
auto sparsedArray = static_pointer_cast<NArrow::NAccessor::TSparsedArray>(CurrentOwnedArray->GetArray());
129+
SparsedCursor = std::make_shared<TSparsedChunkCursor>(sparsedArray, &*CurrentOwnedArray);
130+
PlainCursor = nullptr;
131+
} else {
132+
PlainCursor = make_shared<TPlainChunkCursor>(CurrentOwnedArray->GetArray(), &*CurrentOwnedArray);
133+
SparsedCursor = nullptr;
120134
}
121-
CommonShift = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetStartPosition();
122-
NextGlobalPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetFirstIndexNotDefault();
123-
NextLocalPosition = 0;
135+
FinishGlobalPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + CurrentOwnedArray->GetArray()->GetRecordsCount();
124136
}
125137

126138
} // namespace NKikimr::NOlap::NCompaction

ydb/core/tx/columnshard/engines/changes/compaction/sparsed/logic.h

+66-5
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,76 @@ class TSparsedMerger: public IColumnMerger {
4141
TColumnPortionResult Flush();
4242
};
4343

44-
class TCursor {
44+
class TPlainChunkCursor {
45+
private:
46+
std::shared_ptr<NArrow::NAccessor::IChunkedArray> CurrentChunkedArray;
47+
std::optional<NArrow::NAccessor::IChunkedArray::TFullDataAddress> ChunkAddress;
48+
const NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress* CurrentOwnedArray;
49+
ui32 ChunkStartPosition = 0;
50+
ui32 ChunkFinishPosition = 0;
51+
52+
void InitArrays(const ui32 position) {
53+
AFL_VERIFY(!ChunkAddress || ChunkFinishPosition <= position);
54+
ChunkAddress = CurrentChunkedArray->GetChunk(ChunkAddress, position);
55+
AFL_VERIFY(ChunkAddress);
56+
ChunkStartPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + ChunkAddress->GetAddress().GetGlobalStartPosition();
57+
ChunkFinishPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + ChunkAddress->GetAddress().GetGlobalFinishPosition();
58+
}
59+
60+
public:
61+
TPlainChunkCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& chunked,
62+
const NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress* currentOwnedArray)
63+
: CurrentChunkedArray(chunked)
64+
, CurrentOwnedArray(currentOwnedArray)
65+
{
66+
AFL_VERIFY(CurrentChunkedArray);
67+
AFL_VERIFY(CurrentOwnedArray);
68+
InitArrays(CurrentOwnedArray->GetAddress().GetGlobalStartPosition());
69+
}
70+
bool AddIndexTo(const ui32 index, TWriter& writer, const TColumnMergeContext& context);
71+
};
72+
73+
class TSparsedChunkCursor {
4574
private:
46-
std::shared_ptr<NArrow::NAccessor::IChunkedArray> Array;
47-
std::optional<NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress> CurrentOwnedArray;
4875
std::shared_ptr<NArrow::NAccessor::TSparsedArray> CurrentSparsedArray;
76+
const NArrow::NAccessor::TSparsedArrayChunk* Chunk = nullptr;
77+
const NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress* CurrentOwnedArray;
78+
ui32 ChunkStartGlobalPosition = 0;
4979
ui32 NextGlobalPosition = 0;
5080
ui32 NextLocalPosition = 0;
51-
ui32 CommonShift = 0;
52-
std::optional<NArrow::NAccessor::TSparsedArrayChunk> Chunk;
81+
ui32 FinishGlobalPosition = 0;
82+
void InitArrays(const ui32 position) {
83+
AFL_VERIFY(!Chunk || CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetFinishPosition() <= position);
84+
Chunk = &CurrentSparsedArray->GetSparsedChunk(CurrentOwnedArray->GetAddress().GetLocalIndex(position));
85+
AFL_VERIFY(Chunk->GetRecordsCount());
86+
AFL_VERIFY(CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetStartPosition() <= position &&
87+
position < CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetFinishPosition())
88+
("pos", position)("start", Chunk->GetStartPosition())("finish", Chunk->GetFinishPosition())(
89+
"shift", CurrentOwnedArray->GetAddress().GetGlobalStartPosition());
90+
ChunkStartGlobalPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetStartPosition();
91+
NextGlobalPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetFirstIndexNotDefault();
92+
NextLocalPosition = 0;
93+
FinishGlobalPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + Chunk->GetFinishPosition();
94+
}
95+
public:
96+
bool AddIndexTo(const ui32 index, TWriter& writer, const TColumnMergeContext& context);
97+
TSparsedChunkCursor(const std::shared_ptr<NArrow::NAccessor::TSparsedArray>& sparsed,
98+
const NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress* currentOwnedArray)
99+
: CurrentSparsedArray(sparsed)
100+
, CurrentOwnedArray(currentOwnedArray) {
101+
AFL_VERIFY(sparsed);
102+
AFL_VERIFY(currentOwnedArray);
103+
InitArrays(CurrentOwnedArray->GetAddress().GetGlobalStartPosition());
104+
}
105+
};
106+
107+
class TCursor {
108+
private:
109+
std::shared_ptr<NArrow::NAccessor::IChunkedArray> Array;
110+
std::optional<NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress> CurrentOwnedArray;
111+
std::shared_ptr<TSparsedChunkCursor> SparsedCursor;
112+
std::shared_ptr<TPlainChunkCursor> PlainCursor;
113+
ui32 FinishGlobalPosition = 0;
53114
const TColumnMergeContext& Context;
54115
void InitArrays(const ui32 position);
55116

0 commit comments

Comments
 (0)