Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "parquet/arrow/test-util.h"
#include "parquet/arrow/writer.h"

#include "arrow/io/memory.h"
#include "arrow/test-util.h"
#include "arrow/types/construct.h"
#include "arrow/types/primitive.h"
Expand Down Expand Up @@ -342,6 +343,29 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
this->ReadAndCheckSingleColumnTable(values);
}

// Round-trips a chunked table through the Arrow-IO writer path: write via an
// ::arrow::io::BufferOutputStream, reopen the backing buffer with a parquet
// BufferReader, and verify the table comes back as a single equal chunk.
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
  std::shared_ptr<Array> values;
  ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
  // NOTE(review): sink_ is never used by this test (we write to the Arrow
  // stream below) — presumably a leftover from the fixture; confirm before
  // removing.
  this->sink_ = std::make_shared<InMemoryOutputStream>();
  auto buffer = std::make_shared<::arrow::PoolBuffer>();
  // Plain local: the original trailing underscore wrongly suggested a member.
  auto arrow_sink = std::make_shared<::arrow::io::BufferOutputStream>(buffer);
  ASSERT_OK_NO_THROW(WriteFlatTable(
      table.get(), default_memory_pool(), arrow_sink, 512, default_writer_properties()));

  // Wrap the written bytes in a parquet buffer/reader pair for reading back.
  std::shared_ptr<ParquetBuffer> pbuffer =
      std::make_shared<ParquetBuffer>(buffer->data(), buffer->size());
  std::unique_ptr<RandomAccessSource> source(new BufferReader(pbuffer));
  std::shared_ptr<::arrow::Table> out;
  this->ReadTableFromFile(ParquetFileReader::Open(std::move(source)), &out);
  ASSERT_EQ(1, out->num_columns());
  ASSERT_EQ(values->length(), out->num_rows());

  std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
  ASSERT_EQ(1, chunked_array->num_chunks());
  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
}

TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
int64_t chunk_size = SMALL_SIZE / 4;
std::shared_ptr<Array> values;
Expand Down Expand Up @@ -456,10 +480,20 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
template <typename T>
using ParquetCDataType = typename ParquetDataType<T>::c_type;

// Maps an Arrow type to the C type used when generating/comparing raw test
// values. For most primitive types this is simply ArrowType::c_type.
template <typename T>
struct c_type_trait {
  using ArrowCType = typename T::c_type;
};

// Booleans are produced and compared as one uint8_t per value by the test
// helpers (see NonNullArray/ExpectArrayT for BooleanType), so map to uint8_t.
template <>
struct c_type_trait<::arrow::BooleanType> {
  using ArrowCType = uint8_t;
};

template <typename TestType>
class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
public:
typedef typename TestType::c_type T;
typedef typename c_type_trait<TestType>::ArrowCType T;

void MakeTestFile(std::vector<T>& values, int num_chunks,
std::unique_ptr<ParquetFileReader>* file_reader) {
Expand Down Expand Up @@ -497,7 +531,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {

std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
ExpectArrayT<TestType>(values.data(), chunked_array->chunk(0).get());
}

void CheckSingleColumnRequiredRead(int num_chunks) {
Expand All @@ -508,7 +542,7 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
std::shared_ptr<Array> out;
this->ReadSingleColumnFile(std::move(file_reader), &out);

ExpectArray<TestType>(values.data(), out.get());
ExpectArrayT<TestType>(values.data(), out.get());
}
};

Expand Down
20 changes: 20 additions & 0 deletions src/parquet/arrow/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,25 @@ std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
return result;
}

// Wraps an Arrow output stream so it can be used as a parquet OutputStream.
// Takes shared ownership of the stream for the lifetime of the sink.
ParquetWriteSink::ParquetWriteSink(
    const std::shared_ptr<::arrow::io::OutputStream>& stream)
    : stream_(stream) {}

// Defaulted: the shared_ptr member releases the stream; the destructor does
// not call Close() — callers close the sink explicitly.
ParquetWriteSink::~ParquetWriteSink() = default;

// Close the wrapped Arrow stream, converting a failed ::arrow::Status into a
// ParquetException via PARQUET_THROW_NOT_OK.
void ParquetWriteSink::Close() {
  PARQUET_THROW_NOT_OK(stream_->Close());
}

