2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_internal.h
@@ -191,7 +191,7 @@ inline bool operator==(const SubtreeImpl::Encoded& l, const SubtreeImpl::Encoded
/// but of the wrong type, an error is returned.
template <typename T>
arrow::Result<std::shared_ptr<T>> GetFragmentScanOptions(
-    const std::string& type_name, ScanOptions* scan_options,
+    const std::string& type_name, const ScanOptions* scan_options,
const std::shared_ptr<FragmentScanOptions>& default_options) {
auto source = default_options;
if (scan_options && scan_options->fragment_scan_options) {
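For orientation, a hedged sketch of the resolution this helper performs (it mirrors the call sites added in file_ipc.cc and file_parquet.cc below; `scan_options` and `default_fragment_scan_options` are whatever the format has in hand, and a null `scan_options` simply falls back to the defaults):

    // Returns the scan-level override if one is set and its type_name() matches
    // kParquetTypeName; otherwise the format's defaults; errors on a type mismatch.
    ARROW_ASSIGN_OR_RAISE(
        auto parquet_scan_options,
        GetFragmentScanOptions<ParquetFragmentScanOptions>(
            kParquetTypeName, scan_options, default_fragment_scan_options));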
32 changes: 24 additions & 8 deletions cpp/src/arrow/dataset/file_ipc.cc
@@ -29,6 +29,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/iterator.h"
+#include "arrow/util/logging.h"

namespace arrow {

@@ -81,15 +82,28 @@ class IpcScanTask : public ScanTask {

Result<RecordBatchIterator> Execute() override {
struct Impl {
-      static Result<RecordBatchIterator> Make(
-          const FileSource& source, std::vector<std::string> materialized_fields,
-          MemoryPool* pool) {
+      static Result<RecordBatchIterator> Make(const FileSource& source,
+                                              FileFormat* format,
+                                              const ScanOptions* scan_options) {
ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source));

-        auto options = default_read_options();
-        options.memory_pool = pool;
-        ARROW_ASSIGN_OR_RAISE(options.included_fields,
-                              GetIncludedFields(*reader->schema(), materialized_fields));
+        ARROW_ASSIGN_OR_RAISE(
+            auto ipc_scan_options,
+            GetFragmentScanOptions<IpcFragmentScanOptions>(
+                kIpcTypeName, scan_options, format->default_fragment_scan_options));
+        auto options = ipc_scan_options->options ? *ipc_scan_options->options
+                                                 : default_read_options();
+        options.memory_pool = scan_options->pool;
+        options.use_threads = false;
+        if (!options.included_fields.empty()) {
+          // Can't honor a user-set included_fields here; derive it from the scan below
+          ARROW_LOG(WARNING) << "IpcFragmentScanOptions.options->included_fields was set "
+                                "but will be ignored; included_fields are derived from "
+                                "fields referenced by the scan";
+        }
+        ARROW_ASSIGN_OR_RAISE(
+            options.included_fields,
+            GetIncludedFields(*reader->schema(), scan_options->MaterializedFields()));

ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options));
return RecordBatchIterator(Impl{std::move(reader), 0});
@@ -107,7 +121,9 @@ class IpcScanTask : public ScanTask {
int i_;
};

-    return Impl::Make(source_, options_->MaterializedFields(), options_->pool);
+    return Impl::Make(
+        source_, internal::checked_pointer_cast<FileFragment>(fragment_)->format().get(),
+        options_.get());
}

private:
14 changes: 13 additions & 1 deletion cpp/src/arrow/dataset/file_ipc.h
@@ -31,10 +31,12 @@
namespace arrow {
namespace dataset {

+constexpr char kIpcTypeName[] = "ipc";
+
/// \brief A FileFormat implementation that reads from and writes to Ipc files
class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
public:
-  std::string type_name() const override { return "ipc"; }
+  std::string type_name() const override { return kIpcTypeName; }

bool Equals(const FileFormat& other) const override {
return type_name() == other.type_name();
@@ -59,6 +61,16 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};

+/// \brief Per-scan options for IPC fragments
+class ARROW_DS_EXPORT IpcFragmentScanOptions : public FragmentScanOptions {
+ public:
+  std::string type_name() const override { return kIpcTypeName; }
+
+  /// Options passed to the IPC file reader.
+  /// included_fields, memory_pool, and use_threads are ignored.
+  std::shared_ptr<ipc::IpcReadOptions> options;
+};
+
Comment on lines +64 to +73 (Member Author):
Note I didn't bother exposing this to Python/R since the IPC options in general aren't really exposed.
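A hedged usage sketch for the new class (`opts` here is illustrative, standing in for a std::shared_ptr<ScanOptions> like the `opts_` member used in the tests below):

    auto ipc_scan_options = std::make_shared<IpcFragmentScanOptions>();
    ipc_scan_options->options = std::make_shared<ipc::IpcReadOptions>();
    // Any reader option can be set except the three overridden at scan time
    ipc_scan_options->options->max_recursion_depth = 64;
    opts->fragment_scan_options = ipc_scan_options;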

class ARROW_DS_EXPORT IpcFileWriteOptions : public FileWriteOptions {
public:
/// Options passed to ipc::MakeFileWriter. use_threads is ignored
22 changes: 22 additions & 0 deletions cpp/src/arrow/dataset/file_ipc_test.cc
@@ -134,6 +134,28 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
ASSERT_EQ(row_count, kNumRows);
}

+TEST_F(TestIpcFileFormat, FragmentScanOptions) {
+  auto reader = GetRecordBatchReader(
+      // ARROW-12077: on Windows/mimalloc/release, nullable list column leads to crash
+      schema({field("list", list(float64()), false,
+                    key_value_metadata({{"max_length", "1"}})),
+              field("f64", float64())}));
+  auto source = GetFileSource(reader.get());
+
+  SetSchema(reader->schema()->fields());
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  // Set scan options that ensure reading fails
+  auto fragment_scan_options = std::make_shared<IpcFragmentScanOptions>();
+  fragment_scan_options->options = std::make_shared<ipc::IpcReadOptions>();
+  fragment_scan_options->options->max_recursion_depth = 0;
+  opts_->fragment_scan_options = fragment_scan_options;
+  ASSERT_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
+  ASSERT_OK_AND_ASSIGN(auto scan_task, scan_tasks.Next());
+  ASSERT_OK_AND_ASSIGN(auto batches, scan_task->Execute());
+  ASSERT_RAISES(Invalid, batches.Next());
+}
+
TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
auto reader = GetRecordBatchReader(schema({field("f64", float64())}));
auto source = GetFileSource(reader.get());
58 changes: 40 additions & 18 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -128,15 +128,18 @@ class ParquetScanTask : public ScanTask {
};

static parquet::ReaderProperties MakeReaderProperties(
-    const ParquetFileFormat& format, MemoryPool* pool = default_memory_pool()) {
+    const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options,
+    MemoryPool* pool = default_memory_pool()) {
+  // Can't mutate pool after construction
parquet::ReaderProperties properties(pool);
-  if (format.reader_options.use_buffered_stream) {
+  if (parquet_scan_options->reader_properties->is_buffered_stream_enabled()) {
properties.enable_buffered_stream();
} else {
properties.disable_buffered_stream();
}
-  properties.set_buffer_size(format.reader_options.buffer_size);
-  properties.file_decryption_properties(format.reader_options.file_decryption_properties);
+  properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
+  properties.file_decryption_properties(
+      parquet_scan_options->reader_properties->file_decryption_properties());
return properties;
}

@@ -249,24 +252,23 @@ bool ParquetFileFormat::Equals(const FileFormat& other) const {
checked_cast<const ParquetFileFormat&>(other).reader_options;

// FIXME implement comparison for decryption options
-  // FIXME extract these to scan time options so comparison is unnecessary
-  return reader_options.use_buffered_stream == other_reader_options.use_buffered_stream &&
-         reader_options.buffer_size == other_reader_options.buffer_size &&
-         reader_options.dict_columns == other_reader_options.dict_columns;
+  return reader_options.dict_columns == other_reader_options.dict_columns;
}

ParquetFileFormat::ParquetFileFormat(const parquet::ReaderProperties& reader_properties) {
-  reader_options.use_buffered_stream = reader_properties.is_buffered_stream_enabled();
-  reader_options.buffer_size = reader_properties.buffer_size();
-  reader_options.file_decryption_properties =
-      reader_properties.file_decryption_properties();
+  auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+  *parquet_scan_options->reader_properties = reader_properties;
+  default_fragment_scan_options = std::move(parquet_scan_options);
}

Result<bool> ParquetFileFormat::IsSupported(const FileSource& source) const {
try {
ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
-    auto reader =
-        parquet::ParquetFileReader::Open(std::move(input), MakeReaderProperties(*this));
+    ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+                          GetFragmentScanOptions<ParquetFragmentScanOptions>(
+                              kParquetTypeName, nullptr, default_fragment_scan_options));
+    auto reader = parquet::ParquetFileReader::Open(
+        std::move(input), MakeReaderProperties(*this, parquet_scan_options.get()));
std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
return metadata != nullptr && metadata->can_decompress();
} catch (const ::parquet::ParquetInvalidOrCorruptedFileException& e) {
@@ -290,8 +292,11 @@ Result<std::shared_ptr<Schema>> ParquetFileFormat::Inspect(

Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader(
const FileSource& source, ScanOptions* options) const {
+  ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+                        GetFragmentScanOptions<ParquetFragmentScanOptions>(
+                            kParquetTypeName, options, default_fragment_scan_options));
MemoryPool* pool = options ? options->pool : default_memory_pool();
-  auto properties = MakeReaderProperties(*this, pool);
+  auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool);

ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
std::unique_ptr<parquet::ParquetFileReader> reader;
@@ -310,7 +315,8 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
}

if (options && !options->use_threads) {
-    arrow_properties.set_use_threads(reader_options.enable_parallel_column_conversion);
+    arrow_properties.set_use_threads(
+        parquet_scan_options->enable_parallel_column_conversion);
}

std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
@@ -356,15 +362,21 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
auto column_projection = InferColumnProjection(*reader, *options);
ScanTaskVector tasks(row_groups.size());

+  ARROW_ASSIGN_OR_RAISE(
+      auto parquet_scan_options,
+      GetFragmentScanOptions<ParquetFragmentScanOptions>(kParquetTypeName, options.get(),
+                                                         default_fragment_scan_options));
std::shared_ptr<std::once_flag> pre_buffer_once = nullptr;
-  if (reader_options.pre_buffer) {
+  if (parquet_scan_options->arrow_reader_properties->pre_buffer()) {
pre_buffer_once = std::make_shared<std::once_flag>();
}

for (size_t i = 0; i < row_groups.size(); ++i) {
tasks[i] = std::make_shared<ParquetScanTask>(
row_groups[i], column_projection, reader, pre_buffer_once, row_groups,
-        reader_options.io_context, reader_options.cache_options, options, fragment);
+        parquet_scan_options->arrow_reader_properties->io_context(),
+        parquet_scan_options->arrow_reader_properties->cache_options(), options,
+        fragment);
}

return MakeVectorIterator(std::move(tasks));
@@ -586,6 +598,16 @@ Result<std::vector<int>> ParquetFileFragment::FilterRowGroups(Expression predica
return row_groups;
}

+//
+// ParquetFragmentScanOptions
+//
+
+ParquetFragmentScanOptions::ParquetFragmentScanOptions() {
+  reader_properties = std::make_shared<parquet::ReaderProperties>();
+  arrow_reader_properties =
+      std::make_shared<parquet::ArrowReaderProperties>(/*use_threads=*/false);
+}
+
//
// ParquetDatasetFactory
//
53 changes: 27 additions & 26 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -57,6 +57,8 @@ struct SchemaManifest;
namespace arrow {
namespace dataset {

+constexpr char kParquetTypeName[] = "parquet";
+
/// \brief A FileFormat implementation that reads from Parquet files
class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
public:
@@ -66,45 +68,23 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
/// memory_pool will be ignored.
explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties);

-  std::string type_name() const override { return "parquet"; }
+  std::string type_name() const override { return kParquetTypeName; }

bool splittable() const override { return true; }

bool Equals(const FileFormat& other) const override;

struct ReaderOptions {
-    /// \defgroup parquet-file-format-reader-properties properties which correspond to
-    /// members of parquet::ReaderProperties.
-    ///
-    /// We don't embed parquet::ReaderProperties directly because we get memory_pool from
-    /// ScanOptions at scan time and provide differing defaults.
-    ///
-    /// @{
-    bool use_buffered_stream = false;
-    int64_t buffer_size = 1 << 13;
-    std::shared_ptr<parquet::FileDecryptionProperties> file_decryption_properties;
-    /// @}
-
/// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
/// to members of parquet::ArrowReaderProperties.
///
-    /// We don't embed parquet::ReaderProperties directly because we get batch_size from
-    /// ScanOptions at scan time, and we will never pass use_threads == true (since we
-    /// defer parallelization of the scan). Additionally column names (rather than
-    /// indices) are used to indicate dictionary columns.
+    /// We don't embed parquet::ReaderProperties directly because column names (rather
+    /// than indices) are used to indicate dictionary columns, and other options are
+    /// deferred to scan time.
///
/// @{
std::unordered_set<std::string> dict_columns;
-    bool pre_buffer = false;
-    arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults();
-    arrow::io::IOContext io_context;
    /// @}
-
-    /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
-    /// scan is already parallelized across input files to avoid thread contention. This
-    /// option will be removed after support is added for simultaneous parallelization
-    /// across files and columns.
-    bool enable_parallel_column_conversion = false;
} reader_options;

Result<bool> IsSupported(const FileSource& source) const override;
@@ -206,6 +186,27 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
friend class ParquetDatasetFactory;
};

+/// \brief Per-scan options for Parquet fragments
+class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
+ public:
+  ParquetFragmentScanOptions();
+  std::string type_name() const override { return kParquetTypeName; }
+
+  /// Reader properties. Not all properties are respected: memory_pool comes from
+  /// ScanOptions.
+  std::shared_ptr<parquet::ReaderProperties> reader_properties;
+  /// Arrow reader properties. Not all properties are respected: batch_size comes from
+  /// ScanOptions, and use_threads will be overridden based on
+  /// enable_parallel_column_conversion. Additionally, dictionary columns come from
+  /// ParquetFileFormat::ReaderOptions::dict_columns.
+  std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
+  /// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
+  /// scan is already parallelized across input files to avoid thread contention. This
+  /// option will be removed after support is added for simultaneous parallelization
+  /// across files and columns.
+  bool enable_parallel_column_conversion = false;
+};
+
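As on the IPC side, a hedged sketch of overriding these per scan (it mirrors the updated pre-buffer test below; `opts` is an illustrative std::shared_ptr<ScanOptions>):

    // The constructor (see file_parquet.cc above) initializes both property
    // sets, so they are safe to mutate immediately.
    auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
    parquet_scan_options->arrow_reader_properties->set_pre_buffer(true);
    opts->fragment_scan_options = parquet_scan_options;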
class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
public:
std::shared_ptr<parquet::WriterProperties> writer_properties;
5 changes: 4 additions & 1 deletion cpp/src/arrow/dataset/file_parquet_test.cc
@@ -272,8 +272,10 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderPreBuffer) {
SetSchema(reader->schema()->fields());
SetFilter(literal(true));

-  format_->reader_options.pre_buffer = true;
ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+  auto fragment_scan_options = std::make_shared<ParquetFragmentScanOptions>();
+  fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
+  opts_->fragment_scan_options = fragment_scan_options;
ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));

int64_t task_count = 0;
@@ -636,6 +638,7 @@ TEST_F(TestParquetFileFormat, WriteRecordBatchReaderCustomOptions) {

options->arrow_writer_properties = parquet::ArrowWriterProperties::Builder()
.coerce_timestamps(coerce_timestamps_to)
+                                         ->allow_truncated_timestamps()
->build();

EXPECT_OK_AND_ASSIGN(auto writer, format_->MakeWriter(sink, reader->schema(), options));
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/test_util.h
@@ -42,6 +42,7 @@
#include "arrow/table.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
#include "arrow/util/io_util.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
@@ -102,7 +103,7 @@ std::unique_ptr<GeneratedRecordBatch<Gen>> MakeGeneratedRecordBatch(

std::unique_ptr<RecordBatchReader> MakeGeneratedRecordBatch(
std::shared_ptr<Schema> schema, int64_t batch_size, int64_t batch_repetitions) {
-  auto batch = ConstantArrayGenerator::Zeroes(batch_size, schema);
+  auto batch = random::GenerateBatch(schema->fields(), batch_size, /*seed=*/0);
int64_t i = 0;
return MakeGeneratedRecordBatch(
schema, [batch, i, batch_repetitions](std::shared_ptr<RecordBatch>* out) mutable {
2 changes: 2 additions & 0 deletions cpp/src/arrow/dataset/type_fwd.h
@@ -65,9 +65,11 @@ struct CsvFragmentScanOptions;
class IpcFileFormat;
class IpcFileWriter;
class IpcFileWriteOptions;
+class IpcFragmentScanOptions;

class ParquetFileFormat;
class ParquetFileFragment;
+class ParquetFragmentScanOptions;
class ParquetFileWriter;
class ParquetFileWriteOptions;
