Skip to content

Commit

Permalink
ARROW-312: Read and write Arrow IPC file format from Python
Browse files Browse the repository at this point in the history
This also adds some IO scaffolding for interacting with `arrow::Buffer` objects from Python and assorted additions to help with testing.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #164 from wesm/ARROW-312 and squashes the following commits:

7df3e5f [Wes McKinney] Set BUILD_WITH_INSTALL_RPATH on arrow_ipc
be8cee0 [Wes McKinney] Link Cython modules to libarrow* libraries
5716601 [Wes McKinney] Fix accidental deletion
77fb03b [Wes McKinney] Add / test Buffer wrapper. Test that we can write an arrow file to a wrapped buffer. Resize buffer in BufferOutputStream on close
316537d [Wes McKinney] Get ready to wrap Arrow buffers in a Python object
4822d32 [Wes McKinney] Implement RecordBatch::Equals, compare in Python ipc file writes
a931e49 [Wes McKinney] Permit buffers whose size is not a multiple of 64 in an IPC context (padding is written instead), to allow zero-copy writing of NumPy arrays
2c49cd4 [Wes McKinney] Some debugging
ca1562b [Wes McKinney] Draft implementations of Arrow file read/write from Python
  • Loading branch information
wesm committed Oct 10, 2016
1 parent eb1491a commit 772800a
Show file tree
Hide file tree
Showing 37 changed files with 1,012 additions and 309 deletions.
25 changes: 25 additions & 0 deletions cpp/src/arrow/io/io-memory-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,30 @@ TEST_F(TestMemoryMappedFile, InvalidFile) {
IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result));
}

// Test fixture that, before each test, creates a new PoolBuffer on the default
// memory pool and a BufferOutputStream writing into it. Tests reach the stream
// through `stream_` and inspect the result through `buffer_`.
class TestBufferOutputStream : public ::testing::Test {
 public:
  void SetUp() {
    // The buffer is created first because the stream is constructed from it.
    buffer_ = std::make_shared<PoolBuffer>(default_memory_pool());
    stream_ = std::unique_ptr<OutputStream>(new BufferOutputStream(buffer_));
  }

 protected:
  std::shared_ptr<PoolBuffer> buffer_;
  std::unique_ptr<OutputStream> stream_;
};

// Writes the same short payload many times, closes the stream, and checks that
// the buffer's size afterwards equals the total number of bytes written.
TEST_F(TestBufferOutputStream, CloseResizes) {
  std::string payload = "data123456";
  const int64_t payload_len = static_cast<int64_t>(payload.size());

  const int num_writes = 100;
  int writes_done = 0;
  while (writes_done < num_writes) {
    EXPECT_OK(
        stream_->Write(reinterpret_cast<const uint8_t*>(payload.c_str()), payload_len));
    ++writes_done;
  }

  ASSERT_OK(stream_->Close());

  // Expected size is the exact byte count written, not any larger capacity.
  ASSERT_EQ(num_writes * payload_len, buffer_->size());
}

} // namespace io
} // namespace arrow
13 changes: 10 additions & 3 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,11 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
mutable_data_(buffer->mutable_data()) {}

// Closes the stream. The conditional below resizes buffer_ to position_ when
// position_ is less than capacity_, and otherwise reports success unchanged.
//
// NOTE(review): this listing is a rendered diff without +/- markers. The
// unconditional `return Status::OK();` directly below looks like the
// pre-change body shown next to its replacement; read literally it would make
// the resize branch unreachable. Confirm against the real memory.cc.
Status BufferOutputStream::Close() {
return Status::OK();
// Only resize when fewer bytes were written than the tracked capacity.
if (position_ < capacity_) {
return buffer_->Resize(position_);
} else {
return Status::OK();
}
}

Status BufferOutputStream::Tell(int64_t* position) {
Expand All @@ -228,8 +232,11 @@ Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
}

