Skip to content

Commit cef7e80

Browse files
committed
ARROW-5559: [C++] Add an IpcOptions structure
Switch existing APIs to take IpcOptions rather than individual option arguments.

Closes #5032 from pitrou/ARROW-5559-ipc-options and squashes the following commits:

e39ddc6 <Antoine Pitrou> ARROW-5559: Add an IpcOptions structure

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent 6eae790 commit cef7e80

24 files changed

+299
-175
lines changed

cpp/src/arrow/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ if(ARROW_IPC)
255255
ipc/json-simple.cc
256256
ipc/message.cc
257257
ipc/metadata-internal.cc
258+
ipc/options.cc
258259
ipc/reader.cc
259260
ipc/writer.cc)
260261
set(ARROW_SRCS ${ARROW_SRCS} ${ARROW_IPC_SRCS})

cpp/src/arrow/extension_type-test.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,8 @@ auto RoundtripBatch = [](const std::shared_ptr<RecordBatch>& batch,
221221
std::shared_ptr<RecordBatch>* out) {
222222
std::shared_ptr<io::BufferOutputStream> out_stream;
223223
ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &out_stream));
224-
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, out_stream.get()));
224+
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
225+
out_stream.get()));
225226

226227
std::shared_ptr<Buffer> complete_ipc_stream;
227228
ASSERT_OK(out_stream->Finish(&complete_ipc_stream));
@@ -273,7 +274,8 @@ TEST_F(TestExtensionType, UnrecognizedExtension) {
273274
// and ensure that a plain instance of the storage type is created
274275
std::shared_ptr<io::BufferOutputStream> out_stream;
275276
ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &out_stream));
276-
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, out_stream.get()));
277+
ASSERT_OK(ipc::WriteRecordBatchStream({batch}, ipc::IpcOptions::Defaults(),
278+
out_stream.get()));
277279

278280
std::shared_ptr<Buffer> complete_ipc_stream;
279281
ASSERT_OK(out_stream->Finish(&complete_ipc_stream));

cpp/src/arrow/flight/client.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -251,13 +251,13 @@ class GrpcStreamWriter : public FlightStreamWriter {
251251
std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer,
252252
std::unique_ptr<FlightStreamWriter>* out);
253253

