Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions c_glib/parquet-glib/arrow-file-reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,13 @@ gparquet_arrow_file_reader_new_arrow(GArrowSeekableInputStream *source, GError *
{
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(source);
auto arrow_memory_pool = arrow::default_memory_pool();
std::unique_ptr<parquet::arrow::FileReader> parquet_arrow_file_reader;
auto status = parquet::arrow::OpenFile(arrow_random_access_file,
arrow_memory_pool,
&parquet_arrow_file_reader);
if (garrow_error_check(error, status, "[parquet][arrow][file-reader][new-arrow]")) {
return gparquet_arrow_file_reader_new_raw(parquet_arrow_file_reader.release());
auto parquet_arrow_file_reader_result =
parquet::arrow::OpenFile(arrow_random_access_file, arrow_memory_pool);
if (garrow::check(error,
parquet_arrow_file_reader_result,
"[parquet][arrow][file-reader][new-arrow]")) {
return gparquet_arrow_file_reader_new_raw(
parquet_arrow_file_reader_result->release());
} else {
return NULL;
}
Expand Down Expand Up @@ -168,12 +169,13 @@ gparquet_arrow_file_reader_new_path(const gchar *path, GError **error)
std::shared_ptr<arrow::io::RandomAccessFile> arrow_random_access_file =
arrow_memory_mapped_file.ValueOrDie();
auto arrow_memory_pool = arrow::default_memory_pool();
std::unique_ptr<parquet::arrow::FileReader> parquet_arrow_file_reader;
auto status = parquet::arrow::OpenFile(arrow_random_access_file,
arrow_memory_pool,
&parquet_arrow_file_reader);
if (garrow::check(error, status, "[parquet][arrow][file-reader][new-path]")) {
return gparquet_arrow_file_reader_new_raw(parquet_arrow_file_reader.release());
auto parquet_arrow_file_reader_result =
parquet::arrow::OpenFile(arrow_random_access_file, arrow_memory_pool);
if (garrow::check(error,
parquet_arrow_file_reader_result,
"[parquet][arrow][file-reader][new-path]")) {
return gparquet_arrow_file_reader_new_raw(
parquet_arrow_file_reader_result->release());
} else {
return NULL;
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/arrow/parquet_read_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ arrow::Status ReadFullFile(std::string path_to_file) {

// Open Parquet file reader
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, pool, &arrow_reader));
ARROW_ASSIGN_OR_RAISE(arrow_reader, parquet::arrow::OpenFile(input, pool));

// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table;
Expand Down
16 changes: 8 additions & 8 deletions cpp/examples/parquet/parquet_arrow/reader_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ void read_whole_file() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
Expand All @@ -85,8 +85,8 @@ void read_single_rowgroup() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::Table> table;
PARQUET_THROW_NOT_OK(reader->RowGroup(0)->ReadTable(&table));
std::cout << "Loaded " << table->num_rows() << " rows in " << table->num_columns()
Expand All @@ -102,8 +102,8 @@ void read_single_column() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::ChunkedArray> array;
PARQUET_THROW_NOT_OK(reader->ReadColumn(0, &array));
PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
Expand All @@ -122,8 +122,8 @@ void read_single_column_chunk() {
arrow::default_memory_pool()));

std::unique_ptr<parquet::arrow::FileReader> reader;
PARQUET_THROW_NOT_OK(
parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
PARQUET_ASSIGN_OR_THROW(reader,
parquet::arrow::OpenFile(infile, arrow::default_memory_pool()));
std::shared_ptr<arrow::ChunkedArray> array;
PARQUET_THROW_NOT_OK(reader->RowGroup(0)->Column(0)->Read(&array));
PARQUET_THROW_NOT_OK(arrow::PrettyPrint(*array, 4, &std::cout));
Expand Down
50 changes: 21 additions & 29 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,8 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
ASSERT_NO_FATAL_FAILURE(
WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

reader->set_use_threads(use_threads);
if (column_subset.size() > 0) {
Expand Down Expand Up @@ -1095,8 +1094,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {

auto source = std::make_shared<BufferReader>(pbuffer);
std::shared_ptr<::arrow::Table> out;
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(source, ::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(source, ::arrow::default_memory_pool()));
ASSERT_NO_FATAL_FAILURE(this->ReadTableFromFile(std::move(reader), &out));
ASSERT_EQ(1, out->num_columns());
ASSERT_EQ(values->length(), out->num_rows());
Expand Down Expand Up @@ -2295,9 +2293,8 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

ASSERT_EQ(2, reader->num_row_groups());

Expand Down Expand Up @@ -2357,9 +2354,8 @@ TEST(TestArrowReadWrite, ReadTableManually) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(expected, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

ASSERT_EQ(2, reader->num_row_groups());

Expand Down Expand Up @@ -2476,9 +2472,8 @@ TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(expected, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

ASSERT_EQ(2, reader->num_row_groups());

Expand Down Expand Up @@ -2594,9 +2589,8 @@ TEST(TestArrowReadWrite, ScanContents) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

int64_t num_rows_returned = 0;
ASSERT_OK_NO_THROW(reader->ScanContents({}, 256, &num_rows_returned));
Expand Down Expand Up @@ -2689,18 +2683,17 @@ TEST(TestArrowReadWrite, ListLargeRecords) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

// Read everything
std::shared_ptr<Table> result;
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));

// Read 1 record at a time
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

std::unique_ptr<ColumnReader> col_reader;
ASSERT_OK(reader->GetColumn(0, &col_reader));
Expand Down Expand Up @@ -2974,9 +2967,8 @@ TEST(ArrowReadWrite, DecimalStats) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_group_size=*/100,
default_arrow_writer_properties(), &buffer));

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader));
ASSERT_OK_AND_ASSIGN(auto reader, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));

std::shared_ptr<Scalar> min, max;
ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
Expand Down Expand Up @@ -3575,8 +3567,8 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {

void InitReader() {
ASSERT_OK_AND_ASSIGN(auto buffer, nested_parquet_->Finish());
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool(), &reader_));
ASSERT_OK_AND_ASSIGN(reader_, OpenFile(std::make_shared<BufferReader>(buffer),
::arrow::default_memory_pool()));
}

void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
Expand Down Expand Up @@ -5344,8 +5336,8 @@ TEST(TestArrowReadWrite, MultithreadedWrite) {

// Read to verify the data.
std::shared_ptr<Table> result;
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer), pool, &reader));
ASSERT_OK_AND_ASSIGN(auto reader,
OpenFile(std::make_shared<BufferReader>(buffer), pool));
ASSERT_OK_NO_THROW(reader->ReadTable(&result));
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}
Expand Down
7 changes: 6 additions & 1 deletion cpp/src/parquet/arrow/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1372,9 +1372,14 @@ Result<std::unique_ptr<FileReader>> FileReaderBuilder::Build() {

Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool,
std::unique_ptr<FileReader>* reader) {
return OpenFile(std::move(file), pool).Value(reader);
}

