Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a78b47e
Refactoring to simplify internal IPC read path for reuse
wesm Feb 18, 2020
a18f105
Rebase
wesm Mar 3, 2020
3bc293d
Fix benchmark
wesm Mar 3, 2020
d51ff24
Proposed API for field projections on reading record batch
wesm Mar 3, 2020
ff3cbde
Rename confusing IPC function
wesm Mar 16, 2020
7c98ee8
Use Result<T> for reader APIs, implement unit tests for field selecti…
wesm Mar 17, 2020
c4d5454
Switch to result APIs in Python
wesm Mar 17, 2020
96b8770
Add missing export
wesm Mar 17, 2020
10be7d5
R fixes
wesm Mar 17, 2020
36a68b7
Fix up Flight
wesm Mar 17, 2020
c714f2b
Fix Flight Python build issues
wesm Mar 17, 2020
4a60848
glib fixes, lift write constructors out of the classes
wesm Mar 17, 2020
9c54534
Address various code review comments. Rename IpcOptions to IpcWriteOp…
wesm Mar 19, 2020
a8a37c3
Create boolean mask when filtering fields on IPC load
wesm Mar 19, 2020
ac22b0e
Code review comments
wesm Mar 19, 2020
9ff78b9
Fix child array loading
wesm Mar 20, 2020
3c9176d
Fixes for windows, fuzzing executables, run Integration tests seriall…
wesm Mar 20, 2020
23f27d6
Add missing export
wesm Mar 20, 2020
3e8a341
IWYU
wesm Mar 20, 2020
67ad393
Fix bugs turned up by CI, fuzzing corpus
wesm Mar 21, 2020
bffef74
Restore Result-returning RecordBatch{Stream,File}Writer::Open
kou Mar 23, 2020
b9f47c5
Use garrow::check() with Result
kou Mar 24, 2020
db5a9a8
Revert R changes temporarily
wesm Mar 24, 2020
e20343e
Restore deprecated Result-returning functions
wesm Mar 24, 2020
ce08470
Revert "Revert R changes temporarily"
wesm Mar 24, 2020
e7a4943
Skip dataset test that causes segfault on 32-bit Windows R
nealrichardson Mar 24, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions c_glib/arrow-glib/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,12 @@ GArrowRecordBatchStreamReader *
garrow_record_batch_stream_reader_new(GArrowInputStream *stream,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchReader;
using ReaderType = arrow::ipc::RecordBatchStreamReader;

auto arrow_input_stream = garrow_input_stream_get_raw(stream);
std::shared_ptr<BaseType> arrow_reader;
auto status = ReaderType::Open(arrow_input_stream, &arrow_reader);
if (garrow_error_check(error, status, "[record-batch-stream-reader][open]")) {
auto subtype = std::dynamic_pointer_cast<ReaderType>(arrow_reader);
auto arrow_reader = ReaderType::Open(arrow_input_stream);
if (garrow::check(error, arrow_reader, "[record-batch-stream-reader][open]")) {
auto subtype = std::dynamic_pointer_cast<ReaderType>(*arrow_reader);
return garrow_record_batch_stream_reader_new_raw(&subtype);
} else {
return NULL;
Expand Down Expand Up @@ -411,14 +409,13 @@ GArrowRecordBatchFileReader *
garrow_record_batch_file_reader_new(GArrowSeekableInputStream *file,
GError **error)
{
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(file);
using ReaderType = arrow::ipc::RecordBatchFileReader;

std::shared_ptr<arrow::ipc::RecordBatchFileReader> arrow_reader;
auto status =
arrow::ipc::RecordBatchFileReader::Open(arrow_random_access_file,
&arrow_reader);
if (garrow_error_check(error, status, "[record-batch-file-reader][open]")) {
return garrow_record_batch_file_reader_new_raw(&arrow_reader);
auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(file);
auto arrow_reader = ReaderType::Open(arrow_random_access_file);
if (garrow::check(error, arrow_reader, "[record-batch-file-reader][open]")) {
auto subtype = std::dynamic_pointer_cast<ReaderType>(*arrow_reader);
return garrow_record_batch_file_reader_new_raw(&subtype);
} else {
return NULL;
}
Expand Down
43 changes: 20 additions & 23 deletions c_glib/arrow-glib/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,17 +238,15 @@ garrow_record_batch_stream_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchStreamWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink).get();
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink,
garrow_schema_get_raw(schema),
&arrow_writer);
if (garrow_error_check(error, status, "[record-batch-stream-writer][open]")) {
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
return garrow_record_batch_stream_writer_new_raw(&subtype);
auto arrow_schema = garrow_schema_get_raw(schema);
auto arrow_writer_result =
arrow::ipc::NewStreamWriter(arrow_sink, arrow_schema);
if (garrow::check(error,
arrow_writer_result,
"[record-batch-stream-writer][open]")) {
auto arrow_writer = *arrow_writer_result;
return garrow_record_batch_stream_writer_new_raw(&arrow_writer);
} else {
return NULL;
}
Expand Down Expand Up @@ -285,17 +283,16 @@ garrow_record_batch_file_writer_new(GArrowOutputStream *sink,
GArrowSchema *schema,
GError **error)
{
using BaseType = arrow::ipc::RecordBatchWriter;
using WriterType = arrow::ipc::RecordBatchFileWriter;

auto arrow_sink = garrow_output_stream_get_raw(sink);
std::shared_ptr<BaseType> arrow_writer;
auto status = WriterType::Open(arrow_sink.get(),
garrow_schema_get_raw(schema),
&arrow_writer);
if (garrow_error_check(error, status, "[record-batch-file-writer][open]")) {
auto subtype = std::dynamic_pointer_cast<WriterType>(arrow_writer);
return garrow_record_batch_file_writer_new_raw(&subtype);
auto arrow_sink = garrow_output_stream_get_raw(sink).get();
auto arrow_schema = garrow_schema_get_raw(schema);
std::shared_ptr<arrow::ipc::RecordBatchWriter> arrow_writer;
auto arrow_writer_result =
arrow::ipc::NewFileWriter(arrow_sink, arrow_schema);
if (garrow::check(error,
arrow_writer_result,
"[record-batch-file-writer][open]")) {
auto arrow_writer = *arrow_writer_result;
return garrow_record_batch_file_writer_new_raw(&arrow_writer);
} else {
return NULL;
}
Expand Down Expand Up @@ -529,7 +526,7 @@ garrow_record_batch_writer_get_raw(GArrowRecordBatchWriter *writer)
}

GArrowRecordBatchStreamWriter *
garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_writer)
garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer)
{
auto writer =
GARROW_RECORD_BATCH_STREAM_WRITER(
Expand All @@ -540,7 +537,7 @@ garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatc
}

GArrowRecordBatchFileWriter *
garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_writer)
garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer)
{
auto writer =
GARROW_RECORD_BATCH_FILE_WRITER(
Expand Down
4 changes: 2 additions & 2 deletions c_glib/arrow-glib/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
GArrowRecordBatchWriter *garrow_record_batch_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer);
std::shared_ptr<arrow::ipc::RecordBatchWriter> garrow_record_batch_writer_get_raw(GArrowRecordBatchWriter *writer);

GArrowRecordBatchStreamWriter *garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchStreamWriter> *arrow_writer);
GArrowRecordBatchStreamWriter *garrow_record_batch_stream_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer);

GArrowRecordBatchFileWriter *garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchFileWriter> *arrow_writer);
GArrowRecordBatchFileWriter *garrow_record_batch_file_writer_new_raw(std::shared_ptr<arrow::ipc::RecordBatchWriter> *arrow_writer);