Status BufferOutputStream::Reserve(int64_t nbytes) {
while (position_ + nbytes > capacity_) {
int64_t new_capacity = std::max(kBufferMinimumSize, capacity_ * 2);
int64_t new_capacity = capacity_;
while (position_ + nbytes > new_capacity) {
new_capacity = std::max(kBufferMinimumSize, new_capacity * 2);
}
if (new_capacity > capacity_) {
RETURN_NOT_OK(buffer_->Resize(new_capacity));
capacity_ = new_capacity;
}
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES
LINKER_LANGUAGE CXX
LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}")

# On Apple platforms only: build the arrow_ipc target with its install RPATH
# and set its install-name directory to "@rpath".
if (APPLE)
set_target_properties(arrow_ipc
PROPERTIES
BUILD_WITH_INSTALL_RPATH ON
INSTALL_NAME_DIR "@rpath")
endif()

ADD_ARROW_TEST(ipc-adapter-test)
ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
${ARROW_IPC_TEST_LINK_LIBS})
Expand Down
16 changes: 10 additions & 6 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,14 @@ class RecordBatchWriter {
for (size_t i = 0; i < buffers_.size(); ++i) {
const Buffer* buffer = buffers_[i].get();
int64_t size = 0;
int64_t padding = 0;

// The buffer might be null if we are handling zero row lengths.
if (buffer) {
// We use capacity here, because size might not reflect the padding
// requirements of buffers but capacity always should.
size = buffer->capacity();
// check that padding is appropriate
RETURN_NOT_OK(CheckMultipleOf64(size));
size = buffer->size();
padding = util::RoundUpToMultipleOf64(size) - size;
}

// TODO(wesm): We currently have no notion of shared memory page id's,
// but we've included it in the metadata IDL for when we have it in the
// future. Use page=0 for now
Expand All @@ -179,12 +178,17 @@ class RecordBatchWriter {
// are using from any OS-level shared memory. The thought is that systems
// may (in the future) associate integer page id's with physical memory
// pages (according to whatever is the desired shared memory mechanism)
buffer_meta_.push_back(flatbuf::Buffer(0, position, size));
buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding));

if (size > 0) {
RETURN_NOT_OK(dst->Write(buffer->data(), size));
position += size;
}

if (padding > 0) {
RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
position += padding;
}
}

*body_end_offset = position;
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/ipc/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ namespace ipc {

// Align on 8-byte boundaries
static constexpr int kArrowAlignment = 8;
static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};

// Buffers are padded to 64-byte boundaries (for SIMD)
static constexpr int kArrowBufferAlignment = 64;

static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0};

static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
return ((nbytes + alignment - 1) / alignment) * alignment;
Expand Down
27 changes: 27 additions & 0 deletions cpp/src/arrow/table-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,31 @@ TEST_F(TestTable, InvalidColumns) {
ASSERT_RAISES(Invalid, table_->ValidateColumns());
}

// Fixture for RecordBatch tests; it adds no members of its own and relies
// entirely on what TestBase provides.
class TestRecordBatch : public TestBase {};

// Checks RecordBatch::Equals against an identical batch and against batches
// that differ in row count, in number of columns, and in one column's array.
TEST_F(TestRecordBatch, Equals) {
  const int num_rows = 10;

  // Schema with three fields of different primitive types.
  auto int32_field = std::make_shared<Field>("f0", INT32);
  auto uint8_field = std::make_shared<Field>("f1", UINT8);
  auto int16_field = std::make_shared<Field>("f2", INT16);

  vector<shared_ptr<Field>> schema_fields = {int32_field, uint8_field, int16_field};
  auto schema = std::make_shared<Schema>(schema_fields);

  // One array per field, each with num_rows elements.
  auto int32_col = MakePrimitive<Int32Array>(num_rows);
  auto uint8_col = MakePrimitive<UInt8Array>(num_rows);
  auto int16_col = MakePrimitive<Int16Array>(num_rows);

  RecordBatch reference(schema, num_rows, {int32_col, uint8_col, int16_col});
  RecordBatch fewer_rows(schema, 5, {int32_col, uint8_col, int16_col});
  RecordBatch fewer_columns(schema, num_rows, {int32_col, uint8_col});
  RecordBatch different_last_column(schema, num_rows, {int32_col, uint8_col, uint8_col});

  // A batch equals itself...
  ASSERT_TRUE(reference.Equals(reference));
  // ...but not a batch with a different row count, column count, or column.
  ASSERT_FALSE(reference.Equals(fewer_rows));
  ASSERT_FALSE(reference.Equals(fewer_columns));
  ASSERT_FALSE(reference.Equals(different_last_column));
}

} // namespace arrow
16 changes: 16 additions & 0 deletions cpp/src/arrow/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <sstream>

