25 changes: 21 additions & 4 deletions python/pyarrow/includes/parquet.pxd
@@ -18,7 +18,7 @@
# distutils: language = c++

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
from pyarrow.includes.libarrow_io cimport ReadableFileInterface


@@ -32,6 +32,9 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
    cdef cppclass PrimitiveNode(Node):
        pass

    cdef cppclass ColumnPath:
        c_string ToDotString()

cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
    enum ParquetVersion" parquet::ParquetVersion::type":
        PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0"
@@ -44,13 +47,14 @@ cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
        LZO" parquet::Compression::LZO"
        BROTLI" parquet::Compression::BROTLI"

    cdef cppclass ColumnDescriptor:
        shared_ptr[ColumnPath] path()

    cdef cppclass SchemaDescriptor:
        const ColumnDescriptor* Column(int i)
        shared_ptr[Node] schema()
        GroupNode* group()

    cdef cppclass ColumnDescriptor:
        pass


cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
    cdef cppclass ColumnReader:
@@ -80,10 +84,21 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
    cdef cppclass RowGroupReader:
        pass

    cdef cppclass FileMetaData:
        uint32_t size()
        int num_columns()
        int64_t num_rows()
        int num_row_groups()
        int32_t version()
        const c_string created_by()
        int num_schema_elements()
        const SchemaDescriptor* schema()

    cdef cppclass ParquetFileReader:
        # TODO: Some default arguments are missing
        @staticmethod
        unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
        const FileMetaData* metadata();


cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -124,7 +139,9 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:

    cdef cppclass FileReader:
        FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
        CStatus ReadFlatColumn(int i, shared_ptr[CArray]* out);
        CStatus ReadFlatTable(shared_ptr[CTable]* out);
        const ParquetFileReader* parquet_reader();


cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
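The new `ColumnPath.ToDotString()` binding is what lets the Python layer address nested Parquet columns by name: each leaf column is identified by its ancestors' names joined with ".". A minimal Python sketch of that naming convention (the nested schema below is made up for illustration; real dot strings come from `parquet::schema::ColumnPath`):

```python
# Illustration of the dot-string naming that ColumnPath.ToDotString()
# produces for leaf columns of a nested schema (hypothetical schema).
nested_schema = {'id': None, 'user': {'name': None, 'address': {'city': None}}}

def dot_strings(node, prefix=()):
    # Leaves (None) become dotted paths; groups recurse with their
    # name appended to the prefix.
    for name, child in node.items():
        if child is None:
            yield '.'.join(prefix + (name,))
        else:
            yield from dot_strings(child, prefix + (name,))

print(list(dot_strings(nested_schema)))
# ['id', 'user.name', 'user.address.city']
```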
53 changes: 52 additions & 1 deletion python/pyarrow/parquet.pyx
@@ -24,6 +24,7 @@ from pyarrow.includes.parquet cimport *
from pyarrow.includes.libarrow_io cimport ReadableFileInterface
cimport pyarrow.includes.pyarrow as pyarrow

from pyarrow.array cimport Array
from pyarrow.compat import tobytes
from pyarrow.error import ArrowException
from pyarrow.error cimport check_status
@@ -43,6 +44,7 @@ cdef class ParquetReader:
    cdef:
        ParquetAllocator allocator
        unique_ptr[FileReader] reader
        column_idx_map

    def __cinit__(self):
        self.allocator.set_pool(default_memory_pool())
@@ -76,11 +78,55 @@
        table.init(ctable)
        return table

    def column_name_idx(self, column_name):
        """
        Find the matching index of a column in the schema.

        Parameters
        ----------
        column_name: str
            Name of the column; nesting levels are separated by ".".

        Returns
        -------
        column_idx: int
            Integer index of the column's position in the schema.
        """
        cdef:
            const FileMetaData* metadata = self.reader.get().parquet_reader().metadata()
            int i = 0

        if self.column_idx_map is None:
            # Build the name -> index map once so repeated lookups are O(1).
            self.column_idx_map = {}
            for i in range(0, metadata.num_columns()):
                name = str(metadata.schema().Column(i).path().get().ToDotString())
                self.column_idx_map[name] = i

        return self.column_idx_map[column_name]

    def read_column(self, int column_index):
        cdef:
            Array array = Array()
            shared_ptr[CArray] carray

        with nogil:
            check_status(self.reader.get().ReadFlatColumn(column_index, &carray))

        array.init(carray)
        return array


def read_table(source, columns=None):
"""
Read a Table from Parquet format

Parameters
----------
source: str or pyarrow.io.NativeFile
Readable source. For passing Python file objects or byte buffers, see
pyarrow.io.PythonFileInterface or pyarrow.io.BytesReader.
columns: list
If not None, only these columns will be read from the file.

Returns
-------
pyarrow.table.Table
@@ -93,7 +139,12 @@ def read_table(source, columns=None):
    elif isinstance(source, NativeFile):
        reader.open_native_file(source)

    return reader.read_all()
    if columns is None:
        return reader.read_all()
    else:
        column_idxs = [reader.column_name_idx(column) for column in columns]
Member: Without a hash map this has O(N * K) complexity (see the complexity sketch after this file's diff).
        arrays = [reader.read_column(column_idx) for column_idx in column_idxs]
        return Table.from_arrays(columns, arrays)


def write_table(table, filename, chunk_size=None, version=None,
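The review comment above is about lookup cost: resolving each of K requested columns with a linear scan over N schema columns takes O(N * K) comparisons, which is why `column_name_idx` caches a dict (`column_idx_map`) built once in O(N). A standalone sketch of the difference (names and sizes are arbitrary, for illustration only):

```python
# Contrast linear-scan lookup (O(N * K)) with a one-time hash map
# (O(N + K)). This mirrors the cached column_idx_map in ParquetReader.
names = ['col%d' % i for i in range(10000)]   # N schema columns
requested = names[::500]                      # K columns to read

# Linear scan per requested column: each .index() walks the whole list.
slow = [names.index(name) for name in requested]

# Build the map once, then every lookup is O(1).
idx_map = {name: i for i, name in enumerate(names)}
fast = [idx_map[name] for name in requested]

assert slow == fast
```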
16 changes: 16 additions & 0 deletions python/pyarrow/tests/test_parquet.py
@@ -115,6 +115,22 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):

    pdt.assert_frame_equal(df, df_read)

@parquet
def test_pandas_column_selection(tmpdir):
    size = 10000
    np.random.seed(0)
    df = pd.DataFrame({
        'uint8': np.arange(size, dtype=np.uint8),
        'uint16': np.arange(size, dtype=np.uint16)
    })
    filename = tmpdir.join('pandas_rountrip.parquet')
    arrow_table = A.from_pandas_dataframe(df)
    A.parquet.write_table(arrow_table, filename.strpath)
    table_read = pq.read_table(filename.strpath, columns=['uint8'])
    df_read = table_read.to_pandas()

    pdt.assert_frame_equal(df[['uint8']], df_read)

@parquet
def test_pandas_parquet_configuration_options(tmpdir):
    size = 10000
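For reference, end-to-end use of the new `columns=` argument looks like the test above. A condensed sketch (the file name and data are illustrative; the `A`/`pq` aliases mirror the test module's imports, against the pyarrow API of this era):

```python
import numpy as np
import pandas as pd
import pyarrow as A
import pyarrow.parquet as pq

# Write a two-column table, then read back only one column.
df = pd.DataFrame({'a': np.arange(100), 'b': np.arange(100)})
A.parquet.write_table(A.from_pandas_dataframe(df), 'example.parquet')

table = pq.read_table('example.parquet', columns=['a'])
print(table.to_pandas().columns)  # only 'a' comes back
```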