GArrowFeatherFileWriter *garrow_feather_file_writer_new_raw(arrow::ipc::feather::TableWriter *arrow_writer);
arrow::ipc::feather::TableWriter *garrow_feather_file_writer_get_raw(GArrowFeatherFileWriter *writer);
2 changes: 1 addition & 1 deletion cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

#pragma once

#include <atomic>
#include <atomic> // IWYU pragma: export
#include <cstdint>
#include <iosfwd>
#include <memory>
Expand Down
17 changes: 13 additions & 4 deletions cpp/src/arrow/compute/kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,24 @@ class ARROW_EXPORT BinaryKernel : public OpKernel {
Datum* out) = 0;
};

// TODO doxygen 1.8.16 does not like the following code
///@cond INTERNAL

// Element-wise equality for two collections of Datum.
//
// Returns true only when both vectors have the same length and every pair of
// elements at the same index compares equal via Datum::Equals; returns false
// otherwise.
static inline bool CollectionEquals(const std::vector<Datum>& left,
                                    const std::vector<Datum>& right) {
  // Collections of different lengths can never be equal.
  if (left.size() != right.size()) {
    return false;
  }

  // Stop at the first mismatching pair.
  for (size_t i = 0; i < left.size(); i++) {
    if (!left[i].Equals(right[i])) {
      return false;
    }
  }
  return true;
}

///@endcond

} // namespace compute
} // namespace arrow
12 changes: 4 additions & 8 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

#include <algorithm>
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>

#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/filter.h"
#include "arrow/dataset/scanner.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/util/iterator.h"