// Report the current write position of the wrapped Arrow stream, throwing a
// ParquetException if the underlying Tell fails.
int64_t ParquetWriteSink::Tell() {
  int64_t offset = -1;
  PARQUET_THROW_NOT_OK(stream_->Tell(&offset));
  return offset;
}

// Forward a byte range to the wrapped Arrow stream; throws ParquetException
// on a failed write Status.
void ParquetWriteSink::Write(const uint8_t* data, int64_t length) {
  PARQUET_THROW_NOT_OK(stream_->Write(data, length));
}

} // namespace arrow
} // namespace parquet
19 changes: 19 additions & 0 deletions src/parquet/arrow/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,25 @@ class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
ParquetAllocator* allocator_;
};

// Adapter that exposes an ::arrow::io::OutputStream through the parquet
// OutputStream interface, allowing Parquet files to be written to any Arrow
// sink. Failed ::arrow::Status results from the wrapped stream are rethrown
// as ParquetException in the implementation.
class PARQUET_EXPORT ParquetWriteSink : public OutputStream {
 public:
  // Takes shared ownership of the Arrow stream.
  explicit ParquetWriteSink(const std::shared_ptr<::arrow::io::OutputStream>& stream);

  virtual ~ParquetWriteSink();

  // Close the underlying output stream (not called by the destructor)
  void Close() override;

  // Return the current position in the output stream relative to the start
  int64_t Tell() override;

  // Copy bytes into the output stream
  void Write(const uint8_t* data, int64_t length) override;

 private:
  // Kept alive for the lifetime of the sink.
  std::shared_ptr<::arrow::io::OutputStream> stream_;
};

} // namespace arrow
} // namespace parquet

Expand Down
73 changes: 73 additions & 0 deletions src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,79 @@ Status FlatColumnReader::Impl::TypedReadBatch(
}
}

// Specialization for BOOLEAN columns: values are decoded from Parquet as one
// bool per value into values_buffer_ and assembled into an Arrow BooleanArray
// by the ReadNonNullableBatch/ReadNullableFlatBatch helpers.
template <>
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
    int batch_size, std::shared_ptr<Array>* out) {
  int values_to_read = batch_size;
  RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
  valid_bits_idx_ = 0;
  if (descr_->max_definition_level() > 0) {
    valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
    // Propagate allocation failure instead of silently ignoring the Status
    // (consistent with the RETURN_NOT_OK(...Resize...) calls below).
    RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size));
    valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
    memset(valid_bits_ptr_, 0, valid_bits_size);
    null_count_ = 0;
  }

  while ((values_to_read > 0) && column_reader_) {
    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool)));
    if (descr_->max_definition_level() > 0) {
      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
    }
    auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
    int64_t values_read;
    int64_t levels_read;
    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
    auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
        values_to_read, def_levels, nullptr, values, &values_read));
    values_to_read -= levels_read;
    if (descr_->max_definition_level() == 0) {
      ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(values, values_read);
    } else {
      // As per the definition and checks for flat columns:
      // descr_->max_definition_level() == 1
      ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
          def_levels, values, values_read, levels_read);
    }
    if (!column_reader_->HasNext()) { NextRowGroup(); }
  }

  if (descr_->max_definition_level() > 0) {
    // TODO: Shrink arrays in the case they are too large
    if (valid_bits_idx_ < batch_size * 0.8) {
      // Shrink arrays as they are larger than the output.
      // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
      // without the need for a copy. Given a decent underlying allocator this
      // should still free some underlying pages to the OS.

      auto data_buffer = std::make_shared<PoolBuffer>(pool_);
      RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
      memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
      data_buffer_ = data_buffer;

      auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
      RETURN_NOT_OK(
          valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
      memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
          valid_bits_buffer->size());
      valid_bits_buffer_ = valid_bits_buffer;
    }
    *out = std::make_shared<::arrow::BooleanArray>(
        field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
    // Release the ownership: the buffers now belong to the returned Array.
    data_buffer_.reset();
    valid_bits_buffer_.reset();
    return Status::OK();
  } else {
    *out = std::make_shared<::arrow::BooleanArray>(
        field_->type, valid_bits_idx_, data_buffer_);
    data_buffer_.reset();
    return Status::OK();
  }
}

template <>
Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
Expand Down
20 changes: 12 additions & 8 deletions src/parquet/arrow/test-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
template <typename ArrowType>
using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;

