Skip to content

customization fetching by columns #14927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
28405e7
customization fetching by columns
ivanmorozov333 Feb 22, 2025
5740933
fix
ivanmorozov333 Feb 22, 2025
9a51781
fetch kernels control
ivanmorozov333 Feb 23, 2025
58a5ecd
fixes
ivanmorozov333 Feb 23, 2025
68b18f9
fixes
ivanmorozov333 Feb 23, 2025
0409023
fixes
ivanmorozov333 Feb 23, 2025
0a9cd06
fixes
ivanmorozov333 Feb 23, 2025
3e97fa6
correction
ivanmorozov333 Feb 23, 2025
c225f5c
restore
ivanmorozov333 Feb 23, 2025
938b21f
fix
ivanmorozov333 Feb 23, 2025
3a2c6a5
tests and fixes
ivanmorozov333 Feb 24, 2025
af3e61a
corrections
ivanmorozov333 Feb 24, 2025
e882530
clean logging
ivanmorozov333 Feb 24, 2025
ea7d6de
corrections
ivanmorozov333 Feb 24, 2025
8ea3ce6
fixes
ivanmorozov333 Feb 24, 2025
c68f70b
fix test
ivanmorozov333 Feb 24, 2025
69c8af5
fixes
ivanmorozov333 Feb 24, 2025
6c5202d
unify columns separation logic
ivanmorozov333 Feb 24, 2025
864834d
fix
ivanmorozov333 Feb 24, 2025
dd85f9b
fix
ivanmorozov333 Feb 24, 2025
9b2830b
corrections
ivanmorozov333 Feb 24, 2025
7b9bf2e
fixes
ivanmorozov333 Feb 24, 2025
4365873
fix
ivanmorozov333 Feb 24, 2025
44ebfe4
fix detector
ivanmorozov333 Feb 24, 2025
8526730
test and fix
ivanmorozov333 Feb 24, 2025
19ed2f9
add test. fix it.
ivanmorozov333 Feb 24, 2025
0efd572
remove danger default value
ivanmorozov333 Feb 24, 2025
defe904
fixes
ivanmorozov333 Feb 24, 2025
38a5985
fix
ivanmorozov333 Feb 24, 2025
b27efd9
speed up
ivanmorozov333 Feb 24, 2025
fecda94
speed up filtering (dont apply filter to self-filter columns)
ivanmorozov333 Feb 24, 2025
b2abc52
fix
ivanmorozov333 Feb 24, 2025
2c76642
fix
ivanmorozov333 Feb 24, 2025
a1d4170
add test json exists
ivanmorozov333 Feb 24, 2025
d84d302
add counters and remove deprecated fake objects
ivanmorozov333 Feb 24, 2025
f3d08d4
reduce logging
ivanmorozov333 Feb 24, 2025
b34e5f7
fix tests
ivanmorozov333 Feb 25, 2025
6905872
add test. fix
ivanmorozov333 Feb 25, 2025
f98b292
fix
ivanmorozov333 Feb 25, 2025
1966ab3
tests. fixes
ivanmorozov333 Feb 25, 2025
df857f9
test and fixes
ivanmorozov333 Feb 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ydb/core/formats/arrow/accessor/abstract/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,17 @@ std::shared_ptr<IChunkedArray> IChunkedArray::DoApplyFilter(const TColumnFilter&
}

std::shared_ptr<IChunkedArray> IChunkedArray::ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const {
AFL_VERIFY(selfPtr);
if (filter.IsTotalAllowFilter()) {
return selfPtr;
}
if (filter.IsTotalDenyFilter()) {
return TTrivialArray::BuildEmpty(GetDataType());
}
return DoApplyFilter(filter);
auto result = DoApplyFilter(filter);
AFL_VERIFY(result);
AFL_VERIFY(result->GetRecordsCount() == filter.GetFilteredCountVerified());
return result;
}