#include "arrow/array.h"
#include "arrow/column.h"
#include "arrow/schema.h"
#include "arrow/util/status.h"
Expand All @@ -35,6 +36,21 @@ const std::string& RecordBatch::column_name(int i) const {
return schema_->field(i)->name;
}

// Returns true when `other` has the same number of columns and rows as this
// batch and every column compares equal (via the column's own Equals) to the
// column at the same index in `other`. The schema itself is not compared here.
bool RecordBatch::Equals(const RecordBatch& other) const {
  // Cheap shape checks first: column count, then row count.
  if (num_columns() != other.num_columns()) { return false; }
  if (num_rows_ != other.num_rows()) { return false; }

  // Shapes match; stop at the first column pair that differs.
  int idx = 0;
  while (idx < num_columns()) {
    const bool columns_match = column(idx)->Equals(other.column(idx));
    if (!columns_match) { return false; }
    ++idx;
  }

  return true;
}

// ----------------------------------------------------------------------
// Table methods

Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Column>>& columns)
: name_(name), schema_(schema), columns_(columns) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class ARROW_EXPORT RecordBatch {
RecordBatch(const std::shared_ptr<Schema>& schema, int32_t num_rows,
const std::vector<std::shared_ptr<Array>>& columns);

bool Equals(const RecordBatch& other) const;

// @returns: the table's schema
const std::shared_ptr<Schema>& schema() const { return schema_; }

Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/types/primitive-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ void TestPrimitiveBuilder<PBoolean>::Check(
}

typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
PInt32, PInt64, PFloat, PDouble>
Primitives;
PInt32, PInt64, PFloat, PDouble> Primitives;

TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);

Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/util/bit-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#define ARROW_UTIL_BIT_UTIL_H

#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

Expand Down Expand Up @@ -77,6 +78,18 @@ static inline bool is_multiple_of_64(int64_t n) {
return (n & 63) == 0;
}

// Rounds `num` up to the next multiple of 64 and returns it; values that are
// already a multiple of 64 are returned unchanged. Inputs within 64 of the
// int64_t maximum cannot be rounded without overflowing and are returned
// as-is.
inline int64_t RoundUpToMultipleOf64(int64_t num) {
  // TODO(wesm): is this definitely needed?
  // DCHECK_GE(num, 0);
  constexpr int64_t kMultiple = 64;
  constexpr int64_t kLowBitsMask = kMultiple - 1;
  constexpr int64_t kLargestRoundable = std::numeric_limits<int64_t>::max() - kMultiple;

  // Overflow case: hand the value back untouched. This should result in a
  // malloc error upstream.
  if (num > kLargestRoundable) { return num; }

  // Adding 63 carries into the next multiple unless num is already aligned;
  // clearing the low six bits then truncates to that multiple.
  return (num + kLowBitsMask) & ~kLowBitsMask;
}

void bytes_to_bits(const std::vector<uint8_t>& bytes, uint8_t* bits);
ARROW_EXPORT Status bytes_to_bits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*);

Expand Down
16 changes: 2 additions & 14 deletions cpp/src/arrow/util/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,13 @@
#include <cstdint>
#include <limits>

#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

