Skip to content

Commit

Permalink
roundtrip working for the first time
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz committed Aug 15, 2017
1 parent 44fb98b commit 49a4acb
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 3 deletions.
27 changes: 24 additions & 3 deletions python/pyarrow/serialization.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,40 @@ def deserialize_sequence(PythonObject value, object base):
def write_python_object(PythonObject value, NativeFile sink):
    """Write a serialized PythonObject to ``sink``.

    The record batch held by ``value`` is written first with the Arrow
    record batch stream writer, followed by every tensor in
    ``value.tensors`` in order. ``read_python_object`` consumes the data
    in the same order.

    Parameters
    ----------
    value : PythonObject
        Object whose ``batch`` and ``tensors`` are written.
    sink : NativeFile
        Destination; its output stream is obtained via ``write_handle``.

    Every Arrow status returned along the way is passed to
    ``check_status``.
    """
    cdef shared_ptr[OutputStream] stream
    sink.write_handle(&stream)
    # Use the stream writer (not the file writer) so that the output can be
    # consumed by the CRecordBatchStreamReader in read_python_object.
    cdef shared_ptr[CRecordBatchStreamWriter] writer
    cdef shared_ptr[CSchema] schema = deref(value.batch).schema()
    cdef shared_ptr[CRecordBatch] batch = value.batch
    cdef shared_ptr[CTensor] tensor
    # Output parameters of WriteTensor; the values are not used afterwards.
    cdef int32_t metadata_length
    cdef int64_t body_length

    with nogil:
        check_status(CRecordBatchStreamWriter.Open(stream.get(), schema, &writer))
        check_status(deref(writer).WriteRecordBatch(deref(batch)))
        check_status(deref(writer).Close())

    # Tensors are appended to the same stream, directly after the batch.
    for tensor in value.tensors:
        check_status(WriteTensor(deref(tensor), stream.get(), &metadata_length, &body_length))

def read_python_object(NativeFile source):
    """Read back an object written by ``write_python_object``.

    One record batch is read from ``source`` with the Arrow record batch
    stream reader and stored in ``result.batch``. Tensors are then read
    one after another, starting at the position the file reports once the
    batch has been read, until ``ReadTensor`` returns a non-OK status.

    Parameters
    ----------
    source : NativeFile
        Input; its random access file is obtained via ``read_handle``.

    Returns
    -------
    PythonObject
        Object holding the record batch and the tensors that were read.
    """
    cdef PythonObject result = PythonObject()
    cdef shared_ptr[RandomAccessFile] stream
    source.read_handle(&stream)
    cdef shared_ptr[CRecordBatchStreamReader] reader
    cdef shared_ptr[CTensor] tensor
    cdef int64_t offset

    with nogil:
        check_status(CRecordBatchStreamReader.Open(<shared_ptr[InputStream]> stream, &reader))
        check_status(reader.get().ReadNextRecordBatch(&result.batch))

    # Tensor data starts where the stream reader stopped.
    check_status(deref(stream).Tell(&offset))

    while True:
        s = ReadTensor(offset, stream.get(), &tensor)
        # Any non-OK status ends the loop; it is treated as "no more
        # tensors" rather than raised. Check it *before* storing the
        # tensor so that a failed read never appends a bogus entry.
        if not s.ok():
            break
        result.tensors.push_back(tensor)
        check_status(deref(stream).Tell(&offset))

    return result
44 changes: 44 additions & 0 deletions python/pyarrow/tests/test_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

import pyarrow as pa
import numpy as np

# Round-trip smoke test: serialize a small sequence, write it through a
# memory-mapped file, read it back and deserialize it.
obj = pa.lib.serialize_sequence([np.array([1, 2, 3]), None, np.array([4, 5, 6])])

# Create a SIZE-byte file of random bytes to back the memory map
# (presumably so the mapping has room for the serialized data -- confirm).
SIZE = 4096
arr = np.random.randint(0, 256, size=SIZE).astype('u1')
# arr holds exactly SIZE one-byte elements, so no slicing is needed.
data = arr.tobytes()
path = os.path.join("/tmp/temp")
with open(path, 'wb') as f:
    f.write(data)

# Close the write mapping before the file is mapped again for reading.
with pa.memory_map(path, mode="w") as f:
    pa.lib.write_python_object(obj, f)

# Deserialize while the read mapping is still open, since the result may
# reference the mapped memory.
with pa.memory_map(path, mode="r") as f:
    res = pa.lib.read_python_object(f)
    pa.lib.deserialize_sequence(res, res)

0 comments on commit 49a4acb

Please sign in to comment.