namespace arrow {
Expand All @@ -40,12 +39,11 @@ Result<std::shared_ptr<ipc::RecordBatchFileReader>> OpenReader(
}

std::shared_ptr<ipc::RecordBatchFileReader> reader;
auto status = ipc::RecordBatchFileReader::Open(std::move(input), &reader);
auto status = ipc::RecordBatchFileReader::Open(std::move(input)).Value(&reader);
if (!status.ok()) {
return status.WithMessage("Could not open IPC input source '", source.path(),
"': ", status.message());
}
Comment on lines +42 to 46
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: avoid using Result::Value unless you are wrapping a Result-returning API in a Status-returning API

Suggested change
auto status = ipc::RecordBatchFileReader::Open(std::move(input)).Value(&reader);
if (!status.ok()) {
return status.WithMessage("Could not open IPC input source '", source.path(),
"': ", status.message());
}
auto maybe_reader = ipc::RecordBatchFileReader::Open(std::move(input));
auto status = maybe_reader.status();
if (!status.ok()) {
return status.WithMessage("Could not open IPC input source '", source.path(),
"': ", status.message());
}
return std::move(maybe_reader).ValueOrDie();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is using Result::Value bad? It's not obvious from reading the docstrings.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not bad, just less preferred because it mixes the out-argument/Status-return style with Result usage.


return reader;
}

Expand Down Expand Up @@ -161,10 +159,8 @@ Result<std::shared_ptr<WriteTask>> IpcFileFormat::WriteFragment(
RETURN_NOT_OK(CreateDestinationParentDir());

ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable());

ARROW_ASSIGN_OR_RAISE(auto writer, ipc::RecordBatchFileWriter::Open(
out_stream.get(), fragment_->schema()));

ARROW_ASSIGN_OR_RAISE(auto writer,
ipc::NewFileWriter(out_stream.get(), fragment_->schema()));
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment_->Scan(scan_context_));

for (auto maybe_scan_task : scan_task_it) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

#include <memory>
#include <string>
#include <utility>

#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/result.h"

namespace arrow {
namespace dataset {
Expand Down
7 changes: 2 additions & 5 deletions cpp/src/arrow/dataset/file_ipc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ class ArrowIpcWriterMixin : public ::testing::Test {
std::shared_ptr<Buffer> Write(RecordBatchReader* reader) {
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());

EXPECT_OK_AND_ASSIGN(auto writer,
ipc::RecordBatchFileWriter::Open(sink.get(), reader->schema()));
EXPECT_OK_AND_ASSIGN(auto writer, ipc::NewFileWriter(sink.get(), reader->schema()));

std::vector<std::shared_ptr<RecordBatch>> batches;
ARROW_EXPECT_OK(reader->ReadAll(&batches));
Expand All @@ -63,9 +62,7 @@ class ArrowIpcWriterMixin : public ::testing::Test {

std::shared_ptr<Buffer> Write(const Table& table) {
EXPECT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());

EXPECT_OK_AND_ASSIGN(auto writer,
ipc::RecordBatchFileWriter::Open(sink.get(), table.schema()));
EXPECT_OK_AND_ASSIGN(auto writer, ipc::NewFileWriter(sink.get(), table.schema()));

ARROW_EXPECT_OK(writer->WriteTable(table));

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/extension_type_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ TEST_F(TestExtensionType, ExtensionTypeTest) {
auto RoundtripBatch = [](const std::shared_ptr<RecordBatch>& batch,
std::shared_ptr<RecordBatch>* out) {
ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create());
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));

ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish());

io::BufferReader reader(complete_ipc_stream);
std::shared_ptr<RecordBatchReader> batch_reader;
ASSERT_OK(ipc::RecordBatchStreamReader::Open(&reader, &batch_reader));
ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
ASSERT_OK(batch_reader->ReadNext(out));
};

Expand Down Expand Up @@ -256,7 +256,7 @@ TEST_F(TestExtensionType, UnrecognizedExtension) {
// Write full IPC stream including schema, then unregister type, then read
// and ensure that a plain instance of the storage type is created
ASSERT_OK_AND_ASSIGN(auto out_stream, io::BufferOutputStream::Create());
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcWriteOptions::Defaults(),
out_stream.get()));