namespace arrow {

namespace {
// File-local helper: rounds `num` up to the next multiple of 64. A debug check
// requires `num` to be non-negative. Inputs within 64 of the int64_t maximum
// cannot be rounded without overflowing and are returned unchanged.
int64_t RoundUpToMultipleOf64(int64_t num) {
  DCHECK_GE(num, 0);
  constexpr int64_t kMultiple = 64;
  constexpr int64_t kLowBitsMask = kMultiple - 1;
  constexpr int64_t kLargestRoundable = std::numeric_limits<int64_t>::max() - kMultiple;

  // Overflow case: hand the value back untouched. This should result in a
  // malloc error upstream.
  if (num > kLargestRoundable) { return num; }

  // Add 63 to carry into the next multiple, then clear the low six bits.
  return (num + kLowBitsMask) & ~kLowBitsMask;
}
}  // namespace

Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
data_ = parent->data() + offset;
size_ = size;
Expand All @@ -64,7 +52,7 @@ PoolBuffer::~PoolBuffer() {
Status PoolBuffer::Reserve(int64_t new_capacity) {
if (!mutable_data_ || new_capacity > capacity_) {
uint8_t* new_data;
new_capacity = RoundUpToMultipleOf64(new_capacity);
new_capacity = util::RoundUpToMultipleOf64(new_capacity);
if (mutable_data_) {
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
memcpy(new_data, mutable_data_, size_);
Expand Down
1 change: 0 additions & 1 deletion cpp/src/arrow/util/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <cstring>
#include <memory>

#include "arrow/util/bit-util.h"
#include "arrow/util/macros.h"
#include "arrow/util/status.h"
#include "arrow/util/visibility.h"
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/logging.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ class CerrLog {
class FatalLog : public CerrLog {
public:
explicit FatalLog(int /* severity */) // NOLINT
: CerrLog(ARROW_FATAL){} // NOLINT
: CerrLog(ARROW_FATAL) {} // NOLINT

[[noreturn]] ~FatalLog() {
[[noreturn]] ~FatalLog() {
if (has_logged_) { std::cerr << std::endl; }
std::exit(1);
}
Expand Down
8 changes: 6 additions & 2 deletions python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ ADD_THIRDPARTY_LIB(arrow
SHARED_LIB ${ARROW_SHARED_LIB})
ADD_THIRDPARTY_LIB(arrow_io
SHARED_LIB ${ARROW_IO_SHARED_LIB})
ADD_THIRDPARTY_LIB(arrow_ipc
SHARED_LIB ${ARROW_IPC_SHARED_LIB})

############################################################
# Linker setup
Expand Down Expand Up @@ -415,6 +417,8 @@ if (UNIX)
set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
endif()

SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

add_subdirectory(src/pyarrow)
add_subdirectory(src/pyarrow/util)

Expand All @@ -423,6 +427,7 @@ set(CYTHON_EXTENSIONS
config
error
io
ipc
scalar
schema
table
Expand All @@ -442,6 +447,7 @@ set(PYARROW_SRCS
set(LINK_LIBS
arrow
arrow_io
arrow_ipc
)

if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
Expand All @@ -455,8 +461,6 @@ if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
parquet)
endif()

SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

add_library(pyarrow SHARED
${PYARROW_SRCS})
target_link_libraries(pyarrow ${LINK_LIBS})
Expand Down
11 changes: 11 additions & 0 deletions python/cmake_modules/FindArrow.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,31 @@ find_library(ARROW_IO_LIB_PATH NAMES arrow_io
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)

# Locate the arrow_ipc library and store its path in ARROW_IPC_LIB_PATH.
# Only ${ARROW_SEARCH_LIB_PATH} is searched; NO_DEFAULT_PATH disables the
# default search locations.
find_library(ARROW_IPC_LIB_PATH NAMES arrow_ipc
PATHS
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)

if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
set(ARROW_FOUND TRUE)
set(ARROW_LIB_NAME libarrow)
set(ARROW_IO_LIB_NAME libarrow_io)
set(ARROW_IPC_LIB_NAME libarrow_ipc)

set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})

set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a)
set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})

set(ARROW_IPC_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IPC_LIB_NAME}.a)
set(ARROW_IPC_SHARED_LIB ${ARROW_LIBS}/${ARROW_IPC_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})

if (NOT Arrow_FIND_QUIETLY)
message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
message(STATUS "Found the Arrow IPC library: ${ARROW_IPC_LIB_PATH}")
endif ()
else ()
if (NOT Arrow_FIND_QUIETLY)
Expand Down
3 changes: 1 addition & 2 deletions python/pyarrow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,4 @@
list_, struct, field,
DataType, Field, Schema, schema)

from pyarrow.array import RowBatch
from pyarrow.table import Column, Table, from_pandas_dataframe
from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
Loading

0 comments on commit 772800a

Please sign in to comment.