TString IChunkedArray::TReader::DebugString(const ui32 position) const {
Expand Down
24 changes: 22 additions & 2 deletions ydb/core/formats/arrow/accessor/abstract/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class IChunkedArray {
SerializedChunkedArray,
CompositeChunkedArray,
SparsedArray,
SubColumnsArray
SubColumnsArray,
SubColumnsPartialArray
};

class TCommonChunkAddress {
Expand Down Expand Up @@ -322,6 +323,21 @@ class IChunkedArray {
public:
std::shared_ptr<IChunkedArray> ApplyFilter(const TColumnFilter& filter, const std::shared_ptr<IChunkedArray>& selfPtr) const;

template <class TResult, class TActor>
static std::optional<TResult> VisitDataOwners(const std::shared_ptr<IChunkedArray>& arr, const TActor& actor) {
AFL_VERIFY(arr);
std::optional<IChunkedArray::TFullChunkedArrayAddress> arrCurrent;
for (ui32 currentIndex = 0; currentIndex < arr->GetRecordsCount();) {
arrCurrent = arr->GetArray(arrCurrent, currentIndex, arr);
auto result = actor(arrCurrent->GetArray());
if (!!result) {
return result;
}
currentIndex = currentIndex + arrCurrent->GetArray()->GetRecordsCount();
}
return std::nullopt;
}

NJson::TJsonValue DebugJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("type", ::ToString(Type));
Expand Down Expand Up @@ -415,14 +431,18 @@ class IChunkedArray {
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
std::shared_ptr<IChunkedArray> ISlice(const ui32 offset, const ui32 count) const {
AFL_VERIFY(offset + count <= GetRecordsCount())("offset", offset)("count", count)("records", GetRecordsCount());
return DoISlice(offset, count);
auto result = DoISlice(offset, count);
AFL_VERIFY(result);
AFL_VERIFY(result->GetRecordsCount() == count)("records", result->GetRecordsCount())("count", count);
return result;
}

bool IsDataOwner() const {
switch (Type) {
case EType::SparsedArray:
case EType::ChunkedArray:
case EType::SubColumnsArray:
case EType::SubColumnsPartialArray:
case EType::Array:
return true;
case EType::Undefined:
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/formats/arrow/accessor/common/chunk_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ TChunkConstructionData::TChunkConstructionData(const ui32 recordsCount, const st
AFL_VERIFY(!!DefaultSerializer);
}

TChunkConstructionData TChunkConstructionData::GetSubset(const ui32 recordsCount) const {
AFL_VERIFY(recordsCount <= RecordsCount)("sub", recordsCount)("global", RecordsCount);
return TChunkConstructionData(recordsCount, DefaultValue, ColumnType, DefaultSerializer);
}

} // namespace NKikimr::NArrow::NAccessor
2 changes: 2 additions & 0 deletions ydb/core/formats/arrow/accessor/common/chunk_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class TChunkConstructionData {
public:
TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue,
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer);

TChunkConstructionData GetSubset(const ui32 recordsCount) const;
};

} // namespace NKikimr::NArrow::NAccessor
46 changes: 32 additions & 14 deletions ydb/core/formats/arrow/accessor/composite/accessor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "accessor.h"

#include <ydb/core/formats/arrow/arrow_filter.h>
namespace NKikimr::NArrow::NAccessor {

namespace {
Expand Down Expand Up @@ -29,33 +31,49 @@ class TCompositeChunkAccessor {
} // namespace

std::shared_ptr<IChunkedArray> ICompositeChunkedArray::DoISlice(const ui32 offset, const ui32 count) const {
ui32 slicedRecordsCount = 0;
ui32 currentIndex = offset;
ui32 currentIndex = 0;
std::optional<IChunkedArray::TFullChunkedArrayAddress> arrAddress;
std::vector<std::shared_ptr<IChunkedArray>> chunks;
while (slicedRecordsCount < count && currentIndex < GetRecordsCount()) {
while (currentIndex < offset + count) {
arrAddress = GetArray(arrAddress, currentIndex, nullptr);
const ui32 localIndex = arrAddress->GetAddress().GetLocalIndex(currentIndex);
const ui32 localCount = (arrAddress->GetArray()->GetRecordsCount() + slicedRecordsCount < count)
? arrAddress->GetArray()->GetRecordsCount()
: (count - slicedRecordsCount);

if (localIndex == 0 && localCount == arrAddress->GetArray()->GetRecordsCount()) {
chunks.emplace_back(arrAddress->GetArray());
const auto& arr = arrAddress->GetArray();
if (currentIndex + arr->GetRecordsCount() < offset) {
} else if (currentIndex >= offset && currentIndex + arr->GetRecordsCount() <= offset + count) {
chunks.emplace_back(arr);
} else {
chunks.emplace_back(arrAddress->GetArray()->ISlice(localIndex, localCount));
const ui32 localStart = std::max<ui32>(offset, currentIndex);
const ui32 localFinish = std::min<ui32>(offset + count, currentIndex + arr->GetRecordsCount());
AFL_VERIFY(localStart < localFinish)("start", localStart)("finish", localFinish);
chunks.emplace_back(arrAddress->GetArray()->ISlice(localStart - currentIndex, localFinish - localStart));
}
slicedRecordsCount += localCount;
currentIndex += localCount;
currentIndex += arr->GetRecordsCount();
}
AFL_VERIFY(slicedRecordsCount == count)("sliced", slicedRecordsCount)("count", count);
if (chunks.size() == 1) {
return chunks.front();
} else {
return std::make_shared<TCompositeChunkedArray>(std::move(chunks), count, GetDataType());
}
}

std::shared_ptr<IChunkedArray> ICompositeChunkedArray::DoApplyFilter(const TColumnFilter& filter) const {
std::optional<IChunkedArray::TFullChunkedArrayAddress> arrAddress;
std::vector<std::shared_ptr<IChunkedArray>> chunks;
ui32 currentIndex = 0;
while (currentIndex < GetRecordsCount()) {
arrAddress = GetArray(arrAddress, currentIndex, nullptr);
if (filter.CheckSlice(currentIndex, arrAddress->GetArray()->GetRecordsCount())) {
auto sliceFilter = filter.Slice(currentIndex, arrAddress->GetArray()->GetRecordsCount());
chunks.emplace_back(sliceFilter.Apply(arrAddress->GetArray()));
}
currentIndex += arrAddress->GetArray()->GetRecordsCount();
}
if (chunks.size() == 1) {
return chunks.front();
} else {
return std::make_shared<TCompositeChunkedArray>(std::move(chunks), filter.GetFilteredCountVerified(), GetDataType());
}
}

IChunkedArray::TLocalDataAddress TCompositeChunkedArray::DoGetLocalData(
const std::optional<TCommonChunkAddress>& /*chunkCurrent*/, const ui64 /*position*/) const {
AFL_VERIFY(false);
Expand Down
47 changes: 46 additions & 1 deletion ydb/core/formats/arrow/accessor/composite/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class ICompositeChunkedArray: public NArrow::NAccessor::IChunkedArray {
private:
using TBase = NArrow::NAccessor::IChunkedArray;
virtual std::shared_ptr<IChunkedArray> DoISlice(const ui32 offset, const ui32 count) const override final;
virtual std::shared_ptr<IChunkedArray> DoApplyFilter(const TColumnFilter& filter) const override;

public:
using TBase::TBase;
Expand Down Expand Up @@ -54,6 +55,47 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
, Chunks(std::move(chunks)) {
}

class TIterator: TNonCopyable {
private:
const std::shared_ptr<TCompositeChunkedArray> Owner;
ui32 RecordIndex = 0;
std::optional<TFullChunkedArrayAddress> CurrentChunk;

public:
TIterator(const std::shared_ptr<TCompositeChunkedArray>& owner)
: Owner(owner) {
if (Owner->GetRecordsCount()) {
CurrentChunk = Owner->GetArray(CurrentChunk, RecordIndex, Owner);
}
}

const std::shared_ptr<IChunkedArray>& GetArray() const {
AFL_VERIFY(CurrentChunk);
return CurrentChunk->GetArray();
}

bool IsValid() {
return RecordIndex < Owner->GetRecordsCount();
}

bool Next() {
AFL_VERIFY(IsValid());
AFL_VERIFY(CurrentChunk);
RecordIndex += CurrentChunk->GetArray()->GetRecordsCount();
AFL_VERIFY(RecordIndex <= Owner->GetRecordsCount());
if (IsValid()) {
CurrentChunk = Owner->GetArray(CurrentChunk, RecordIndex, Owner);
return true;
} else {
return false;
}
}
};

static TIterator BuildIterator(std::shared_ptr<TCompositeChunkedArray>& owner) {
return TIterator(owner);
}

class TBuilder {
private:
ui32 RecordsCount = 0;
Expand All @@ -74,9 +116,12 @@ class TCompositeChunkedArray: public ICompositeChunkedArray {
RecordsCount += arr->GetRecordsCount();
}

std::shared_ptr<TCompositeChunkedArray> Finish() {
std::shared_ptr<IChunkedArray> Finish() {
AFL_VERIFY(!Finished);
Finished = true;
if (Chunks.size() == 1) {
return Chunks.front();
}
return std::shared_ptr<TCompositeChunkedArray>(new TCompositeChunkedArray(std::move(Chunks), RecordsCount, Type));
}
};
Expand Down
102 changes: 102 additions & 0 deletions ydb/core/formats/arrow/accessor/composite/ut/ut_composite.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#include <ydb/core/formats/arrow/accessor/composite/accessor.h>
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
#include <ydb/core/formats/arrow/arrow_filter.h>

#include <library/cpp/testing/unittest/registar.h>

#include <regex>

Y_UNIT_TEST_SUITE(CompositeArrayAccessor) {
using namespace NKikimr::NArrow::NAccessor;
using namespace NKikimr::NArrow;

std::string PrepareToCompare(const std::string& str) {
return std::regex_replace(str, std::regex(" |\\n"), "");
}

static std::shared_ptr<IChunkedArray> BuildCompositeArray() {
TCompositeChunkedArray::TBuilder builder(arrow::utf8());
{
TTrivialArray::TPlainBuilder arrBuilder;
arrBuilder.AddRecord(1, "a1");
arrBuilder.AddRecord(3, "a3");
arrBuilder.AddRecord(7, "a7");
builder.AddChunk(arrBuilder.Finish(10));
}
{
TTrivialArray::TPlainBuilder arrBuilder;
arrBuilder.AddRecord(1, "b1");
arrBuilder.AddRecord(2, "b2");
arrBuilder.AddRecord(3, "b3");
builder.AddChunk(arrBuilder.Finish(5));
}
{
TTrivialArray::TPlainBuilder arrBuilder;
arrBuilder.AddRecord(0, "c0");
arrBuilder.AddRecord(2, "c2");
arrBuilder.AddRecord(3, "c3");
builder.AddChunk(arrBuilder.Finish(4));
}
return builder.Finish();
}

Y_UNIT_TEST(SlicesSimple) {
auto arr = BuildCompositeArray();
Cerr << PrepareToCompare(arr->GetChunkedArray()->ToString()) << Endl;
{
auto slice = arr->ISlice(0, 10);
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([[null,"a1",null,"a3",null,null,null,"a7",null,null]])")("string", arrString);
}
{
auto slice = arr->ISlice(3, 8);
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([["a3",null,null,null,"a7",null,null],[null]])")("string", arrString);
}
{
auto slice = arr->ISlice(3, 16);
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([["a3",null,null,null,"a7",null,null],[null,"b1","b2","b3",null],["c0",null,"c2","c3"]])")("string", arrString);
}
{
auto slice = arr->ISlice(8, 11);
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([[null,null],[null,"b1","b2","b3",null],["c0",null,"c2","c3"]])")("string", arrString);
}
{
auto slice = arr->ISlice(8, 2);
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([[null,null]])")("string", arrString);
}
{
auto slice = arr->ISlice(9, 3);
const TString arrString = PrepareToCompare(slice->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([[null],[null,"b1"]])")("string", arrString);
}
}

Y_UNIT_TEST(FilterSimple) {
auto arr = BuildCompositeArray();
{
const TColumnFilter filter = TColumnFilter::BuildConstFilter(true, { 1, 2, 3, 4, 5, 4 });

auto filtered = arr->ApplyFilter(filter, arr);
const TString arrString = PrepareToCompare(filtered->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([[null,"a3",null,null],[null,"b1","b2","b3",null]])")("string", arrString);
}
{
const TColumnFilter filter = TColumnFilter::BuildConstFilter(false, { 1, 2, 3, 4, 5, 4 });

auto filtered = arr->ApplyFilter(filter, arr);
const TString arrString = PrepareToCompare(filtered->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([["a1",null,null,"a7",null,null],["c0",null,"c2","c3"]])")("string", arrString);
}
{
const TColumnFilter filter = TColumnFilter::BuildConstFilter(false, { 3, 1, 3, 1, 3, 1, 3, 1, 3 });

auto filtered = arr->ApplyFilter(filter, arr);
const TString arrString = PrepareToCompare(filtered->GetChunkedArray()->ToString());
AFL_VERIFY(arrString == R"([["a3","a7"],["b1"],["c0"]])")("string", arrString);
}
}
};
19 changes: 19 additions & 0 deletions ydb/core/formats/arrow/accessor/composite/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
UNITTEST_FOR(ydb/core/formats/arrow/accessor/sparsed)

SIZE(SMALL)

PEERDIR(
ydb/core/formats/arrow/accessor/sparsed
ydb/core/formats/arrow/accessor/plain
ydb/core/formats/arrow
yql/essentials/public/udf/service/stub
ydb/core/formats/arrow
)

YQL_LAST_ABI_VERSION()

SRCS(
ut_composite.cpp
)

END()
4 changes: 4 additions & 0 deletions ydb/core/formats/arrow/accessor/composite/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ SRCS(
)

END()

RECURSE_FOR_TESTS(
ut
)
8 changes: 8 additions & 0 deletions ydb/core/formats/arrow/accessor/sub_columns/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ class TSubColumnsArray: public IChunkedArray {
virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 /*index*/) const override {
return nullptr;
}

std::shared_ptr<IChunkedArray> GetPathAccessor(const std::string_view svPath, const ui32 recordsCount) const {
auto accResult = ColumnsData.GetPathAccessor(svPath);
if (accResult) {
return accResult;
}
return OthersData.GetPathAccessor(svPath, recordsCount);
}
};

} // namespace NKikimr::NArrow::NAccessor
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
}
++idx;
}
records.DeleteFieldsByIndex(indexesToRemove);
return TColumnsData(builder.Finish(), std::make_shared<TGeneralContainer>(std::move(records)));

} else {
Expand All @@ -26,6 +27,9 @@ TColumnsData TColumnsData::Slice(const ui32 offset, const ui32 count) const {
}

TColumnsData TColumnsData::ApplyFilter(const TColumnFilter& filter) const {
if (!Stats.GetColumnsCount()) {
return *this;
}
auto records = Records;
AFL_VERIFY(filter.Apply(records));
if (records->GetRecordsCount()) {
Expand Down
Loading
Loading