254-
Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override {
255-
return WriteWithMetadata(batch, nullptr, allow_64bit);
254+
Status WriteRecordBatch(const RecordBatch& batch) override {
255+
return WriteWithMetadata(batch, nullptr);
256256
}
257-
Status WriteWithMetadata(const RecordBatch& batch, std::shared_ptr<Buffer> app_metadata,
258-
bool allow_64bit = false) override {
257+
Status WriteWithMetadata(const RecordBatch& batch,
258+
std::shared_ptr<Buffer> app_metadata) override {
259259
app_metadata_ = app_metadata;
260-
return batch_writer_->WriteRecordBatch(batch, allow_64bit);
260+
return batch_writer_->WriteRecordBatch(batch);
261261
}
262262
void set_memory_pool(MemoryPool* pool) override {
263263
batch_writer_->set_memory_pool(pool);

cpp/src/arrow/flight/client.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
8686
class ARROW_FLIGHT_EXPORT FlightStreamWriter : public ipc::RecordBatchWriter {
8787
public:
8888
virtual Status WriteWithMetadata(const RecordBatch& batch,
89-
std::shared_ptr<Buffer> app_metadata,
90-
bool allow_64bit = false) = 0;
89+
std::shared_ptr<Buffer> app_metadata) = 0;
9190
};
9291

9392
#ifdef _MSC_VER

cpp/src/arrow/flight/perf-server.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class PerfDataStream : public FlightDataStream {
7474
std::shared_ptr<Schema> schema() override { return schema_; }
7575

7676
Status GetSchemaPayload(FlightPayload* payload) override {
77-
return ipc::internal::GetSchemaPayload(*schema_, &dictionary_memo_,
77+
return ipc::internal::GetSchemaPayload(*schema_, ipc_options_, &dictionary_memo_,
7878
&payload->ipc_message);
7979
}
8080

@@ -103,8 +103,8 @@ class PerfDataStream : public FlightDataStream {
103103
} else {
104104
records_sent_ += batch_length_;
105105
}
106-
return ipc::internal::GetRecordBatchPayload(*batch, default_memory_pool(),
107-
&payload->ipc_message);
106+
return ipc::internal::GetRecordBatchPayload(
107+
*batch, ipc_options_, default_memory_pool(), &payload->ipc_message);
108108
}
109109

110110
private:
@@ -115,6 +115,7 @@ class PerfDataStream : public FlightDataStream {
115115
int64_t records_sent_;
116116
std::shared_ptr<Schema> schema_;
117117
ipc::DictionaryMemo dictionary_memo_;
118+
ipc::IpcOptions ipc_options_;
118119
std::shared_ptr<RecordBatch> batch_;
119120
ArrayVector arrays_;
120121
};

cpp/src/arrow/flight/server.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
#include "arrow/buffer.h"
3838
#include "arrow/ipc/dictionary.h"
39+
#include "arrow/ipc/options.h"
3940
#include "arrow/ipc/reader.h"
4041
#include "arrow/ipc/writer.h"
4142
#include "arrow/memory_pool.h"
@@ -640,13 +641,13 @@ class RecordBatchStream::RecordBatchStreamImpl {
640641

641642
RecordBatchStreamImpl(const std::shared_ptr<RecordBatchReader>& reader,
642643
MemoryPool* pool)
643-
: pool_(pool), reader_(reader) {}
644+
: pool_(pool), reader_(reader), ipc_options_(ipc::IpcOptions::Defaults()) {}
644645

645646
std::shared_ptr<Schema> schema() { return reader_->schema(); }
646647

647648
Status GetSchemaPayload(FlightPayload* payload) {
648-
return ipc::internal::GetSchemaPayload(*reader_->schema(), &dictionary_memo_,
649-
&payload->ipc_message);
649+
return ipc::internal::GetSchemaPayload(*reader_->schema(), ipc_options_,
650+
&dictionary_memo_, &payload->ipc_message);
650651
}
651652

652653
Status Next(FlightPayload* payload) {
@@ -664,7 +665,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
664665
if (stage_ == Stage::DICTIONARY) {
665666
if (dictionary_index_ == static_cast<int>(dictionaries_.size())) {
666667
stage_ = Stage::RECORD_BATCH;
667-
return ipc::internal::GetRecordBatchPayload(*current_batch_, pool_,
668+
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
668669
&payload->ipc_message);
669670
} else {
670671
return GetNextDictionary(payload);
@@ -679,15 +680,15 @@ class RecordBatchStream::RecordBatchStreamImpl {
679680
payload->ipc_message.metadata = nullptr;
680681
return Status::OK();
681682
} else {
682-
return ipc::internal::GetRecordBatchPayload(*current_batch_, pool_,
683+
return ipc::internal::GetRecordBatchPayload(*current_batch_, ipc_options_, pool_,
683684
&payload->ipc_message);
684685
}
685686
}
686687

687688
private:
688689
Status GetNextDictionary(FlightPayload* payload) {
689690
const auto& it = dictionaries_[dictionary_index_++];
690-
return ipc::internal::GetDictionaryPayload(it.first, it.second, pool_,
691+
return ipc::internal::GetDictionaryPayload(it.first, it.second, ipc_options_, pool_,
691692
&payload->ipc_message);
692693
}
693694

@@ -703,6 +704,7 @@ class RecordBatchStream::RecordBatchStreamImpl {
703704
MemoryPool* pool_;
704705
std::shared_ptr<RecordBatchReader> reader_;
705706
ipc::DictionaryMemo dictionary_memo_;
707+
ipc::IpcOptions ipc_options_;
706708
std::shared_ptr<RecordBatch> current_batch_;
707709
std::vector<std::pair<int64_t, std::shared_ptr<Array>>> dictionaries_;
708710

cpp/src/arrow/ipc/message.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <memory>
2525
#include <string>
2626

27+
#include "arrow/ipc/options.h"
2728
#include "arrow/status.h"
2829
#include "arrow/util/macros.h"
2930
#include "arrow/util/visibility.h"
@@ -57,11 +58,6 @@ enum class MetadataVersion : char {
5758
V4
5859
};
5960

60-
// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
61-
// deeply nested schemas, it is expected the user will indicate explicitly the
62-
// maximum allowed recursion depth
63-
constexpr int kMaxNestingDepth = 64;
64-
6561
// Read interface classes. We do not fully deserialize the flatbuffers so that
6662
// individual fields metadata can be retrieved from very large schema without
6763
//

cpp/src/arrow/ipc/options.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/ipc/options.h"
19+
20+
namespace arrow {
21+
namespace ipc {
22+
23+
// Build the default option set: a default-initialized IpcOptions, so each
// member holds the value given by its in-class initializer.
IpcOptions IpcOptions::Defaults() {
  IpcOptions defaults;
  return defaults;
}
24+
25+
} // namespace ipc
26+
} // namespace arrow

cpp/src/arrow/ipc/options.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <cstdint>
21+
22+
#include "arrow/util/visibility.h"
23+
24+
namespace arrow {
25+
namespace ipc {
26+
27+
// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
28+
// deeply nested schemas, it is expected the user will indicate explicitly the
29+
// maximum allowed recursion depth
30+
constexpr int kMaxNestingDepth = 64;
31+
32+
// Options bundle taken by the IPC APIs in place of individual option
// arguments. Both fields carry in-class default values.
struct ARROW_EXPORT IpcOptions {
33+
// If true, allow field lengths that don't fit in a signed 32-bit int.
34+
// Some implementations may not be able to parse such streams.
// Disabled by default.
35+
bool allow_64bit = false;
36+
// The maximum permitted schema nesting depth.
// Defaults to kMaxNestingDepth.
37+
int max_recursion_depth = kMaxNestingDepth;
38+
39+
// Returns an IpcOptions value with the default settings.
static IpcOptions Defaults();
40+
};
41+
42+
} // namespace ipc
43+
} // namespace arrow

cpp/src/arrow/ipc/read-write-benchmark.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ std::shared_ptr<RecordBatch> MakeRecordBatch(int64_t total_size, int64_t num_fie
5050
static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const reference
5151
// 1MB
5252
constexpr int64_t kTotalSize = 1 << 20;
53+
auto options = ipc::IpcOptions::Defaults();
5354

5455
std::shared_ptr<ResizableBuffer> buffer;
5556
ABORT_NOT_OK(AllocateResizableBuffer(kTotalSize & 2, &buffer));
@@ -60,7 +61,7 @@ static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const ref
6061
int32_t metadata_length;
6162
int64_t body_length;
6263
if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
63-
default_memory_pool())
64+
options, default_memory_pool())
6465
.ok()) {
6566
state.SkipWithError("Failed to write!");
6667
}
@@ -71,6 +72,7 @@ static void WriteRecordBatch(benchmark::State& state) { // NOLINT non-const ref
7172
static void ReadRecordBatch(benchmark::State& state) { // NOLINT non-const reference
7273
// 1MB
7374
constexpr int64_t kTotalSize = 1 << 20;
75+
auto options = ipc::IpcOptions::Defaults();
7476

7577
std::shared_ptr<ResizableBuffer> buffer;
7678
ABORT_NOT_OK(AllocateResizableBuffer(kTotalSize & 2, &buffer));
@@ -81,7 +83,7 @@ static void ReadRecordBatch(benchmark::State& state) { // NOLINT non-const refe
8183
int32_t metadata_length;
8284
int64_t body_length;
8385
if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
84-
default_memory_pool())
86+
options, default_memory_pool())
8587
.ok()) {
8688
state.SkipWithError("Failed to write!");
8789
}

0 commit comments

Comments
 (0)