3 changes: 3 additions & 0 deletions cpp/src/arrow/table.cc
@@ -66,6 +66,9 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
if (null_count_ != other.null_count()) {
return false;
}
if (length_ == 0) {
return type_->Equals(other.type_);
}

// Check contents of the underlying arrays. This checks for equality of
// the underlying data independently of the chunk size.
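With this change, two zero-length ChunkedArrays compare equal exactly when their types match; for non-empty arrays, equality is still checked on the underlying values independently of how they are chunked. A quick sanity sketch from Python, assuming a build that includes this patch:

import pyarrow as pa

# Zero-length arrays: equality reduces to type equality
assert pa.chunked_array([], type=pa.int32()).equals(
    pa.chunked_array([], type=pa.int32()))
assert not pa.chunked_array([], type=pa.int32()).equals(
    pa.chunked_array([], type=pa.int64()))

# Chunk boundaries do not matter, only the values do
a = pa.array([0, 2], type=pa.int32())
c = pa.array([0, 3], type=pa.int32())
d = pa.array([0, 2, 0, 3], type=pa.int32())
assert pa.chunked_array([a, c]).equals(pa.chunked_array([d]))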
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -435,6 +435,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
int64_t length()
int64_t null_count()
int num_chunks()
c_bool Equals(const CChunkedArray& other)

shared_ptr[CArray] chunk(int i)
shared_ptr[CDataType] type()
shared_ptr[CChunkedArray] Slice(int64_t offset, int64_t length) const
88 changes: 82 additions & 6 deletions python/pyarrow/table.pxi
@@ -50,6 +50,9 @@ cdef class ChunkedArray:
self.sp_chunked_array = chunked_array
self.chunked_array = chunked_array.get()

def __reduce__(self):
return chunked_array, (self.chunks, self.type)

property type:

def __get__(self):
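__reduce__ returns the module-level chunked_array factory together with the arguments needed to rebuild the object, so a pickle round trip preserves both chunk contents and type. A minimal sketch:

import pickle
import pyarrow as pa

arr = pa.chunked_array([[1, 2], [3]], type=pa.int64())
clone = pickle.loads(pickle.dumps(arr))
assert clone.equals(arr)            # values and type survive
assert clone.num_chunks == arr.num_chunks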
@@ -95,6 +98,28 @@
else:
index -= self.chunked_array.chunk(j).get().length()

def equals(self, ChunkedArray other):
"""
Return whether the contents of two chunked arrays are equal

Parameters
----------
other : pyarrow.ChunkedArray

Returns
-------
are_equal : boolean
"""
cdef:
CChunkedArray* this_arr = self.chunked_array
CChunkedArray* other_arr = other.chunked_array
c_bool result

with nogil:
result = this_arr.Equals(deref(other_arr))

return result

def to_pandas(self,
c_bool strings_to_categorical=False,
c_bool zero_copy_only=False,
@@ -342,6 +367,9 @@ cdef class Column:
self.sp_column = column
self.column = column.get()

def __reduce__(self):
return column, (self.field, self.data)

def __repr__(self):
from pyarrow.compat import StringIO
result = StringIO()
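Column pickles through the column(field, data) factory, so the field, including any metadata attached to it, travels with the data. A sketch mirroring test_column_pickle below:

import pickle
import pyarrow as pa

field = pa.field('ints', pa.int16()).add_metadata({b'foo': b'bar'})
col = pa.column(field, pa.chunked_array([[1, 2], [5, 6, 7]], type=pa.int16()))
clone = pickle.loads(pickle.dumps(col))
assert clone.equals(col)
assert clone.field == field         # metadata preserved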
@@ -638,6 +666,9 @@ cdef class RecordBatch:
self.sp_batch = batch
self.batch = batch.get()

def __reduce__(self):
return _reconstruct_record_batch, (self.columns, self.schema)

def __len__(self):
return self.batch.num_rows()

@@ -704,6 +735,17 @@

return self._schema

@property
def columns(self):
"""
List of all columns in numerical order

Returns
-------
list of pa.Array
"""
return [self.column(i) for i in range(self.num_columns)]

def column(self, i):
"""
Select single column from record batch
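RecordBatch.column(i) yields pyarrow.Array values (from_arrays below consumes them as such), so this property returns a plain list of Arrays, unlike Table.columns, which returns Column objects. Usage:

import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['x'])
assert len(batch.columns) == batch.num_columns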
@@ -842,16 +884,16 @@ cdef class RecordBatch:
return cls.from_arrays(arrays, names, metadata)

@staticmethod
def from_arrays(list arrays, list names, dict metadata=None):
def from_arrays(list arrays, names, dict metadata=None):
"""
Construct a RecordBatch from multiple pyarrow.Arrays

Parameters
----------
arrays: list of pyarrow.Array
column-wise data vectors
names: list of str
Labels for the columns
names: pyarrow.Schema or list of str
Schema or list of labels for the columns

Returns
-------
@@ -860,7 +902,7 @@
cdef:
Array arr
c_string c_name
shared_ptr[CSchema] schema
shared_ptr[CSchema] c_schema
vector[shared_ptr[CArray]] c_arrays
int64_t num_rows
int64_t i
@@ -870,7 +912,10 @@
num_rows = len(arrays[0])
else:
num_rows = 0
_schema_from_arrays(arrays, names, metadata, &schema)
if isinstance(names, Schema):
c_schema = (<Schema> names).sp_schema
else:
_schema_from_arrays(arrays, names, metadata, &c_schema)

c_arrays.reserve(len(arrays))
for arr in arrays:
@@ -880,7 +925,14 @@
c_arrays.push_back(arr.sp_array)

return pyarrow_wrap_batch(
CRecordBatch.Make(schema, num_rows, c_arrays))
CRecordBatch.Make(c_schema, num_rows, c_arrays))


def _reconstruct_record_batch(columns, schema):
"""
Internal: reconstruct RecordBatch from pickled components.
"""
return RecordBatch.from_arrays(columns, schema)
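from_arrays now accepts either a list of column names or a ready-made Schema; _reconstruct_record_batch relies on the latter so that unpickling restores schema-level metadata. Both call styles, sketched:

import pickle
import pyarrow as pa

data = [pa.array([1, 2, 3], type=pa.int16()),
        pa.array([0.5, 1.5, 2.5], type=pa.float32())]

# Existing style: a list of names
batch = pa.RecordBatch.from_arrays(data, ['ints', 'floats'])

# New style: a Schema, which carries metadata through the round trip
schema = pa.schema([pa.field('ints', pa.int16()),
                    pa.field('floats', pa.float32())],
                   metadata={b'foo': b'bar'})
batch = pa.RecordBatch.from_arrays(data, schema)
assert pickle.loads(pickle.dumps(batch)).schema == schema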


def table_to_blocks(PandasOptions options, Table table,
@@ -935,6 +987,12 @@ cdef class Table:
with nogil:
check_status(self.table.Validate())

def __reduce__(self):
# Reduce the columns as ChunkedArrays to avoid serializing schema
# data twice
columns = [col.data for col in self.columns]
return _reconstruct_table, (columns, self.schema)
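Reducing to ChunkedArrays matters because each Column would otherwise pickle its own Field, duplicating information the Schema already carries; _reconstruct_table (defined further down) reassembles the two halves. Round trip, sketched:

import pickle
import pyarrow as pa

schema = pa.schema([pa.field('ints', pa.uint32())],
                   metadata={b'foo': b'bar'})
table = pa.Table.from_arrays(
    [pa.chunked_array([[1, 2], [3, 4]], type=pa.uint32())], schema=schema)
clone = pickle.loads(pickle.dumps(table))
assert clone.equals(table) and clone.schema == schema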

def replace_schema_metadata(self, dict metadata=None):
"""
EXPERIMENTAL: Create shallow copy of table by replacing schema
@@ -1340,6 +1398,17 @@ cdef class Table:
for i in range(self.num_columns):
yield self.column(i)

@property
def columns(self):
"""
List of all columns in numerical order

Returns
-------
list of pa.Column
"""
return [self._column(i) for i in range(self.num_columns)]
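columns materializes the same sequence that itercolumns yields lazily; test_table_basics below asserts the two agree. For example:

import pyarrow as pa

table = pa.Table.from_arrays([pa.array([1, 2])], names=['a'])
assert table.columns == list(table.itercolumns())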

@property
def num_columns(self):
"""
@@ -1443,6 +1512,13 @@
return table


def _reconstruct_table(arrays, schema):
"""
Internal: reconstruct pa.Table from pickled components.
"""
return Table.from_arrays(arrays, schema=schema)


def concat_tables(tables):
"""
Perform zero-copy concatenation of pyarrow.Table objects. Raises exception
123 changes: 121 additions & 2 deletions python/pyarrow/tests/test_table.py
@@ -16,6 +16,8 @@
# under the License.

from collections import OrderedDict, Iterable
import pickle

import numpy as np
from pandas.util.testing import assert_frame_equal
import pandas as pd
@@ -72,6 +74,72 @@ def test_chunked_array_iter():
assert isinstance(arr, Iterable)


def test_chunked_array_equals():
def eq(xarrs, yarrs):
if isinstance(xarrs, pa.ChunkedArray):
x = xarrs
else:
x = pa.chunked_array(xarrs)
if isinstance(yarrs, pa.ChunkedArray):
y = yarrs
else:
y = pa.chunked_array(yarrs)
assert x.equals(y)
assert y.equals(x)

def ne(xarrs, yarrs):
if isinstance(xarrs, pa.ChunkedArray):
x = xarrs
else:
x = pa.chunked_array(xarrs)
if isinstance(yarrs, pa.ChunkedArray):
y = yarrs
else:
y = pa.chunked_array(yarrs)
assert not x.equals(y)
assert not y.equals(x)

eq(pa.chunked_array([], type=pa.int32()),
pa.chunked_array([], type=pa.int32()))
ne(pa.chunked_array([], type=pa.int32()),
pa.chunked_array([], type=pa.int64()))

a = pa.array([0, 2], type=pa.int32())
b = pa.array([0, 2], type=pa.int64())
c = pa.array([0, 3], type=pa.int32())
d = pa.array([0, 2, 0, 3], type=pa.int32())

eq([a], [a])
ne([a], [b])
eq([a, c], [a, c])
eq([a, c], [d])
ne([c, a], [a, c])


@pytest.mark.parametrize(
('data', 'typ'),
[
([True, False, True, True], pa.bool_()),
([1, 2, 4, 6], pa.int64()),
([1.0, 2.5, None], pa.float64()),
(['a', None, 'b'], pa.string()),
([], pa.list_(pa.uint8())),
([[1, 2], [3]], pa.list_(pa.int64())),
([['a'], None, ['b', 'c']], pa.list_(pa.string())),
([(1, 'a'), (2, 'c'), None],
pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())]))
]
)
def test_chunked_array_pickle(data, typ):
arrays = []
while data:
arrays.append(pa.array(data[:2], type=typ))
data = data[2:]
array = pa.chunked_array(arrays, type=typ)
result = pickle.loads(pickle.dumps(array))
assert result.equals(array)


def test_column_basics():
data = [
pa.array([-10, -5, 0, 5, 10])
@@ -108,6 +176,17 @@ def test_column_factory_function():
pa.Column.from_array(pa.field('foo', pa.string()), arr)


def test_column_pickle():
arr = pa.chunked_array([[1, 2], [5, 6, 7]], type=pa.int16())
field = pa.field("ints", pa.int16()).add_metadata({b"foo": b"bar"})
col = pa.column(field, arr)

result = pickle.loads(pickle.dumps(col))
assert result.equals(col)
assert result.data.num_chunks == 2
assert result.field == field


def test_column_to_pandas():
data = [
pa.array([-10, -5, 0, 5, 10])
@@ -154,8 +233,7 @@ def test_recordbatch_basics():
]

batch = pa.RecordBatch.from_arrays(data, ['c0', 'c1'])

batch.schema.metadata
assert not batch.schema.metadata

assert len(batch) == 5
assert batch.num_rows == 5
@@ -169,6 +247,13 @@
# bounds checking
batch[2]

# Schema passed explicitly
schema = pa.schema([pa.field('c0', pa.int16()),
pa.field('c1', pa.int32())],
metadata={b'foo': b'bar'})
batch = pa.RecordBatch.from_arrays(data, schema)
assert batch.schema == schema


def test_recordbatch_from_arrays_validate_lengths():
# ARROW-2820
@@ -209,6 +294,21 @@ def test_recordbatch_empty_metadata():
assert batch.schema.metadata is None


def test_recordbatch_pickle():
data = [
pa.array(range(5)),
pa.array([-10, -5, 0, 5, 10])
]
schema = pa.schema([pa.field('ints', pa.int8()),
pa.field('floats', pa.float32()),
]).add_metadata({b'foo': b'bar'})
batch = pa.RecordBatch.from_arrays(data, schema)

result = pickle.loads(pickle.dumps(batch))
assert result.equals(batch)
assert result.schema == schema


def test_recordbatch_slice_getitem():
data = [
pa.array(range(5)),
@@ -341,7 +441,9 @@ def test_table_basics():
('b', [-10, -5, 0, 5, 10])
])

columns = []
for col in table.itercolumns():
columns.append(col)
for chunk in col.data.iterchunks():
assert chunk is not None

@@ -351,6 +453,8 @@
with pytest.raises(IndexError):
col.data.chunk(col.data.num_chunks)

assert table.columns == columns


def test_table_from_arrays_invalid_names():
data = [
@@ -364,6 +468,21 @@
pa.Table.from_arrays(data, names=['a'])


def test_table_pickle():
data = [
pa.chunked_array([[1, 2], [3, 4]], type=pa.uint32()),
pa.chunked_array([["some", "strings", None, ""]], type=pa.string()),
]
schema = pa.schema([pa.field('ints', pa.uint32()),
pa.field('strs', pa.string())],
metadata={b'foo': b'bar'})
table = pa.Table.from_arrays(data, schema=schema)

result = pickle.loads(pickle.dumps(table))
result._validate()
assert result.equals(table)


def test_table_select_column():
data = [
pa.array(range(5)),