Skip to content

not nulls information provide into arrow::array for correct kernels c… #15060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions ydb/core/formats/arrow/accessor/common/chunk_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,27 @@
namespace NKikimr::NArrow::NAccessor {

TChunkConstructionData::TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue,
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer)
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer,
const std::optional<ui32>& notNullRecordsCount)
: RecordsCount(recordsCount)
, NotNullRecordsCount(notNullRecordsCount)
, DefaultValue(defaultValue)
, ColumnType(columnType)
, DefaultSerializer(defaultSerializer) {
AFL_VERIFY(ColumnType);
AFL_VERIFY(RecordsCount);
AFL_VERIFY(!NotNullRecordsCount || *NotNullRecordsCount <= RecordsCount)("records", RecordsCount)("not_null", NotNullRecordsCount);
AFL_VERIFY(!!DefaultSerializer);
}

TChunkConstructionData TChunkConstructionData::GetSubset(const ui32 recordsCount) const {
TChunkConstructionData TChunkConstructionData::GetSubset(const ui32 recordsCount, const std::optional<ui32>& notNullRecordsCount) const {
AFL_VERIFY(recordsCount <= RecordsCount)("sub", recordsCount)("global", RecordsCount);
return TChunkConstructionData(recordsCount, DefaultValue, ColumnType, DefaultSerializer);
return TChunkConstructionData(recordsCount, DefaultValue, ColumnType, DefaultSerializer, notNullRecordsCount);
}

ui32 TChunkConstructionData::GetNullRecordsCountVerified() const {
AFL_VERIFY(NotNullRecordsCount);
return RecordsCount - *NotNullRecordsCount;
}

} // namespace NKikimr::NArrow::NAccessor
11 changes: 9 additions & 2 deletions ydb/core/formats/arrow/accessor/common/chunk_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,22 @@ namespace NKikimr::NArrow::NAccessor {
class TChunkConstructionData {
private:
YDB_READONLY(ui32, RecordsCount, 0);
YDB_READONLY_DEF(std::optional<ui32>, NotNullRecordsCount);
YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, DefaultValue);
YDB_READONLY_DEF(std::shared_ptr<arrow::DataType>, ColumnType);
YDB_READONLY_DEF(std::shared_ptr<NSerialization::ISerializer>, DefaultSerializer);

public:
TChunkConstructionData(const ui32 recordsCount, const std::shared_ptr<arrow::Scalar>& defaultValue,
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer);
const std::shared_ptr<arrow::DataType>& columnType, const std::shared_ptr<NSerialization::ISerializer>& defaultSerializer,
const std::optional<ui32>& notNullRecordsCount = std::nullopt);

TChunkConstructionData GetSubset(const ui32 recordsCount) const;
TChunkConstructionData GetSubset(const ui32 recordsCount, const std::optional<ui32>& notNullRecordsCount = std::nullopt) const;

bool HasNullRecordsCount() const {
return !!NotNullRecordsCount;
}
ui32 GetNullRecordsCountVerified() const;
};

} // namespace NKikimr::NArrow::NAccessor
9 changes: 9 additions & 0 deletions ydb/core/formats/arrow/accessor/plain/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class TTrivialArray: public IChunkedArray {
virtual ui32 DoGetValueRawBytes() const override;

public:
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const override {
return std::make_shared<arrow::ChunkedArray>(Array);
}

const std::shared_ptr<arrow::Array>& GetArray() const {
return Array;
}
Expand Down Expand Up @@ -105,6 +109,7 @@ class TTrivialChunkedArray: public IChunkedArray {
}
virtual TLocalDataAddress DoGetLocalData(const std::optional<TCommonChunkAddress>& chunkCurrent, const ui64 position) const override;
virtual std::optional<ui64> DoGetRawSize() const override;

virtual std::shared_ptr<arrow::Scalar> DoGetScalar(const ui32 index) const override {
auto chunk = GetChunkSlow(index);
return NArrow::TStatusValidator::GetValid(chunk.GetArray()->GetScalar(chunk.GetAddress().GetLocalIndex(index)));
Expand All @@ -116,6 +121,10 @@ class TTrivialChunkedArray: public IChunkedArray {
virtual std::shared_ptr<arrow::Scalar> DoGetMaxScalar() const override;

public:
virtual std::shared_ptr<arrow::ChunkedArray> GetChunkedArray() const override {
return Array;
}

TTrivialChunkedArray(const std::shared_ptr<arrow::ChunkedArray>& data)
: TBase(data->length(), EType::ChunkedArray, data->type())
, Array(data) {
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/formats/arrow/accessor/plain/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
auto result = externalInfo.GetDefaultSerializer()->Deserialize(originalData, schema);
if (!result.ok()) {
return TConclusionStatus::Fail(result.status().ToString());
} else {
auto rb = TStatusValidator::GetValid(result);
AFL_VERIFY(rb->num_columns() == 1)("count", rb->num_columns())("schema", schema->ToString());
return std::make_shared<NArrow::NAccessor::TTrivialArray>(rb->column(0));
}
auto rb = TStatusValidator::GetValid(result);
AFL_VERIFY(rb->num_columns() == 1)("count", rb->num_columns())("schema", schema->ToString());
if (externalInfo.HasNullRecordsCount()) {
rb->column(0)->data()->SetNullCount(externalInfo.GetNullRecordsCountVerified());
}
return std::make_shared<NArrow::NAccessor::TTrivialArray>(rb->column(0));
}

TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoConstructDefault(const TChunkConstructionData& externalInfo) const {
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/formats/arrow/save_load/loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ const std::shared_ptr<arrow::Field>& TColumnLoader::GetField() const {
return ResultField;
}

TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount) const {
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type(), Serializer.GetObjectPtr());
TChunkConstructionData TColumnLoader::BuildAccessorContext(
const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type(), Serializer.GetObjectPtr(), notNullCount);
}

TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::ApplyConclusion(const TString& dataStr, const ui32 recordsCount) const {
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount));
TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::ApplyConclusion(
const TString& dataStr, const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount, notNullCount));
}

std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const {
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount)).DetachResult();
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount, const std::optional<ui32>& notNullCount) const {
return BuildAccessor(dataStr, BuildAccessorContext(recordsCount, notNullCount)).DetachResult();
}

TConclusion<std::shared_ptr<IChunkedArray>> TColumnLoader::BuildAccessor(
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/formats/arrow/save_load/loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ class TColumnLoader {

const std::shared_ptr<arrow::Field>& GetField() const;

TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const;
std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const;
TConclusion<std::shared_ptr<IChunkedArray>> ApplyConclusion(const TString& data, const ui32 expectedRecordsCount) const;
TChunkConstructionData BuildAccessorContext(const ui32 recordsCount, const std::optional<ui32>& notNullCount = std::nullopt) const;
std::shared_ptr<IChunkedArray> ApplyVerified(
const TString& data, const ui32 expectedRecordsCount, const std::optional<ui32>& notNullCount = std::nullopt) const;
TConclusion<std::shared_ptr<IChunkedArray>> ApplyConclusion(
const TString& data, const ui32 expectedRecordsCount, const std::optional<ui32>& notNullCount = std::nullopt) const;
};

} // namespace NKikimr::NArrow::NAccessor
Loading