Commit 06be7ae

xhochy authored and wesm committed
ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects
Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #214 from xhochy/ARROW-389 and squashes the following commits:

e66c895 [Uwe L. Korn] Switch image to deprecated group
876cd65 [Uwe L. Korn] ARROW-389: Python: Write Parquet files to pyarrow.io.NativeFile objects
1 parent 33c731d commit 06be7ae
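
The change lets write_table target any pyarrow.io.NativeFile instead of only a file path. A minimal sketch of the new capability, mirroring the round-trip test added in this commit (APIs as they existed at the time: A.from_pandas_dataframe, pyarrow.io.InMemoryOutputStream, pyarrow.io.BufferReader):

import pandas as pd
import pyarrow as A
import pyarrow.io as paio
import pyarrow.parquet as pq

df = pd.DataFrame({'x': [1, 2, 3]})
table = A.from_pandas_dataframe(df)

# Write the table to an in-memory NativeFile instead of a path on disk.
sink = paio.InMemoryOutputStream()
pq.write_table(table, sink, version="2.0")

# The accumulated buffer reads straight back as a Parquet file.
reader = paio.BufferReader(sink.get_result())
assert pq.read_table(reader).to_pandas().equals(df)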

File tree

4 files changed: 45 additions & 8 deletions


.travis.yml

Lines changed: 1 addition & 0 deletions
@@ -24,6 +24,7 @@ matrix:
  - compiler: gcc
    language: cpp
    os: linux
+   group: deprecated
    before_script:
    - export CC="gcc-4.9"
    - export CXX="g++-4.9"

python/pyarrow/includes/parquet.pxd

Lines changed: 5 additions & 2 deletions
@@ -19,7 +19,7 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream


cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:

@@ -131,6 +131,9 @@ cdef extern from "parquet/arrow/io.h" namespace "parquet::arrow" nogil:
        ParquetReadSource(ParquetAllocator* allocator)
        Open(const shared_ptr[ReadableFileInterface]& file)

+    cdef cppclass ParquetWriteSink:
+        ParquetWriteSink(const shared_ptr[OutputStream]& file)
+

cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
    CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,

@@ -154,6 +157,6 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:

cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
    cdef CStatus WriteFlatTable(
        const CTable* table, MemoryPool* pool,
-        const shared_ptr[ParquetOutputStream]& sink,
+        const shared_ptr[ParquetWriteSink]& sink,
        int64_t chunk_size,
        const shared_ptr[WriterProperties]& properties)

python/pyarrow/parquet.pyx

Lines changed: 12 additions & 6 deletions
@@ -21,7 +21,7 @@

from pyarrow.includes.libarrow cimport *
from pyarrow.includes.parquet cimport *
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
cimport pyarrow.includes.pyarrow as pyarrow

from pyarrow.array cimport Array

@@ -151,15 +151,15 @@ def read_table(source, columns=None):
    return Table.from_arrays(columns, arrays)


-def write_table(table, filename, chunk_size=None, version=None,
+def write_table(table, sink, chunk_size=None, version=None,
                use_dictionary=True, compression=None):
    """
    Write a Table to Parquet format

    Parameters
    ----------
    table : pyarrow.Table
-    filename : string
+    sink : string or pyarrow.io.NativeFile
    chunk_size : int
        The maximum number of rows in each Parquet RowGroup. As a default,
        we will write a single RowGroup per file.

@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
    """
    cdef Table table_ = table
    cdef CTable* ctable_ = table_.table
-    cdef shared_ptr[ParquetOutputStream] sink
+    cdef shared_ptr[ParquetWriteSink] sink_
+    cdef shared_ptr[FileOutputStream] filesink_
    cdef WriterProperties.Builder properties_builder
    cdef int64_t chunk_size_ = 0
    if chunk_size is None:

@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
    else:
        raise ArrowException("Unsupport compression codec")

-    sink.reset(new LocalFileOutputStream(tobytes(filename)))
+    if isinstance(sink, six.string_types):
+        check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+        sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+    elif isinstance(sink, NativeFile):
+        sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))
+
    with nogil:
-        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink,
+        check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
                                    chunk_size_, properties_builder.build()))
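
The dispatch above keeps the old path-based route while adding the NativeFile one: a string sink is opened through FileOutputStream.Open, whereas a NativeFile hands its underlying writable stream (wr_file) straight to a ParquetWriteSink; only these two sink types are handled by the isinstance checks. A caller-side sketch of both routes, with a hypothetical output path:

import pandas as pd
import pyarrow as A
import pyarrow.io as paio
import pyarrow.parquet as pq

table = A.from_pandas_dataframe(pd.DataFrame({'n': list(range(10))}))

# Route 1: string sink, opened internally with FileOutputStream.Open.
pq.write_table(table, '/tmp/example.parquet')  # hypothetical path

# Route 2: NativeFile sink, wrapped directly via its wr_file stream.
pq.write_table(table, paio.InMemoryOutputStream())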

python/pyarrow/tests/test_parquet.py

Lines changed: 27 additions & 0 deletions
@@ -18,6 +18,7 @@

import pytest

import pyarrow as A
+import pyarrow.io as paio

import numpy as np
import pandas as pd

@@ -131,6 +132,32 @@ def test_pandas_column_selection(tmpdir):

    pdt.assert_frame_equal(df[['uint8']], df_read)

+@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int16),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    arrow_table = A.from_pandas_dataframe(df)
+    imos = paio.InMemoryOutputStream()
+    pq.write_table(arrow_table, imos, version="2.0")
+    buf = imos.get_result()
+    reader = paio.BufferReader(buf)
+    df_read = pq.read_table(reader).to_pandas()
+    pdt.assert_frame_equal(df, df_read)
+
+
@parquet
def test_pandas_parquet_configuration_options(tmpdir):
    size = 10000