Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate ORC Writer to pylibcudf #17310

Open
wants to merge 8 commits into
base: branch-24.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 97 additions & 83 deletions python/cudf/cudf/_lib/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
from libc.stdint cimport int64_t
from libcpp cimport bool, int
from libcpp.map cimport map
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

import itertools
from collections import OrderedDict

try:
Expand All @@ -16,31 +14,22 @@ except ImportError:
import json

cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.orc cimport (
chunked_orc_writer_options,
orc_chunked_writer,
orc_writer_options,
write_orc as libcudf_write_orc,
)
from pylibcudf.libcudf.io.types cimport (
column_in_metadata,
sink_info,
table_input_metadata,
)
from pylibcudf.libcudf.table.table_view cimport table_view

from cudf._lib.column cimport Column
from cudf._lib.io.utils cimport make_sink_info, update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table
from cudf._lib.io.utils cimport update_col_struct_field_names
from cudf._lib.utils cimport data_from_pylibcudf_io

import pylibcudf as plc

import cudf
from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES
from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from cudf.core.buffer import acquire_spill_lock

from pylibcudf.io.types cimport TableInputMetadata, SinkInfo
from pylibcudf.io.orc cimport OrcChunkedWriter

# TODO: Consider inlining this function since it seems to only be used in one place.
cpdef read_parsed_orc_statistics(filepath_or_buffer):
Expand Down Expand Up @@ -246,61 +235,68 @@ def write_orc(
--------
cudf.read_orc
"""
cdef unique_ptr[data_sink] data_sink_c
cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c)
cdef table_input_metadata tbl_meta
cdef map[string, string] user_data
user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata(
table, index)
)

user_data = {}
user_data["pandas"] = generate_pandas_metadata(table, index)
if index is True or (
index is None and not isinstance(table._index, cudf.RangeIndex)
):
tv = table_view_from_table(table)
tbl_meta = table_input_metadata(tv)
if table._index is not None:
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
else:
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
tbl_meta.column_metadata[level].set_name(
str.encode(
_index_level_name(idx_name, level, table._column_names)
)
tbl_meta.c_obj.column_metadata[level].set_name(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the reviewer: `c_obj` is not usually accessed in the `cudf._lib` modules. I did it here because I didn't want to have to maintain a `column_metadata` attribute in `TableInputMetadata`. You'd have to "maintain" it because `self.c_obj.column_metadata` and `self.column_metadata` would be different objects. Another approach is to have `ColumnInMetadata`'s `c_obj` be a pointer. WDYT?

Here's what it looks like if `ColumnInMetadata`'s `c_obj` is a pointer.

diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd
index 6e42c9c309..f8d1c515db 100644
--- a/python/pylibcudf/pylibcudf/io/types.pxd
+++ b/python/pylibcudf/pylibcudf/io/types.pxd
@@ -48,7 +48,7 @@ cdef class SinkInfo:
     cdef sink_info c_obj
 
 cdef class ColumnInMetadata:
-    cdef column_in_metadata c_obj
+    cdef column_in_metadata* c_obj
 
     cpdef ColumnInMetadata set_name(self, str name)
 
@@ -75,7 +75,7 @@ cdef class ColumnInMetadata:
     cpdef str get_name(self)
 
     @staticmethod
-    cdef ColumnInMetadata from_libcudf(column_in_metadata data)
+    cdef ColumnInMetadata from_libcudf(column_in_metadata* data)
 
 cdef class TableInputMetadata:
     cdef public Table table

and

diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx
index c72fe3d4aa..f31f5c4d30 100644
--- a/python/pylibcudf/pylibcudf/io/types.pyx
+++ b/python/pylibcudf/pylibcudf/io/types.pyx
@@ -33,6 +33,7 @@ from pylibcudf.libcudf.io.types import (
     quote_style as QuoteStyle,  # no-cython-lint
     statistics_freq as StatisticsFreq, # no-cython-lint
 )
+from cython.operator cimport dereference
 
 __all__ = [
     "ColumnEncoding",
@@ -345,10 +346,10 @@ cdef class ColumnInMetadata:
         )
 
     @staticmethod
-    cdef ColumnInMetadata from_libcudf(column_in_metadata data):
+    cdef ColumnInMetadata from_libcudf(column_in_metadata* data):
         """Create a Python ColumnInMetadata from a libcudf column_in_metadata."""
         cdef ColumnInMetadata out = ColumnInMetadata.__new__(ColumnInMetadata)
-        out.c_obj = move(data)
+        out.c_obj = data
         return out
 
     cpdef ColumnInMetadata set_name(self, str name):
@@ -362,7 +363,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.c_obj.set_name(name.encode())
+        dereference(self.c_obj).set_name(name.encode())
         return self

...
 
     cpdef ColumnInMetadata set_decimal_precision(self, int precision):
@@ -417,7 +418,7 @@ cdef class ColumnInMetadata:
         -------
         Self
         """
-        self.c_obj.set_decimal_precision(precision)
+        dereference(self.c_obj).set_decimal_precision(precision)
         return self
 
     cpdef ColumnInMetadata child(self, int i):
@@ -431,9 +432,8 @@ cdef class ColumnInMetadata:
         -------
         ColumnInMetadata
         """
-        return ColumnInMetadata.from_libcudf(
-            move(self.c_obj.child(i))
-        )
+        cdef column_in_metadata* child_c_obj = &dereference(self.c_obj).child(i)
+        return ColumnInMetadata.from_libcudf(child_c_obj)
 
...
 
 cdef class TableInputMetadata:
@@ -514,3 +514,8 @@ cdef class TableInputMetadata:
     """
     def __init__(self, Table table):
         self.c_obj = table_input_metadata(table.view())
+        self.column_metadata = []
+
+        cdef int num_columns = self.c_obj.column_metadata.size()
+        for i in range(num_columns):
+            self.column_metadata.append(ColumnInMetadata.from_libcudf(&self.c_obj.column_metadata[i]))

str.encode(_index_level_name(idx_name, level, table._column_names))
)
num_index_cols_meta = len(table._index.names)
else:
tv = table_view_from_table(table, ignore_index=True)
tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[col.to_pylibcudf(mode="read") for col in table._columns]
)
tbl_meta = TableInputMetadata(plc_table)
num_index_cols_meta = 0