Result<std::unique_ptr<FileReader>> OpenFile(
std::shared_ptr<::arrow::io::RandomAccessFile> file, MemoryPool* pool) {
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(std::move(file)));
return builder.memory_pool(pool)->Build(reader);
return builder.memory_pool(pool)->Build();
}

namespace internal {
Expand Down
10 changes: 10 additions & 0 deletions cpp/src/parquet/arrow/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,21 @@ class PARQUET_EXPORT FileReaderBuilder {
/// \brief Build FileReader from Arrow file and MemoryPool
///
/// Advanced settings are supported through the FileReaderBuilder class.
///
/// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
PARQUET_EXPORT
::arrow::Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile>,
::arrow::MemoryPool* allocator,
std::unique_ptr<FileReader>* reader);

/// \brief Build FileReader from Arrow file and MemoryPool
///
/// Advanced settings are supported through the FileReaderBuilder class.
PARQUET_EXPORT
::arrow::Result<std::unique_ptr<FileReader>> OpenFile(
std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* allocator);

/// @}

PARQUET_EXPORT
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/parquet/arrow/reconstruct_internal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ class FileTester {
protected:
Status Open(std::shared_ptr<Buffer> buffer, MemoryPool* pool) {
pool_ = pool;
return OpenFile(std::make_shared<BufferReader>(buffer), pool_, &file_reader_);
ARROW_ASSIGN_OR_RAISE(file_reader_,
OpenFile(std::make_shared<BufferReader>(buffer), pool_));
return Status::OK();
}

MemoryPool* pool_;
Expand Down
Loading