Skip to content

Commit 85128f7

Browse files
committed
Refactor IPC/File record batch read/write structure to reflect discussion in ARROW-384
Change-Id: Ic119c5647bdb6b1ff1fb31b3712527058c68e40c
1 parent dbd6ed6 commit 85128f7

File tree

14 files changed

+400
-277
lines changed

14 files changed

+400
-277
lines changed

cpp/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,6 @@ if(ARROW_BUILD_TESTS)
528528
ExternalProject_Add(gflags_ep
529529
GIT_REPOSITORY https://github.com/gflags/gflags.git
530530
GIT_TAG cce68f0c9c5d054017425e6e6fd54f696d36e8ee
531-
# URL "https://github.com/gflags/gflags/archive/v${GFLAGS_VERSION}.tar.gz"
532531
BUILD_IN_SOURCE 1
533532
CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
534533
-DCMAKE_INSTALL_PREFIX=${GFLAGS_PREFIX}

cpp/src/arrow/io/memory.cc

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
258258
// ----------------------------------------------------------------------
259259
// In-memory buffer reader
260260

261-
BufferReader::BufferReader(const uint8_t* buffer, int buffer_size)
262-
: buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
261+
BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
262+
: buffer_(buffer), data_(buffer->data()), size_(buffer->size()), position_(0) {}
263+
264+
BufferReader::BufferReader(const uint8_t* data, int64_t size)
265+
: buffer_(nullptr), data_(data), size_(size), position_(0) {}
263266

264267
BufferReader::~BufferReader() {}
265268

@@ -278,26 +281,32 @@ bool BufferReader::supports_zero_copy() const {
278281
}
279282

280283
Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
281-
memcpy(buffer, buffer_ + position_, nbytes);
282-
*bytes_read = std::min(nbytes, buffer_size_ - position_);
284+
memcpy(buffer, data_ + position_, nbytes);
285+
*bytes_read = std::min(nbytes, size_ - position_);
283286
position_ += *bytes_read;
284287
return Status::OK();
285288
}
286289

287290
Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
288-
int64_t size = std::min(nbytes, buffer_size_ - position_);
289-
*out = std::make_shared<Buffer>(buffer_ + position_, size);
291+
int64_t size = std::min(nbytes, size_ - position_);
292+
293+
if (buffer_ != nullptr) {
294+
*out = SliceBuffer(buffer_, position_, size);
295+
} else {
296+
*out = std::make_shared<Buffer>(data_ + position_, size);
297+
}
298+
290299
position_ += nbytes;
291300
return Status::OK();
292301
}
293302

294303
Status BufferReader::GetSize(int64_t* size) {
295-
*size = buffer_size_;
304+
*size = size_;
296305
return Status::OK();
297306
}
298307

299308
Status BufferReader::Seek(int64_t position) {
300-
if (position < 0 || position >= buffer_size_) {
309+
if (position < 0 || position >= size_) {
301310
return Status::IOError("position out of bounds");
302311
}
303312

cpp/src/arrow/io/memory.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
9999

100100
class ARROW_EXPORT BufferReader : public ReadableFileInterface {
101101
public:
102-
BufferReader(const uint8_t* buffer, int buffer_size);
102+
BufferReader(const std::shared_ptr<Buffer>& buffer);
103+
BufferReader(const uint8_t* data, int64_t size);
103104
~BufferReader();
104105

105106
Status Close() override;
@@ -116,8 +117,9 @@ class ARROW_EXPORT BufferReader : public ReadableFileInterface {
116117
bool supports_zero_copy() const override;
117118

118119
private:
119-
const uint8_t* buffer_;
120-
int buffer_size_;
120+
std::shared_ptr<Buffer> buffer_;
121+
const uint8_t* data_;
122+
int64_t size_;
121123
int64_t position_;
122124
};
123125

0 commit comments

Comments
 (0)