Warn & replace dataframes with non-unique indexes #691

Merged
63 commits
5b90244
Add unittest for issue #686
dagardner-nv Feb 9, 2023
1f168a3
wip
dagardner-nv Feb 10, 2023
f38a07f
wip
dagardner-nv Feb 10, 2023
eca479a
Add 'has_unique_index' helper method to MessageMeta
dagardner-nv Feb 10, 2023
d824440
Add integration test for desrialization stage, along with test for is…
dagardner-nv Feb 10, 2023
5bf99bf
Test for has_unique_index method
dagardner-nv Feb 10, 2023
3add91b
Remove parametrize variables not needed for this test
dagardner-nv Feb 10, 2023
e3be4cf
First pass at replacing a non-unique index
dagardner-nv Feb 10, 2023
e43ac89
Add cpp impl for has_unique_index
dagardner-nv Feb 10, 2023
e60742c
wip
dagardner-nv Feb 10, 2023
53ee170
Move index reset to MutableTableInfo so that the column & index names…
dagardner-nv Feb 10, 2023
3fd0ea3
use logger.warning instead of logger.warn
dagardner-nv Feb 10, 2023
c651744
Update multi-segment test
dagardner-nv Feb 10, 2023
1d41fd6
Select only the columns in the view when writing json
dagardner-nv Feb 10, 2023
f9396be
Log and ignore include_index_col=false, otherwise cudf will throw an …
dagardner-nv Feb 11, 2023
fb141c9
wip
dagardner-nv Feb 11, 2023
c77360f
Document work-around
dagardner-nv Feb 11, 2023
ef3eb30
Fix casing for cuDF
dagardner-nv Feb 13, 2023
0554785
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 13, 2023
ae7d4af
Change fatal log to an error log
dagardner-nv Feb 13, 2023
ccc6e6c
Only set include_index_col=False when writing CSV
dagardner-nv Feb 13, 2023
d9669e2
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 14, 2023
bf4d4e6
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 15, 2023
7acff20
wip
dagardner-nv Feb 15, 2023
2a14392
Move index reset logic to a method on MessageMeta
dagardner-nv Feb 15, 2023
123d216
Merge branch 'david-warn-non-unique-686' of github.com:dagardner-nv/M…
dagardner-nv Feb 15, 2023
aad70ca
Repeat test with dup id occurring at the front and the end of the df
dagardner-nv Feb 15, 2023
d63bdcc
Only use the index for slicing if the index is unique, otherwise use …
dagardner-nv Feb 15, 2023
a46d43e
Add test for replace_non_unique_index method
dagardner-nv Feb 15, 2023
4715a5d
rename reset_index to replace_non_unique_index
dagardner-nv Feb 15, 2023
829cc3d
Remove unused import
dagardner-nv Feb 15, 2023
50dcce7
Add missing docstring
dagardner-nv Feb 15, 2023
a0f1388
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 16, 2023
09c1e0f
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 22, 2023
7c2f770
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 22, 2023
6ad14c8
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 23, 2023
ec0ed04
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 24, 2023
22daccd
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 25, 2023
1f35d00
Add missing includes
dagardner-nv Feb 25, 2023
7ad8d2d
Cleanup includes
dagardner-nv Feb 25, 2023
e4976db
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Feb 27, 2023
0b2d13c
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Mar 7, 2023
4b99d1e
Merge branch 'branch-23.03' into david-warn-non-unique-686
dagardner-nv Mar 7, 2023
482fd45
Adding additional tests to MultiMessage and fixing the bugs it discovers
mdemoret-nv Mar 10, 2023
bb54dad
All multi message tests passing
mdemoret-nv Mar 14, 2023
d4b8761
Most tests now passing
mdemoret-nv Mar 14, 2023
c002a93
Merge branch 'branch-23.03' into david-warn-non-unique-686
mdemoret-nv Mar 15, 2023
f4fb726
Removing files that should not have been committed
mdemoret-nv Mar 15, 2023
51e4e71
Removing stub generation
mdemoret-nv Mar 15, 2023
76921d3
Fixing up post merge failures
mdemoret-nv Mar 15, 2023
65e7edb
Large cleanup and added multi tensor tests
mdemoret-nv Mar 16, 2023
b55f50d
Merge branch 'branch-23.03' into david-warn-non-unique-686
mdemoret-nv Mar 16, 2023
4e92c8b
Style cleanup
mdemoret-nv Mar 16, 2023
68ff815
Merge branch 'branch-23.03' into david-warn-non-unique-686
mdemoret-nv Mar 16, 2023
77e2db0
Cleaning up the code
mdemoret-nv Mar 16, 2023
1ac0c6a
Large cleanup
mdemoret-nv Mar 16, 2023
39beb1f
Non-slow tests passing
mdemoret-nv Mar 17, 2023
42a70b9
Large cleanup. All tests passing locally
mdemoret-nv Mar 17, 2023
1cfa57d
Merge branch 'branch-23.03' into david-warn-non-unique-686
mdemoret-nv Mar 17, 2023
5bf02e9
Removing stubs from the build in CI
mdemoret-nv Mar 17, 2023
345fa78
IWYU fixes
mdemoret-nv Mar 17, 2023
365f583
Final changes to get CI to pass
mdemoret-nv Mar 17, 2023
1d9fe36
Style fixes
mdemoret-nv Mar 17, 2023
Most tests now passing
mdemoret-nv committed Mar 14, 2023
commit d4b8761f2a14cf66fa6b571b495dc1f13b4bc56c
27 changes: 16 additions & 11 deletions morpheus/messages/message_base.py
@@ -14,6 +14,7 @@

import abc
import dataclasses
import functools
import typing

from morpheus.config import CppConfig
@@ -29,8 +30,8 @@ class MessageImpl(abc.ABCMeta):

_cpp_class: typing.Union[type, typing.Callable] = None

def __new__(cls, classname, bases, classdict, cpp_class=None):
result = super().__new__(cls, classname, bases, classdict)
def __new__(cls, name, bases, namespace, /, cpp_class=None, **kwargs):
result = super().__new__(cls, name, bases, namespace, **kwargs)

# Set the C++ class type into the object to use for creation later if desired
result._cpp_class = cpp_class
@@ -39,6 +40,19 @@ def __new__(cls, classname, bases, classdict, cpp_class=None):
if (cpp_class is not None):
result.register(cpp_class)

# Wrap __new__ to attempt to provide the right type annotations
@functools.wraps(result.__new__)
def _internal_new(other_cls, *args, **kwargs):

# If _cpp_class is set, and use_cpp is enabled, create the C++ instance
if (getattr(other_cls, "_cpp_class", None) is not None and CppConfig.get_should_use_cpp()):
return cpp_class(*args, **kwargs)

# Otherwise, do the default init
return object.__new__(other_cls)

result.__new__ = _internal_new

return result


@@ -48,15 +62,6 @@ class MessageBase(metaclass=MessageImpl):
class has an associated C++ implementation (`cpp_class`), returns the Python implementation for all others.
"""

def __new__(cls, *args, **kwargs):

# If _cpp_class is set, and use_cpp is enabled, create the C++ instance
if (getattr(cls, "_cpp_class", None) is not None and CppConfig.get_should_use_cpp()):
return cls._cpp_class(*args, **kwargs)

# Otherwise, do the default init
return super().__new__(cls)


@dataclasses.dataclass
class MessageData(MessageBase):
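
The change above moves the C++/Python dispatch out of `MessageBase.__new__` and into the metaclass. As a rough, self-contained sketch of that pattern (not the Morpheus code itself): `UseCpp` and `FakeCppMessage` below are hypothetical stand-ins for `CppConfig` and a compiled message class, and the sketch looks the target up on the class rather than capturing it in a closure.

```python
import abc


class UseCpp:
    """Hypothetical stand-in for a global 'use C++' switch."""
    enabled = False


class FakeCppMessage:
    """Hypothetical stand-in for a compiled C++ message type."""

    def __init__(self, value=0):
        self.value = value


class MessageImpl(abc.ABCMeta):

    def __new__(mcs, name, bases, namespace, /, cpp_class=None, **kwargs):
        # Forward any remaining class keywords to the parent metaclass
        result = super().__new__(mcs, name, bases, namespace, **kwargs)
        result._cpp_class = cpp_class

        if cpp_class is not None:
            # Register as a virtual subclass so isinstance() checks pass
            result.register(cpp_class)

        def _internal_new(other_cls, *args, **kwargs):
            target = getattr(other_cls, "_cpp_class", None)
            if target is not None and UseCpp.enabled:
                # Build the alternate implementation instead
                return target(*args, **kwargs)
            # Otherwise fall back to normal Python construction
            return object.__new__(other_cls)

        result.__new__ = _internal_new
        return result


class PyMessage(metaclass=MessageImpl, cpp_class=FakeCppMessage):

    def __init__(self, value=0):
        self.value = value
```

With the switch off, `PyMessage(value=3)` builds an ordinary `PyMessage`; with it on, the same call returns a `FakeCppMessage` that still satisfies `isinstance(obj, PyMessage)` through the ABC registration.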
6 changes: 3 additions & 3 deletions morpheus/messages/multi_ae_message.py
@@ -41,9 +41,9 @@ def __init__(self,
meta: MessageMeta,
mess_offset: int = 0,
mess_count: int = -1,
model: AutoEncoder = None,
train_scores_mean: float = float("NaN"),
train_scores_std: float = float("NaN")):
model: AutoEncoder,
train_scores_mean: float,
train_scores_std: float):
super().__init__(meta=meta, mess_offset=mess_offset, mess_count=mess_count)

self.model = model
91 changes: 78 additions & 13 deletions morpheus/messages/multi_message.py
@@ -25,6 +25,8 @@
from morpheus.messages.message_base import MessageData
from morpheus.messages.message_meta import MessageMeta

Self = typing.TypeVar("Self", bound="MultiMessage")


@dataclasses.dataclass
class MultiMessage(MessageData, cpp_class=_messages.MultiMessage):
@@ -135,18 +137,18 @@ def _calc_message_slice_bounds(self, start: int, stop: int):

return offset, count

def _duplicate_message_with_kwargs(self, **kwargs):
all_fields = dataclasses.fields(self)

# Now loop over all fields and copy any derived properties
for f in all_fields:
if (f.name in kwargs or not f.init):
continue
@typing.overload
def get_meta(self) -> cudf.DataFrame:
...

kwargs[f.name] = getattr(self, f.name)
@typing.overload
def get_meta(self, columns: str) -> cudf.Series:
...

# Make sure to return an instance of the class
return self.__class__(**kwargs)
@typing.overload
def get_meta(self, columns: typing.List[str]) -> cudf.DataFrame:
...

def get_meta(self, columns: typing.Union[None, str, typing.List[str]] = None):
"""
@@ -276,7 +278,7 @@ def get_slice(self, start, stop):
# Calc the offset and count. This checks the bounds for us
offset, count = self._calc_message_slice_bounds(start=start, stop=stop)

return self._duplicate_message_with_kwargs(meta=self.meta, mess_offset=offset, mess_count=count)
return self.from_message(self, meta=self.meta, mess_offset=offset, mess_count=count)

def _ranges_to_mask(self, df, ranges):
if isinstance(df, cudf.DataFrame):
@@ -335,6 +337,69 @@ def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]):
"""
sliced_rows = self.copy_meta_ranges(ranges)

return self._duplicate_message_with_kwargs(meta=MessageMeta(sliced_rows),
mess_offset=0,
mess_count=len(sliced_rows))
return self.from_message(self, meta=MessageMeta(sliced_rows), mess_offset=0, mess_count=len(sliced_rows))

@classmethod
def from_message(cls: typing.Type[Self],
message: "MultiMessage",
*,
meta: MessageMeta = None,
mess_offset: int = -1,
mess_count: int = -1,
**kwargs) -> Self:

if (message is None):
raise ValueError("Must define `message` when creating a MultiMessage with `from_message`")

if (mess_offset == -1):
if (meta is not None):
mess_offset = 0
else:
mess_offset = message.mess_offset

if (mess_count == -1):
if (meta is not None):
# Subtract offset here so we don't go over the end
mess_count = meta.count - mess_offset
else:
mess_count = message.mess_count

# Do meta last
if meta is None:
meta = message.meta

# Update the kwargs
kwargs.update({
"meta": meta,
"mess_offset": mess_offset,
"mess_count": mess_count,
})

import inspect

signature = inspect.signature(cls.__init__)

for p_name, param in signature.parameters.items():

if (p_name == "self"):
# Skip self until this is fixed (python 3.9) https://github.com/python/cpython/issues/85074
# After that, switch to using inspect.signature(cls)
continue

# Skip if it's already defined
if (p_name in kwargs):
continue

if (not hasattr(message, p_name)):
# Check for a default
if (param.default == inspect.Parameter.empty):
raise AttributeError(
f"Cannot create message of type {cls}, from {message}. Missing property '{p_name}'")

# Otherwise, we can ignore
continue

kwargs[p_name] = getattr(message, p_name)

# Create a new instance using the kwargs
return cls(**kwargs)
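
The core idea of `from_message` above is to walk the target constructor's signature and copy any matching attribute from the source message, unless the caller overrides it. A stripped-down sketch of that idea follows; the classes `Base`, `WithScore` and `NeedsModel` are hypothetical and exist only for illustration.

```python
import inspect


class Base:

    def __init__(self, meta, mess_offset=0, mess_count=-1):
        self.meta = meta
        self.mess_offset = mess_offset
        self.mess_count = mess_count

    @classmethod
    def from_message(cls, message, **kwargs):
        for p_name, param in inspect.signature(cls.__init__).parameters.items():
            # Skip self and anything the caller supplied explicitly
            if p_name == "self" or p_name in kwargs:
                continue

            if not hasattr(message, p_name):
                # A missing attribute is only acceptable if the parameter has a default
                if param.default is inspect.Parameter.empty:
                    raise AttributeError(f"Cannot create {cls.__name__}: missing property '{p_name}'")
                continue

            kwargs[p_name] = getattr(message, p_name)

        return cls(**kwargs)


class WithScore(Base):

    def __init__(self, meta, mess_offset=0, mess_count=-1, score=0.0):
        super().__init__(meta, mess_offset, mess_count)
        self.score = score


class NeedsModel(Base):

    def __init__(self, meta, model, mess_offset=0, mess_count=-1):
        super().__init__(meta, mess_offset, mess_count)
        self.model = model
```

Converting a `Base` into a `WithScore` works because `score` has a default, while converting it into a `NeedsModel` without passing `model=` raises `AttributeError`. This mirrors why the commit drops the defaults from required constructor arguments such as `memory` and `model`.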
3 changes: 2 additions & 1 deletion morpheus/messages/multi_response_message.py
@@ -19,6 +19,7 @@
import morpheus._lib.messages as _messages
from morpheus.messages.memory.tensor_memory import TensorMemory
from morpheus.messages.message_meta import MessageMeta
from morpheus.messages.multi_message import MultiMessage
from morpheus.messages.multi_tensor_message import MultiTensorMessage


@@ -111,7 +112,7 @@ def __init__(self,
meta: MessageMeta,
mess_offset: int = 0,
mess_count: int = -1,
memory: TensorMemory = None,
memory: TensorMemory,
offset: int = 0,
count: int = -1):

66 changes: 56 additions & 10 deletions morpheus/messages/multi_tensor_message.py
@@ -21,6 +21,8 @@
from morpheus.messages.message_meta import MessageMeta
from morpheus.messages.multi_message import MultiMessage

Self = typing.TypeVar("Self", bound="MultiTensorMessage")


@dataclasses.dataclass
class MultiTensorMessage(MultiMessage, cpp_class=_messages.MultiTensorMessage):
@@ -49,7 +51,7 @@ def __init__(self,
meta: MessageMeta,
mess_offset: int = 0,
mess_count: int = -1,
memory: TensorMemory = None,
memory: TensorMemory,
offset: int = 0,
count: int = -1):

@@ -230,15 +232,15 @@ def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]):
sliced_count = len(sliced_rows)
sliced_tensors = self.copy_tensor_ranges(ranges, mask=mask)

mem = TensorMemory(count=sliced_count)
mem.tensors = sliced_tensors
mem = TensorMemory(count=sliced_count, tensors=sliced_tensors)

return self._duplicate_message_with_kwargs(meta=MessageMeta(sliced_rows),
mess_offset=0,
mess_count=sliced_count,
memory=mem,
offset=0,
count=sliced_count)
return self.from_message(self,
meta=MessageMeta(sliced_rows),
mess_offset=0,
mess_count=sliced_count,
memory=mem,
offset=0,
count=sliced_count)

def get_slice(self, start, stop):
"""
@@ -273,4 +275,48 @@ def get_slice(self, start, stop):
"count": count,
}

return self._duplicate_message_with_kwargs(**kwargs)
return self.from_message(self, **kwargs)

@classmethod
def from_message(cls: typing.Type[Self],
message: "MultiTensorMessage",
*,
meta: MessageMeta = None,
mess_offset: int = -1,
mess_count: int = -1,
memory: TensorMemory = None,
offset: int = -1,
count: int = -1,
**kwargs) -> Self:

if (message is None):
raise ValueError("Must define `message` when creating a MultiMessage with `from_message`")

if (offset == -1):
if (memory is not None):
offset = 0
else:
offset = message.offset

if (count == -1):
if (memory is not None):
# Subtract offset here so we don't go over the end
count = memory.count - offset
else:
count = message.count

# Do memory last
if memory is None:
memory = message.memory

# Update the kwargs
kwargs.update({
"meta": meta,
"mess_offset": mess_offset,
"mess_count": mess_count,
"memory": memory,
"offset": offset,
"count": count,
})

return super().from_message(message, **kwargs)
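
Both `from_message` implementations use `-1` as a sentinel meaning "not specified" and resolve it differently depending on whether a new container (`meta` or `memory`) was passed. The rule can be isolated into a small function; `resolve_bounds` and its parameter names are hypothetical, chosen only to illustrate the logic in the diff.

```python
import typing


def resolve_bounds(offset: int,
                   count: int,
                   new_size: typing.Optional[int],
                   old_offset: int,
                   old_count: int) -> typing.Tuple[int, int]:
    """
    Resolve -1 sentinels for an (offset, count) pair.

    new_size is the size of a replacement container, or None when the
    source message's container is being reused.
    """
    if offset == -1:
        # A new container starts from 0, otherwise inherit the old offset
        offset = 0 if new_size is not None else old_offset

    if count == -1:
        if new_size is not None:
            # Subtract the offset so the range does not run past the end
            count = new_size - offset
        else:
            count = old_count

    return offset, count
```

For example, reusing the existing container keeps the old bounds, while passing a new container of size 10 with an explicit offset of 2 yields a count of 8.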
13 changes: 7 additions & 6 deletions morpheus/stages/inference/auto_encoder_inference_stage.py
@@ -171,9 +171,10 @@ def _convert_one_response(memory: ResponseMemory, inf: MultiInferenceMessage, re
for i, idx in enumerate(mess_ids):
probs[idx, :] = cp.maximum(probs[idx, :], res.probs[i, :])

return MultiResponseAEMessage(meta=inf.meta,
mess_offset=inf.mess_offset,
mess_count=inf.mess_count,
memory=memory,
offset=inf.offset,
count=inf.count)
return MultiResponseAEMessage.from_message(inf,
meta=inf.meta,
mess_offset=inf.mess_offset,
mess_count=inf.mess_count,
memory=memory,
offset=inf.offset,
count=inf.count)
9 changes: 8 additions & 1 deletion morpheus/stages/postprocess/filter_detections_stage.py
@@ -157,6 +157,11 @@ def filter_copy(self, x: MultiMessage) -> MultiMessage:
return None

true_pairs = self._find_detections(x)

# If we didn't have any detections, return None
if (true_pairs.shape[0] == 0):
return None

return x.copy_ranges(true_pairs)

def filter_slice(self, x: MultiMessage) -> typing.List[MultiMessage]:
@@ -206,7 +211,9 @@ def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> Strea
self._field_name)
else:
if self._copy:
node = builder.make_node(self.unique_name, self.filter_copy)
node = builder.make_node(self.unique_name,
ops.map(self.filter_copy),
ops.filter(lambda x: x is not None))
else:
# Convert list back to individual messages
def flatten_fn(obs: mrc.Observable, sub: mrc.Subscriber):
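
Because `filter_copy` can now return `None`, the node chains a map step with a filter step so empty results never reach downstream stages. The same shape can be sketched with Python's built-in `map` and `filter`, without the `mrc` operators; the `filter_copy` below is a hypothetical simplification that works on plain lists of scores.

```python
def filter_copy(scores, threshold=0.5):
    """Keep scores above the threshold; return None when nothing qualifies."""
    kept = [s for s in scores if s > threshold]

    # Mirror the stage: signal "no detections" with None rather than an empty result
    if len(kept) == 0:
        return None

    return kept


batches = [[0.1, 0.2], [0.4, 0.9], [0.7, 0.8]]

# map applies the filter to each batch, filter then drops the None results
outputs = list(filter(lambda x: x is not None, map(filter_copy, batches)))
```

Here the first batch has no score above the threshold, so only two results survive.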
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -24,6 +24,8 @@ filterwarnings = [
# Warning coming from mlflow's usage of numpy
'ignore:`np.object` is a deprecated alias for the builtin `object`. To silence this warning, use `object` by itself. Doing this will not modify any behavior and is safe',
'ignore:Warning the df property returns a copy, please use the copy_dataframe method or the mutable_dataframe context manager to modify the DataFrame in-place instead.',
'ignore:`np.MachAr` is deprecated \(NumPy 1.22\):DeprecationWarning',
'ignore:Please use `spmatrix` from the `scipy.sparse` namespace, the `scipy.sparse.base` namespace is deprecated:DeprecationWarning'
]

testpaths = ["tests"]
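
The new `filterwarnings` entries follow an `action:message:category` layout. Assuming the message field behaves like the `message` regex argument of Python's `warnings.filterwarnings` (which would explain the escaped parentheses), a roughly equivalent filter can be sketched with the standard library alone. The `noisy` function is hypothetical and only emits sample warnings.

```python
import warnings


def noisy():
    # Hypothetical warnings standing in for the ones raised by third-party code
    warnings.warn("`np.MachAr` is deprecated (NumPy 1.22)", DeprecationWarning)
    warnings.warn("something else is deprecated", DeprecationWarning)


with warnings.catch_warnings(record=True) as caught:
    # Record everything by default...
    warnings.simplefilter("always")

    # ...except warnings whose text starts with this regex
    warnings.filterwarnings("ignore",
                            message=r"`np.MachAr` is deprecated \(NumPy 1\.22\)",
                            category=DeprecationWarning)
    noisy()

messages = [str(w.message) for w in caught]
```

Only the second warning is recorded, since the first matches the ignore filter.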
4 changes: 2 additions & 2 deletions tests/test_filter_detections_stage.py
@@ -59,9 +59,9 @@ def test_filter_copy(config):
probs = cp.array([[0.1, 0.5, 0.3], [0.2, 0.3, 0.4]])
mock_message = _make_message(df, probs)

# All values are at or below the threshold
# All values are at or below the threshold so nothing should be returned
output_message = fds.filter_copy(mock_message)
assert len(output_message.get_meta()) == 0
assert output_message is None

# Only one row has a value above the threshold
probs = cp.array([
18 changes: 11 additions & 7 deletions tests/test_inference_stage.py
@@ -35,13 +35,19 @@ def _get_inference_worker(self, pq):

def _mk_message(count=1, mess_count=1, offset=0, mess_offset=0):
m = mock.MagicMock()
m.count = count
m.offset = offset
m.meta = mock.MagicMock()
m.meta.count = mess_count
m.mess_offset = mess_offset
m.mess_count = mess_count
m.probs = cp.array([[0.1, 0.5, 0.8], [0.2, 0.6, 0.9]])

m.memory = mock.MagicMock()
m.memory.count = count
m.count = count
m.offset = offset

m.probs = cp.random.rand(count, 2)
m.seq_ids = cp.array([list(range(count)), list(range(count)), list(range(count))])
m.get_input.return_value = cp.array([[0, 1, 2], [0, 1, 2], [0, 1, 2]])
m.get_input.return_value = cp.array([list(range(count)), list(range(count)), list(range(count))])
return m
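
The updated helper gives the mock nested `meta.count` and `memory.count` attributes, presumably because `from_message` now reads them when resolving default bounds. A minimal sketch of the same mocking approach, using plain lists instead of `cupy` arrays (the name `make_message` is hypothetical):

```python
from unittest import mock


def make_message(count=1, mess_count=1, offset=0, mess_offset=0):
    m = mock.MagicMock()

    # Nested mocks so attribute chains like m.meta.count return real ints
    m.meta = mock.MagicMock()
    m.meta.count = mess_count
    m.mess_offset = mess_offset
    m.mess_count = mess_count

    m.memory = mock.MagicMock()
    m.memory.count = count
    m.count = count
    m.offset = offset

    # Canned return value sized from count rather than hard-coded
    m.get_input.return_value = [list(range(count)) for _ in range(3)]
    return m


msg = make_message(count=2, mess_count=2)
```

Sizing the canned data from `count` keeps the mock internally consistent when a test asks for a different message size.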


@@ -136,9 +142,7 @@ def test_py_inf_fn_on_next(mock_ops, mock_future, config):

mock_message = _mk_message()

mock_slice = mock.MagicMock()
mock_slice.mess_count = 1
mock_slice.count = 1
mock_slice = _mk_message(count=1, mess_count=1)
mock_slice.seq_ids = mock_message.seq_ids
mock_message.get_slice.return_value = mock_slice
