Skip to content

Commit 121e826

Browse files
xhochy and wesm
authored and committed
ARROW-361: Python: Support reading a column-selection from Parquet files
Author: Uwe L. Korn <uwelk@xhochy.com> Closes apache#197 from xhochy/ARROW-361 and squashes the following commits: c1fb939 [Uwe L. Korn] Cache column indices 0c32213 [Uwe L. Korn] ARROW-361: Python: Support reading a column-selection from Parquet files
1 parent e8bc1fe commit 121e826

File tree

3 files changed

+89
-5
lines changed

3 files changed

+89
-5
lines changed

python/pyarrow/includes/parquet.pxd

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
# distutils: language = c++
1919

2020
from pyarrow.includes.common cimport *
21-
from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
21+
from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
2222
from pyarrow.includes.libarrow_io cimport ReadableFileInterface
2323

2424

@@ -32,6 +32,9 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
3232
cdef cppclass PrimitiveNode(Node):
3333
pass
3434

35+
cdef cppclass ColumnPath:
36+
c_string ToDotString()
37+
3538
cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
3639
enum ParquetVersion" parquet::ParquetVersion::type":
3740
PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
@@ -44,13 +47,14 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
4447
LZO" parquet::Compression::LZO"
4548
BROTLI" parquet::Compression::BROTLI"
4649

50+
cdef cppclass ColumnDescriptor:
51+
shared_ptr[ColumnPath] path()
52+
4753
cdef cppclass SchemaDescriptor:
54+
const ColumnDescriptor* Column(int i)
4855
shared_ptr[Node] schema()
4956
GroupNode* group()
5057

51-
cdef cppclass ColumnDescriptor:
52-
pass
53-
5458

5559
cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
5660
cdef cppclass ColumnReader:
@@ -80,10 +84,21 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
8084
cdef cppclass RowGroupReader:
8185
pass
8286

87+
cdef cppclass FileMetaData:
88+
uint32_t size()
89+
int num_columns()
90+
int64_t num_rows()
91+
int num_row_groups()
92+
int32_t version()
93+
const c_string created_by()
94+
int num_schema_elements()
95+
const SchemaDescriptor* schema()
96+
8397
cdef cppclass ParquetFileReader:
8498
# TODO: Some default arguments are missing
8599
@staticmethod
86100
unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
101+
const FileMetaData* metadata();
87102

88103

89104
cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -124,7 +139,9 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
124139

125140
cdef cppclass FileReader:
126141
FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
142+
CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
127143
CStatus ReadFlatTable(shared_ptr[CTable]* out);
144+
const ParquetFileReader* parquet_reader();
128145

129146

130147
cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:

python/pyarrow/parquet.pyx

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ from pyarrow.includes.parquet cimport *
2424
from pyarrow.includes.libarrow_io cimport ReadableFileInterface
2525
cimport pyarrow.includes.pyarrow as pyarrow
2626

27+
from pyarrow.array cimport Array
2728
from pyarrow.compat import tobytes
2829
from pyarrow.error import ArrowException
2930
from pyarrow.error cimport check_status
@@ -43,6 +44,7 @@ cdef class ParquetReader:
4344
cdef:
4445
ParquetAllocator allocator
4546
unique_ptr[FileReader] reader
47+
column_idx_map
4648

4749
def __cinit__(self):
4850
self.allocator.set_pool(default_memory_pool())
@@ -76,11 +78,55 @@ cdef class ParquetReader:
7678
table.init(ctable)
7779
return table
7880

81+
def column_name_idx(self, column_name):
    """
    Find the matching index of a column in the schema.

    Parameters
    ----------
    column_name: str
        Name of the column, separation of nesting levels is done via ".".

    Returns
    -------
    column_idx: int
        Integer index of the position of the column
    """
    cdef:
        const FileMetaData* metadata = \
            self.reader.get().parquet_reader().metadata()
        int i = 0

    if self.column_idx_map is None:
        # Build the name -> index map once and cache it on the reader;
        # subsequent lookups are plain dict hits.
        self.column_idx_map = {}
        for i in range(metadata.num_columns()):
            path = metadata.schema().Column(i).path().get().ToDotString()
            self.column_idx_map[str(path)] = i

    # Raises KeyError if the column name is not present in the file schema.
    return self.column_idx_map[column_name]
105+
106+
def read_column(self, int column_index):
    """
    Read a single column of the file into an Arrow Array.

    Parameters
    ----------
    column_index: int
        Index of the column in the Parquet schema.

    Returns
    -------
    pyarrow.array.Array
    """
    cdef:
        Array result = Array()
        shared_ptr[CArray] sp_array

    # Release the GIL while the native reader decodes the column.
    with nogil:
        check_status(self.reader.get()
                     .ReadFlatColumn(column_index, &sp_array))

    result.init(sp_array)
    return result
116+
79117

80118
def read_table(source, columns=None):
81119
"""
82120
Read a Table from Parquet format
83121
122+
Parameters
123+
----------
124+
source: str or pyarrow.io.NativeFile
125+
Readable source. For passing Python file objects or byte buffers, see
126+
pyarrow.io.PythonFileInterface or pyarrow.io.BytesReader.
127+
columns: list
128+
If not None, only these columns will be read from the file.
129+
84130
Returns
85131
-------
86132
pyarrow.table.Table
@@ -93,7 +139,12 @@ def read_table(source, columns=None):
93139
elif isinstance(source, NativeFile):
94140
reader.open_native_file(source)
95141

96-
return reader.read_all()
142+
if columns is None:
143+
return reader.read_all()
144+
else:
145+
column_idxs = [reader.column_name_idx(column) for column in columns]
146+
arrays = [reader.read_column(column_idx) for column_idx in column_idxs]
147+
return Table.from_arrays(columns, arrays)
97148

98149

99150
def write_table(table, filename, chunk_size=None, version=None,

python/pyarrow/tests/test_parquet.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,22 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
115115

116116
pdt.assert_frame_equal(df, df_read)
117117

118+
@parquet
def test_pandas_column_selection(tmpdir):
    # Round-trip a two-column frame through Parquet, reading back only
    # the 'uint8' column, and check the projection matches the original.
    num_rows = 10000
    np.random.seed(0)
    df = pd.DataFrame({
        'uint8': np.arange(num_rows, dtype=np.uint8),
        'uint16': np.arange(num_rows, dtype=np.uint16)
    })
    filename = tmpdir.join('pandas_rountrip.parquet')
    arrow_table = A.from_pandas_dataframe(df)
    A.parquet.write_table(arrow_table, filename.strpath)

    table_read = pq.read_table(filename.strpath, columns=['uint8'])
    df_read = table_read.to_pandas()
    pdt.assert_frame_equal(df[['uint8']], df_read)
133+
118134
@parquet
119135
def test_pandas_parquet_configuration_options(tmpdir):
120136
size = 10000

0 commit comments

Comments
 (0)