Skip to content

Commit

Permalink
Fix CUDF's Column.from_column_view by copying it and adjusting. (nv…
Browse files Browse the repository at this point in the history
…-morpheus#2004)

Closes nv-morpheus#1934

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Christopher Harris (https://github.com/cwharris)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: nv-morpheus#2004
  • Loading branch information
cwharris authored Oct 28, 2024
1 parent 4d84238 commit a0ac32f
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 3 deletions.
226 changes: 224 additions & 2 deletions python/morpheus/morpheus/_lib/cudf_helpers.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,228 @@ from cudf._lib.column cimport Column
from cudf._lib.utils cimport data_from_unique_ptr
from cudf._lib.utils cimport table_view_from_table

##### THE FOLLOWING CODE IS COPIED FROM CUDF AND SHOULD BE REMOVED WHEN UPDATING TO cudf>=24.12 #####
# see https://github.com/rapidsai/cudf/pull/17193 for details

# isort: off

# imports needed for get_element, which is required by from_column_view_with_fix
cimport pylibcudf.libcudf.copying as cpp_copying
from pylibcudf.libcudf.column.column_view cimport column_view
from libcpp.memory cimport make_unique, unique_ptr
from pylibcudf.libcudf.scalar.scalar cimport scalar
from cudf._lib.scalar cimport DeviceScalar

# imports needed for from_column_view_with_fix
import rmm
from libc.stdint cimport uintptr_t
from cudf.core.buffer import (
# Buffer,
ExposureTrackedBuffer,
SpillableBuffer,
# acquire_spill_lock,
as_buffer,
# cuda_array_interface_wrapper,
)
cimport pylibcudf.libcudf.types as libcudf_types
from cudf._lib.types cimport (
dtype_from_column_view,
# dtype_to_data_type,
# dtype_to_pylibcudf_type,
)
from cudf._lib.null_mask import bitmask_allocation_size_bytes

# isort: on

cdef get_element(column_view col_view, size_type index):

cdef unique_ptr[scalar] c_output
with nogil:
c_output = move(
cpp_copying.get_element(col_view, index)
)

return DeviceScalar.from_unique_ptr(
move(c_output), dtype=dtype_from_column_view(col_view)
)

cdef Column from_column_view_with_fix(column_view cv, object owner):
"""
Given a ``cudf::column_view``, constructs a ``cudf.Column`` from it,
along with referencing an ``owner`` Python object that owns the memory
lifetime. If ``owner`` is a ``cudf.Column``, we reach inside of it and
make the owner of each newly created ``Buffer`` the respective
``Buffer`` from the ``owner`` ``cudf.Column``.
If ``owner`` is ``None``, we allocate new memory for the resulting
``cudf.Column``.
"""
column_owner = isinstance(owner, Column)
mask_owner = owner
if column_owner and isinstance(owner.dtype, cudf.CategoricalDtype):
owner = owner.base_children[0]

size = cv.size()
offset = cv.offset()
dtype = dtype_from_column_view(cv)
dtype_itemsize = getattr(dtype, "itemsize", 1)

data_ptr = <uintptr_t>(cv.head[void]())
data = None
base_size = size + offset
data_owner = owner

if column_owner:
data_owner = owner.base_data
mask_owner = mask_owner.base_mask
base_size = owner.base_size
base_nbytes = base_size * dtype_itemsize
# special case for string column
is_string_column = (cv.type().id() == libcudf_types.type_id.STRING)
if is_string_column:
if cv.num_children() == 0:
base_nbytes = 0
else:
# get the size from offset child column (device to host copy)
offsets_column_index = 0
offset_child_column = cv.child(offsets_column_index)
if offset_child_column.size() == 0:
base_nbytes = 0
else:
chars_size = get_element(
offset_child_column, offset_child_column.size()-1).value
base_nbytes = chars_size

if data_ptr:
if data_owner is None:
buffer_size = (
base_nbytes
if is_string_column
else ((size + offset) * dtype_itemsize)
)
data = as_buffer(
rmm.DeviceBuffer(ptr=data_ptr,
size=buffer_size)
)
elif (
column_owner and
isinstance(data_owner, ExposureTrackedBuffer)
):
data = as_buffer(
data=data_ptr,
size=base_nbytes,
owner=data_owner,
exposed=False,
)
elif (
# This is an optimization of the most common case where
# from_column_view creates a "view" that is identical to
# the owner.
column_owner and
isinstance(data_owner, SpillableBuffer) and
# We check that `data_owner` is spill locked (not spillable)
# and that it points to the same memory as `data_ptr`.
not data_owner.spillable and
data_owner.memory_info() == (data_ptr, base_nbytes, "gpu")
):
data = data_owner
else:
# At this point we don't know the relationship between data_ptr
# and data_owner thus we mark both of them exposed.
# TODO: try to discover their relationship and create a
# SpillableBufferSlice instead.
data = as_buffer(
data=data_ptr,
size=base_nbytes,
owner=data_owner,
exposed=True,
)
if isinstance(data_owner, ExposureTrackedBuffer):
# accessing the pointer marks it exposed permanently.
data_owner.mark_exposed()
elif isinstance(data_owner, SpillableBuffer):
if data_owner.is_spilled:
raise ValueError(
f"{data_owner} is spilled, which invalidates "
f"the exposed data_ptr ({hex(data_ptr)})"
)
# accessing the pointer marks it exposed permanently.
data_owner.mark_exposed()
else:
data = as_buffer(
rmm.DeviceBuffer(ptr=data_ptr, size=0)
)

mask = None
mask_ptr = <uintptr_t>(cv.null_mask())
if mask_ptr:
if mask_owner is None:
if column_owner:
# if we reached here, it means `owner` is a `Column`
# that does not have a null mask, but `cv` thinks it
# should have a null mask. This can happen in the
# following sequence of events:
#
# 1) `cv` is constructed as a view into a
# `cudf::column` that is nullable (i.e., it has
# a null mask), but contains no nulls.
# 2) `owner`, a `Column`, is constructed from the
# same `cudf::column`. Because `cudf::column`
# is memory owning, `owner` takes ownership of
# the memory owned by the
# `cudf::column`. Because the column has a null
# count of 0, it may choose to discard the null
# mask.
# 3) Now, `cv` points to a discarded null mask.
#
# TL;DR: we should not include a null mask in the
# result:
mask = None
else:
mask = as_buffer(
rmm.DeviceBuffer(
ptr=mask_ptr,
size=bitmask_allocation_size_bytes(base_size)
)
)
else:
mask = as_buffer(
data=mask_ptr,
size=bitmask_allocation_size_bytes(base_size),
owner=mask_owner,
exposed=True
)

if cv.has_nulls():
null_count = cv.null_count()
else:
null_count = 0

children = []
for child_index in range(cv.num_children()):
child_owner = owner
if column_owner:
child_owner = owner.base_children[child_index]
children.append(
from_column_view_with_fix(
cv.child(child_index),
child_owner
)
)
children = tuple(children)

result = cudf.core.column.build_column(
data=data,
dtype=dtype,
mask=mask,
size=size,
offset=offset,
null_count=null_count,
children=tuple(children)
)

return result

##### THE PREVIOUS CODE IS COPIED FROM CUDF AND SHOULD BE REMOVED WHEN UPDATING TO cudf>=24.12 #####

cdef vector[string] get_column_names(object tbl, object index):
cdef vector[string] column_names
Expand Down Expand Up @@ -188,7 +410,7 @@ cdef public api:
if table_owner:
column_owner = owner._index._columns[column_idx]
index_columns.append(
Column.from_column_view(
from_column_view_with_fix(
tv.column(column_idx),
column_owner
)
Expand All @@ -205,7 +427,7 @@ cdef public api:
if table_owner:
column_owner = owner._columns[column_indices[source_column_idx]]
data_columns.append(
Column.from_column_view(tv.column(column_idx), column_owner)
from_column_view_with_fix(tv.column(column_idx), column_owner)
)
column_idx += 1
source_column_idx += 1
Expand Down
12 changes: 11 additions & 1 deletion python/morpheus/morpheus/_lib/cudf_helpers/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
from __future__ import annotations
import morpheus._lib.cudf_helpers
import typing
from cudf.core.buffer.exposure_tracked_buffer import ExposureTrackedBuffer
from cudf.core.buffer.spillable_buffer import SpillableBuffer
from cudf.core.dtypes import StructDtype
import _cython_3_0_11
import cudf
import rmm

__all__ = [
"ExposureTrackedBuffer",
"SpillableBuffer",
"StructDtype",
"cudf"
"as_buffer",
"bitmask_allocation_size_bytes",
"cudf",
"rmm"
]


__pyx_capi__: dict # value = {'make_table_from_table_with_metadata': <capsule object "PyObject *(cudf::io::table_with_metadata, int)">, 'make_table_from_table_info_data': <capsule object "PyObject *(morpheus::TableInfoData, PyObject *)">, 'make_table_info_data_from_table': <capsule object "morpheus::TableInfoData (PyObject *)">, 'data_from_table_view_indexed': <capsule object "PyObject *(cudf::table_view, PyObject *, PyObject *, PyObject *, PyObject *)">}
__test__ = {}
bitmask_allocation_size_bytes: _cython_3_0_11.cython_function_or_method # value = <cyfunction bitmask_allocation_size_bytes>
26 changes: 26 additions & 0 deletions python/morpheus/morpheus/_lib/tests/messages/test_message_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,20 @@

#include <gtest/gtest.h> // for TestInfo, TEST_F
#include <pybind11/gil.h> // for gil_scoped_release
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <rmm/cuda_stream_view.hpp> // for cuda_stream_per_thread
#include <rmm/device_buffer.hpp> // for device_buffer

#include <cstdint> // for int64_t
#include <filesystem> // for operator/, path
#include <memory> // for allocator, __shared_ptr_access, shared_ptr, make_shared
#include <utility> // for move
#include <vector> // for vector

using namespace morpheus;
using namespace morpheus::test;
using namespace pybind11::literals;

using TestMessageMeta = morpheus::test::TestMessages; // NOLINT(readability-identifier-naming)

Expand Down Expand Up @@ -82,3 +86,25 @@ TEST_F(TestMessageMeta, CopyRangeAndSlicing)
assert_eq_device_to_host(sliced_meta->get_info().get_column(0), sliced_expected_int);
assert_eq_device_to_host(sliced_meta->get_info().get_column(1), sliced_expected_double);
}

TEST_F(TestMessageMeta, Issue1934)
{
// Reproduce issue 1934 (https://github.com/nv-morpheus/Morpheus/issues/1934)
// The bug causes a segfault when calling `get_data_frame` on a message meta object
namespace py = pybind11;
py::gil_scoped_acquire gil;

auto cudf_mod = py::module_::import("cudf");
auto a_col = py::list();
auto v1 = py::list();
v1.append(py::str("a"));
a_col.attr("append")(std::move(v1));
a_col.attr("append")(py::none());

auto df = cudf_mod.attr("DataFrame")(py::dict("a"_a = std::move(a_col)));
df.attr("drop")(0, "inplace"_a = true);

auto msg = MessageMetaInterfaceProxy::init_python(std::move(df));

auto df_copy = MessageMetaInterfaceProxy::get_data_frame(*msg);
}

0 comments on commit a0ac32f

Please sign in to comment.