// Trait: true when ArrowType is Arrow's BooleanType; used with enable_if to
// select the boolean-specific test-array builders below.
template <typename ArrowType>
using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>;

template <class ArrowType>
typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
Expand Down Expand Up @@ -70,8 +73,9 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type NonNull
return builder.Finish(out);
}

template <>
Status NonNullArray<::arrow::BooleanType>(size_t size, std::shared_ptr<Array>* out) {
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray(
size_t size, std::shared_ptr<Array>* out) {
std::vector<uint8_t> values;
::arrow::test::randint<uint8_t>(size, 0, 1, &values);
::arrow::BooleanBuilder builder(
Expand Down Expand Up @@ -135,8 +139,8 @@ typename std::enable_if<is_arrow_string<ArrowType>::value, Status>::type Nullabl
}

// This helper function only supports (size/2) nulls yet.
template <>
Status NullableArray<::arrow::BooleanType>(
template <class ArrowType>
typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray(
size_t size, size_t num_nulls, std::shared_ptr<Array>* out) {
std::vector<uint8_t> values;
::arrow::test::randint<uint8_t>(size, 0, 1, &values);
Expand Down Expand Up @@ -176,19 +180,19 @@ void ExpectArray(T* expected, Array* result) {
}

template <typename ArrowType>
void ExpectArray(typename ArrowType::c_type* expected, Array* result) {
void ExpectArrayT(void* expected, Array* result) {
::arrow::PrimitiveArray* p_array = static_cast<::arrow::PrimitiveArray*>(result);
for (int64_t i = 0; i < result->length(); i++) {
EXPECT_EQ(expected[i],
EXPECT_EQ(reinterpret_cast<typename ArrowType::c_type*>(expected)[i],
reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]);
}
}

template <>
void ExpectArray<::arrow::BooleanType>(uint8_t* expected, Array* result) {
void ExpectArrayT<::arrow::BooleanType>(void* expected, Array* result) {
::arrow::BooleanBuilder builder(
::arrow::default_memory_pool(), std::make_shared<::arrow::BooleanType>());
builder.Append(expected, result->length());
builder.Append(reinterpret_cast<uint8_t*>(expected), result->length());

std::shared_ptr<Array> expected_array;
EXPECT_OK(builder.Finish(&expected_array));
Expand Down
8 changes: 8 additions & 0 deletions src/parquet/arrow/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <algorithm>
#include <vector>

#include "parquet/arrow/io.h"
#include "parquet/arrow/schema.h"
#include "parquet/arrow/utils.h"

Expand Down Expand Up @@ -370,6 +371,13 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool,
return writer.Close();
}

// Overload of WriteFlatTable accepting an Arrow output stream: wraps the
// stream in a ParquetWriteSink and delegates to the sink-based overload.
Status WriteFlatTable(const Table* table, MemoryPool* pool,
    const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
    const std::shared_ptr<WriterProperties>& properties) {
  return WriteFlatTable(
      table, pool, std::make_shared<ParquetWriteSink>(sink), chunk_size, properties);
}

} // namespace arrow

} // namespace parquet
7 changes: 7 additions & 0 deletions src/parquet/arrow/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "parquet/api/schema.h"
#include "parquet/api/writer.h"

#include "arrow/io/interfaces.h"

namespace arrow {

class Array;
Expand Down Expand Up @@ -71,6 +73,11 @@ ::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());

// Variant of WriteFlatTable that writes to an ::arrow::io::OutputStream
// (the stream is adapted to a parquet sink internally).
// chunk_size presumably bounds the number of rows per written chunk — see the
// sink-based overload above for the exact semantics.
::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
    ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
    int64_t chunk_size,
    const std::shared_ptr<WriterProperties>& properties = default_writer_properties());

} // namespace arrow

} // namespace parquet
Expand Down
2 changes: 1 addition & 1 deletion thirdparty/versions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.

ARROW_VERSION="d946e7917d55cb220becd6469ae93430f2e60764"
ARROW_VERSION="86f56a6073c3254487ede3aff1dc9d117d24adaf"
ARROW_URL="https://github.com/apache/arrow/archive/${ARROW_VERSION}.tar.gz"
ARROW_BASEDIR="arrow-${ARROW_VERSION}"

Expand Down