ASSERT_OK_AND_ASSIGN(auto complete_ipc_stream, out_stream->Finish());
Expand All @@ -270,7 +270,7 @@ TEST_F(TestExtensionType, UnrecognizedExtension) {

io::BufferReader reader(complete_ipc_stream);
std::shared_ptr<RecordBatchReader> batch_reader;
ASSERT_OK(ipc::RecordBatchStreamReader::Open(&reader, &batch_reader));
ASSERT_OK_AND_ASSIGN(batch_reader, ipc::RecordBatchStreamReader::Open(&reader));
std::shared_ptr<RecordBatch> read_batch;
ASSERT_OK(batch_reader->ReadNext(&read_batch));
CompareBatch(*batch_no_ext, *read_batch);
Expand Down
9 changes: 3 additions & 6 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class GrpcStreamReader : public FlightStreamReader {

private:
friend class GrpcIpcMessageReader;
std::unique_ptr<ipc::RecordBatchReader> batch_reader_;
std::shared_ptr<ipc::RecordBatchReader> batch_reader_;
std::shared_ptr<Buffer> last_app_metadata_;
std::shared_ptr<ClientRpc> rpc_;
};
Expand Down Expand Up @@ -327,8 +327,8 @@ Status GrpcStreamReader::Open(std::unique_ptr<ClientRpc> rpc,
out->get()->rpc_ = std::move(rpc);
std::unique_ptr<GrpcIpcMessageReader> message_reader(
new GrpcIpcMessageReader(out->get(), out->get()->rpc_, std::move(stream)));
return ipc::RecordBatchStreamReader::Open(std::move(message_reader),
&(*out)->batch_reader_);
return (ipc::RecordBatchStreamReader::Open(std::move(message_reader))
.Value(&(*out)->batch_reader_));
}

std::shared_ptr<Schema> GrpcStreamReader::schema() const {
Expand Down Expand Up @@ -385,9 +385,6 @@ class GrpcStreamWriter : public FlightStreamWriter {
}
return Status::OK();
}
void set_memory_pool(MemoryPool* pool) override {
batch_writer_->set_memory_pool(pool);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add const IpcWriteOptions& options to GrpcStreamWriter::Open() instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should do that. It will have to happen after @lidavidm's refactoring goes in. Opening a JIRA https://issues.apache.org/jira/browse/ARROW-8190

// Closes the stream by delegating to the underlying batch writer and
// returning its Status unchanged.
Status Close() override {
  return batch_writer_->Close();
}

private:
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/flight/perf_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class PerfDataStream : public FlightDataStream {
} else {
records_sent_ += batch_length_;
}
return ipc::internal::GetRecordBatchPayload(
*batch, ipc_options_, default_memory_pool(), &payload->ipc_message);
return ipc::internal::GetRecordBatchPayload(*batch, ipc_options_,
&payload->ipc_message);
}

private:
Expand All @@ -114,7 +114,7 @@ class PerfDataStream : public FlightDataStream {
int64_t records_sent_;
std::shared_ptr<Schema> schema_;
ipc::DictionaryMemo dictionary_memo_;
ipc::IpcOptions ipc_options_;
ipc::IpcWriteOptions ipc_options_;
std::shared_ptr<RecordBatch> batch_;
ArrayVector arrays_;
};
Expand Down
16 changes: 9 additions & 7 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ class FlightMessageReaderImpl : public FlightMessageReader {
Status Init() {
message_reader_ = new FlightIpcMessageReader(reader_, &last_metadata_);
return ipc::RecordBatchStreamReader::Open(
std::unique_ptr<ipc::MessageReader>(message_reader_), &batch_reader_);
std::unique_ptr<ipc::MessageReader>(message_reader_))
.Value(&batch_reader_);
}

const FlightDescriptor& descriptor() const override {
Expand Down Expand Up @@ -804,7 +805,9 @@ class RecordBatchStream::RecordBatchStreamImpl {

RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
MemoryPool* pool)
: pool_(pool), reader_(reader), ipc_options_(ipc::IpcOptions::Defaults()) {}
: reader_(reader), ipc_options_(ipc::IpcWriteOptions::Defaults()) {
ipc_options_.memory_pool = pool;
}

// Returns the schema reported by the wrapped reader_.
std::shared_ptr<Schema> schema() {
  return reader_->schema();
}

Expand All @@ -828,7 +831,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
if (stage_ == Stage::DICTIONARY) {
if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
stage_ = Stage::RECORD_BATCH;
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_,
&payload->ipc_message);
} else {
return GetNextDictionary(payload);
Expand All @@ -843,15 +846,15 @@ class RecordBatchStream::RecordBatchStreamImpl {
payload->ipc_message.metadata = nullptr;
return Status::OK();
} else {
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_,
&payload->ipc_message);
}
}

private:
Status GetNextDictionary(FlightPayload* payload) {
const auto& it = dictionaries_[dictionary_index_++];
return ipc::internal::GetDictionaryPayload(it.first, it.second, ipc_options_, pool_,
return ipc::internal::GetDictionaryPayload(it.first, it.second, ipc_options_,
&payload->ipc_message);
}

Expand All @@ -864,10 +867,9 @@ class RecordBatchStream::RecordBatchStreamImpl {
}

Stage stage_ = Stage::NEW;
MemoryPool* pool_;
std::shared_ptr<RecordBatchReader> reader_;
ipc::DictionaryMemo dictionary_memo_;
ipc::IpcOptions ipc_options_;
ipc::IpcWriteOptions ipc_options_;
std::shared_ptr<RecordBatch> current_batch_;
std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_;

Expand Down
Loading