if cols_as_map_type is not None:
cols_as_map_type = set(cols_as_map_type)

for i, name in enumerate(table._column_names, num_index_cols_meta):
tbl_meta.column_metadata[i].set_name(name.encode())
tbl_meta.c_obj.column_metadata[i].set_name(name.encode())
_set_col_children_metadata(
table[name]._column,
tbl_meta.column_metadata[i],
tbl_meta.c_obj.column_metadata[i],
(cols_as_map_type is not None)
and (name in cols_as_map_type),
)

cdef orc_writer_options c_orc_writer_options = move(
orc_writer_options.builder(
sink_info_c, tv
).metadata(tbl_meta)
.key_value_metadata(move(user_data))
options = (
plc.io.orc.OrcWriterOptions.builder(
plc.io.SinkInfo([path_or_buf]), plc_table
)
.metadata(tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(compression))
.enable_statistics(_get_orc_stat_freq(statistics))
.build()
)
if stripe_size_bytes is not None:
c_orc_writer_options.set_stripe_size_bytes(stripe_size_bytes)
options.set_stripe_size_bytes(stripe_size_bytes)
if stripe_size_rows is not None:
c_orc_writer_options.set_stripe_size_rows(stripe_size_rows)
options.set_stripe_size_rows(stripe_size_rows)
if row_index_stride is not None:
c_orc_writer_options.set_row_index_stride(row_index_stride)
options.set_row_index_stride(row_index_stride)

with nogil:
libcudf_write_orc(c_orc_writer_options)
plc.io.orc.write_orc(options)


cdef int64_t get_skiprows_arg(object arg) except*:
Expand All @@ -326,13 +322,12 @@ cdef class ORCWriter:
cudf.io.orc.to_orc
"""
cdef bool initialized
cdef unique_ptr[orc_chunked_writer] writer
cdef sink_info sink
cdef unique_ptr[data_sink] _data_sink
cdef OrcChunkedWriter writer
cdef SinkInfo sink
cdef str statistics
cdef object compression
cdef object index
cdef table_input_metadata tbl_meta
cdef TableInputMetadata tbl_meta
cdef object cols_as_map_type
cdef object stripe_size_bytes
cdef object stripe_size_rows
Expand All @@ -347,8 +342,7 @@ cdef class ORCWriter:
object stripe_size_bytes=None,
object stripe_size_rows=None,
object row_index_stride=None):

self.sink = make_sink_info(path, self._data_sink)
self.sink = plc.io.SinkInfo([path])
self.statistics = statistics
self.compression = compression
self.index = index
Expand All @@ -368,17 +362,23 @@ cdef class ORCWriter:
table._index.name is not None or
isinstance(table._index, cudf.core.multiindex.MultiIndex)
)
tv = table_view_from_table(table, not keep_index)
if keep_index:
columns = [
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
else:
columns = [col.to_pylibcudf(mode="read") for col in table._columns]

with nogil:
self.writer.get()[0].write(tv)
self.writer.write(
plc.Table(columns)
)
Matt711 marked this conversation as resolved.
Show resolved Hide resolved

def close(self):
if not self.initialized:
return

with nogil:
self.writer.get()[0].close()
self.writer.close()

def __dealloc__(self):
self.close()
Expand All @@ -387,60 +387,74 @@ cdef class ORCWriter:
"""
Prepare all the values required to build the
chunked_orc_writer_options anb creates a writer"""
cdef table_view tv

num_index_cols_meta = 0
self.tbl_meta = table_input_metadata(
table_view_from_table(table, ignore_index=True),
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in table._columns
]
)
self.tbl_meta = TableInputMetadata(plc_table)
if self.index is not False:
if isinstance(table._index, cudf.core.multiindex.MultiIndex):
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(table.index._columns, table._columns)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
for level, idx_name in enumerate(table._index.names):
self.tbl_meta.column_metadata[level].set_name(
(str.encode(idx_name))
self.tbl_meta.c_obj.column_metadata[level].set_name(
str.encode(idx_name)
)
num_index_cols_meta = len(table._index.names)
else:
if table._index.name is not None:
tv = table_view_from_table(table)
self.tbl_meta = table_input_metadata(tv)
self.tbl_meta.column_metadata[0].set_name(
plc_table = plc.Table(
[
col.to_pylibcudf(mode="read")
for col in itertools.chain(
table.index._columns, table._columns
)
]
)
self.tbl_meta = TableInputMetadata(plc_table)
self.tbl_meta.c_obj.column_metadata[0].set_name(
str.encode(table._index.name)
)
num_index_cols_meta = 1

for i, name in enumerate(table._column_names, num_index_cols_meta):
self.tbl_meta.column_metadata[i].set_name(name.encode())
self.tbl_meta.c_obj.column_metadata[i].set_name(name.encode())
_set_col_children_metadata(
table[name]._column,
self.tbl_meta.column_metadata[i],
self.tbl_meta.c_obj.column_metadata[i],
(self.cols_as_map_type is not None)
and (name in self.cols_as_map_type),
)

cdef map[string, string] user_data
user_data = {}
pandas_metadata = generate_pandas_metadata(table, self.index)
user_data[str.encode("pandas")] = str.encode(pandas_metadata)

cdef chunked_orc_writer_options c_opts = move(
chunked_orc_writer_options.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(move(user_data))
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
user_data["pandas"] = pandas_metadata

options = (
plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink)
.metadata(self.tbl_meta)
.key_value_metadata(user_data)
.compression(_get_comp_type(self.compression))
.enable_statistics(_get_orc_stat_freq(self.statistics))
.build()
)
if self.stripe_size_bytes is not None:
c_opts.set_stripe_size_bytes(self.stripe_size_bytes)
options.set_stripe_size_bytes(self.stripe_size_bytes)
if self.stripe_size_rows is not None:
c_opts.set_stripe_size_rows(self.stripe_size_rows)
options.set_stripe_size_rows(self.stripe_size_rows)
if self.row_index_stride is not None:
c_opts.set_row_index_stride(self.row_index_stride)
options.set_row_index_stride(self.row_index_stride)

with nogil:
self.writer.reset(new orc_chunked_writer(c_opts))
self.writer = plc.io.orc.OrcChunkedWriter.from_options(options)

self.initialized = True

Expand Down
65 changes: 63 additions & 2 deletions python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@ from libcpp cimport bool
from libcpp.optional cimport optional
from libcpp.string cimport string
from libcpp.vector cimport vector
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from libcpp.memory cimport unique_ptr
from libcpp.map cimport map
from pylibcudf.io.types cimport (
SourceInfo,
SinkInfo,
TableWithMetadata,
TableInputMetadata,
)
from pylibcudf.libcudf.io.orc_metadata cimport (
column_statistics,
parsed_orc_statistics,
statistics_type,
)
from pylibcudf.libcudf.io.orc cimport (
orc_chunked_writer,
orc_writer_options,
orc_writer_options_builder,
chunked_orc_writer_options,
chunked_orc_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.types cimport DataType

from pylibcudf.table cimport Table
from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)

cpdef TableWithMetadata read_orc(
SourceInfo source_info,
Expand Down Expand Up @@ -48,3 +66,46 @@ cdef class ParsedOrcStatistics:
cpdef ParsedOrcStatistics read_parsed_orc_statistics(
SourceInfo source_info
)

cdef class OrcWriterOptions:
cdef orc_writer_options c_obj
cdef Table table
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_t size_rows)
cpdef void set_row_index_stride(self, size_t stride)

cdef class OrcWriterOptionsBuilder:
cdef orc_writer_options_builder c_obj
cdef Table table
cdef SinkInfo sink
cpdef OrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef OrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef OrcWriterOptionsBuilder key_value_metadata(self, dict kvm)
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef OrcWriterOptions build(self)

cpdef void write_orc(OrcWriterOptions options)

cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
cpdef void close(self)
cpdef write(self, Table table)

cdef class ChunkedOrcWriterOptions:
cdef chunked_orc_writer_options c_obj
cdef SinkInfo sink
cpdef void set_stripe_size_bytes(self, size_t size_bytes)
cpdef void set_stripe_size_rows(self, size_t size_rows)
cpdef void set_row_index_stride(self, size_t stride)

cdef class ChunkedOrcWriterOptionsBuilder:
cdef chunked_orc_writer_options_builder c_obj
cdef SinkInfo sink
cpdef ChunkedOrcWriterOptionsBuilder compression(self, compression_type comp)
cpdef ChunkedOrcWriterOptionsBuilder enable_statistics(self, statistics_freq val)
cpdef ChunkedOrcWriterOptionsBuilder key_value_metadata(
self, dict kvm
)
cpdef ChunkedOrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef ChunkedOrcWriterOptions build(self)
Loading
Loading