From 9cb5a4d8da8b2d1998b99cacb0085b03fe86b6f6 Mon Sep 17 00:00:00 2001
From: David Gardner <96306125+dagardner-nv@users.noreply.github.com>
Date: Tue, 7 Mar 2023 08:59:37 -0800
Subject: [PATCH] Fix Python bindings for TensorMemory (#655)

* Makes the Python API compatible with the C++ Python bindings for the `TensorMemory`, `InferenceMemory` & `ResponseMemory` classes.
* Replaces the `tensors` attribute for memory classes with explicit `get_tensors` and `set_tensors` methods.
* Makes `get_input`, `set_input`, `get_output` & `set_output` actual methods on the base classes.
* Associated interface proxy classes now inherit from each other, reducing redundant Python conversion code.
* Exposes the `MultiTensorMessage` class to Python, allowing the Python inheritance hierarchy to match that of C++ and consolidating some duplicated code.

The reason for removing the attribute is that, on the C++ side, returning a Python representation of a tensor is rather costly, and the tensor is always returned as a copy. We want to avoid the obvious bugs that can occur when anyone writes:

```python
m = tensor_memory.TensorMemory(10)
m.tensors['c'] = cp.zeros(count)
```

This would have worked when C++ execution was disabled, and the old API implied that it *should* work. Instead, the API is changed to:

```python
m = tensor_memory.TensorMemory(10)
tensors = m.get_tensors()
tensors['c'] = cp.zeros(count)
m.set_tensors(tensors)
```

fixes #604

Authors:
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: https://github.com/nv-morpheus/Morpheus/pull/655
---
 ci/iwyu/mappings.imp                          |   3 +
 .../messages/memory/inference_memory.hpp      |  19 +-
 .../messages/memory/inference_memory_fil.hpp  |  33 +---
 .../messages/memory/inference_memory_nlp.hpp  |  27 +--
 .../messages/memory/response_memory.hpp       |  30 ++-
 .../messages/memory/response_memory_probs.hpp |  23 +--
 .../messages/memory/tensor_memory.hpp         | 138 ++++++++++++-
 .../morpheus/messages/multi_inference.hpp     |  55 +----
 .../morpheus/messages/multi_inference_fil.hpp |  27 ++-
 .../morpheus/messages/multi_inference_nlp.hpp |  38 +---
 .../morpheus/messages/multi_response.hpp      |  32 +--
 .../messages/multi_response_probs.hpp         |  27 +--
 .../morpheus/messages/multi_tensor.hpp        |  80 +++++++-
 .../include/morpheus/utilities/cupy_util.hpp  |  36 +++-
 .../src/messages/memory/inference_memory.cpp  |  17 +-
 .../messages/memory/inference_memory_fil.cpp  |  51 ++---
 .../messages/memory/inference_memory_nlp.cpp  |  68 ++-----
 .../src/messages/memory/response_memory.cpp   |  25 +--
 .../messages/memory/response_memory_probs.cpp |  25 +--
 .../src/messages/memory/tensor_memory.cpp     | 125 +++++++++++-
 .../_lib/src/messages/multi_inference.cpp     |  31 ---
 .../_lib/src/messages/multi_inference_fil.cpp |  16 +-
 .../_lib/src/messages/multi_inference_nlp.cpp |  34 +---
 morpheus/_lib/src/messages/multi_response.cpp |  26 ---
 .../src/messages/multi_response_probs.cpp     |  25 +--
 morpheus/_lib/src/messages/multi_tensor.cpp   |  68 ++++++-
 .../src/objects/mutable_table_ctx_mgr.cpp     |   2 +-
 morpheus/_lib/src/python_modules/messages.cpp | 153 ++++++++-------
 morpheus/_lib/src/stages/preprocess_fil.cpp   |   3 +-
 morpheus/_lib/src/utilities/cupy_util.cpp     |  23 +++
 morpheus/messages/__init__.py                 |  16 +-
 morpheus/messages/memory/__init__.py          |   0
 morpheus/messages/memory/inference_memory.py  | 138 +++++++++++++
 morpheus/messages/memory/response_memory.py   | 101 ++++++++++
 morpheus/messages/memory/tensor_memory.py     | 148 ++++++++++++++
 morpheus/messages/multi_ae_message.py         |  11 +-
 morpheus/messages/multi_inference_message.py  | 185 ++----------------
 morpheus/messages/multi_message.py            |  11 +-
 morpheus/messages/multi_response_message.py   | 182 ++--------------
 morpheus/messages/multi_tensor_message.py     | 176 +++++++++++++++++
 morpheus/messages/tensor_memory.py            |  41 ----
 tests/conftest.py                             |  29 +--
 tests/test_inference_stage.py                 |   9 +-
 tests/test_messages.py                        |  17 +-
 tests/test_tensor_memory.py                   | 174 ++++++++++++++++
 45 files changed, 1521 insertions(+), 977 deletions(-)
 create mode 100644 morpheus/messages/memory/__init__.py
 create mode 100644 morpheus/messages/memory/inference_memory.py
 create mode 100644 morpheus/messages/memory/response_memory.py
 create mode 100644 morpheus/messages/memory/tensor_memory.py
 create mode 100644 morpheus/messages/multi_tensor_message.py
 delete mode 100644 morpheus/messages/tensor_memory.py
 create mode 100644 tests/test_tensor_memory.py

diff --git a/ci/iwyu/mappings.imp b/ci/iwyu/mappings.imp
index 7390bc26a2..41ff475679 100644
--- a/ci/iwyu/mappings.imp
+++ b/ci/iwyu/mappings.imp
@@ -37,6 +37,9 @@
 # Protobuf
 { "include": [ "", private, "", "public" ] },
 
+# pybind11
+{ "include": [ "", private, "", "public" ] },
+
 # rxcpp
 # Hide includes that are exported by
 { "include": [ "\"rx-includes.hpp\"", private, "", "public" ] },

diff --git a/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp b/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp
index 7b36b9d2a0..67db0b2c06 100644
--- a/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp
+++ b/morpheus/_lib/include/morpheus/messages/memory/inference_memory.hpp
@@ -20,7 +20,10 @@
 #include "morpheus/messages/memory/tensor_memory.hpp"
 #include "morpheus/types.hpp"  // for TensorMap
 
-#include
+#include  // for object
+
+#include  // for size_t
+#include  // for shared_ptr
 #include
 
 namespace morpheus {
@@ -54,7 +57,7 @@ class InferenceMemory : public TensorMemory
     InferenceMemory(size_t count, TensorMap&& tensors);
 
     /**
-     * @brief Checks if a tensor named `name` exists in `tensors`
+     * @brief Checks if a tensor named `name` exists in `tensors`. Alias for `has_tensor`.
      *
      * @param name
     * @return true
@@ -67,15 +70,17 @@
 /**
  * @brief Interface proxy, used to insulate python bindings.
  */
-struct InferenceMemoryInterfaceProxy
+struct InferenceMemoryInterfaceProxy : public TensorMemoryInterfaceProxy
 {
     /**
-     * @brief Get the count object
+     * @brief Create and initialize an InferenceMemory object, and return a shared pointer to the result. Each array in
+     * `tensors` should be of length `count`.
     *
-     * @param self
-     * @return std::size_t
+     * @param count : Length of each array in `tensors`
+     * @param tensors : Map of strings to cupy arrays
+     * @return std::shared_ptr
     */
-    static std::size_t get_count(InferenceMemory& self);
+    static std::shared_ptr init(std::size_t count, pybind11::object& tensors);
};
#pragma GCC visibility pop

diff --git a/morpheus/_lib/include/morpheus/messages/memory/inference_memory_fil.hpp b/morpheus/_lib/include/morpheus/messages/memory/inference_memory_fil.hpp
index 26e285627c..4a68a29a42 100644
--- a/morpheus/_lib/include/morpheus/messages/memory/inference_memory_fil.hpp
+++ b/morpheus/_lib/include/morpheus/messages/memory/inference_memory_fil.hpp
@@ -25,7 +25,6 @@
 #include
 #include
-#include
 
 namespace morpheus {
 /****** Component public implementations *******************/
@@ -52,7 +51,7 @@ class InferenceMemoryFIL : public InferenceMemory
     * @param seq_ids : Ids used to index from an inference input to a message. Necessary since there can be more
     * inference inputs than messages (i.e., if some messages get broken into multiple inference requests)
     */
-    InferenceMemoryFIL(size_t count, TensorObject input__0, TensorObject seq_ids);
+    InferenceMemoryFIL(size_t count, TensorObject&& input__0, TensorObject&& seq_ids);

    /**
     * @brief Returns the 'input__0' tensor, throws a `std::runtime_error` if it does not exist
@@ -73,19 +72,18 @@
    /**
     * @brief Sets a tensor named 'input__0'
     *
-    * @param input_ids
-    * @throw std::runtime_error
-    * @throw std::runtime_error
+    * @param input__0
+    * @throws std::length_error If the number of rows in `input__0` does not match `count`.
     */
-    void set_input__0(TensorObject input_ids);
+    void set_input__0(TensorObject&& input__0);

    /**
     * @brief Sets a tensor named 'seq_ids'
     *
     * @param seq_ids
-    * @throw std::runtime_error
+    * @throws std::length_error If the number of rows in `seq_ids` does not match `count`.
     */
-    void set_seq_ids(TensorObject seq_ids);
+    void set_seq_ids(TensorObject&& seq_ids);
};

/****** InferenceMemoryFILInterfaceProxy *************************/
/**
 * @brief Interface proxy, used to insulate python bindings
 */
-struct InferenceMemoryFILInterfaceProxy
+struct InferenceMemoryFILInterfaceProxy : public InferenceMemoryInterfaceProxy
{
    /**
     * @brief Create and initialize an InferenceMemoryFIL object, and return a shared pointer to the result
@@ -108,23 +106,6 @@ struct InferenceMemoryFILInterfaceProxy
                 pybind11::object input__0,
                 pybind11::object seq_ids);

-    /**
-     * Get messages count in the inference memory instance
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t count(InferenceMemoryFIL& self);
-
-    /**
-     * Return the requested tensor for a given name
-     *
-     * @param self
-     * @param name Tensor name
-     * @return TensorObject
-     */
-    static TensorObject get_tensor(InferenceMemoryFIL& self, const std::string& name);
-
    /**
     * @brief Returns the 'input__0' as cupy array
     *

diff --git a/morpheus/_lib/include/morpheus/messages/memory/inference_memory_nlp.hpp b/morpheus/_lib/include/morpheus/messages/memory/inference_memory_nlp.hpp
index 07ba1e87b9..983d5c6ba6 100644
--- a/morpheus/_lib/include/morpheus/messages/memory/inference_memory_nlp.hpp
+++ b/morpheus/_lib/include/morpheus/messages/memory/inference_memory_nlp.hpp
@@ -52,12 +52,13 @@ class InferenceMemoryNLP : public InferenceMemory
     * @param seq_ids : Ids used to index from an inference input to a message.
     Necessary since there can be more inference inputs than messages (i.e., if some messages get broken into
     multiple inference requests)
     */
-    InferenceMemoryNLP(std::size_t count, TensorObject input_ids, TensorObject input_mask, TensorObject seq_ids);
+    InferenceMemoryNLP(std::size_t count, TensorObject&& input_ids, TensorObject&& input_mask, TensorObject&& seq_ids);

    /**
     * @brief Get the input ids object
     *
     * @return const TensorObject&
+    * @throws std::runtime_error If no tensor named "input_ids" exists
     */
    const TensorObject& get_input_ids() const;

@@ -65,6 +66,7 @@
     * @brief Get the input mask object
     *
     * @return const TensorObject&
+    * @throws std::runtime_error If no tensor named "input_mask" exists
     */
    const TensorObject& get_input_mask() const;

@@ -72,6 +74,7 @@
     * @brief Get the seq ids object
     *
     * @return const TensorObject&
+    * @throws std::runtime_error If no tensor named "seq_ids" exists
     */
    const TensorObject& get_seq_ids() const;

@@ -79,22 +82,25 @@
     * @brief Set the input ids object
     *
     * @param input_ids
+    * @throws std::length_error If the number of rows in `input_ids` does not match `count`.
     */
-    void set_input_ids(TensorObject input_ids);
+    void set_input_ids(TensorObject&& input_ids);

    /**
     * @brief Set the input mask object
     *
     * @param input_mask
+    * @throws std::length_error If the number of rows in `input_mask` does not match `count`.
     */
-    void set_input_mask(TensorObject input_mask);
+    void set_input_mask(TensorObject&& input_mask);

    /**
     * @brief Set the seq ids object
     *
     * @param seq_ids
+    * @throws std::length_error If the number of rows in `seq_ids` does not match `count`.
     */
-    void set_seq_ids(TensorObject seq_ids);
+    void set_seq_ids(TensorObject&& seq_ids);
};

/****** InferenceMemoryNLPInterfaceProxy********************/
@@ -102,7 +108,7 @@
/**
 * @brief Interface proxy, used to insulate python bindings.
 */
-struct InferenceMemoryNLPInterfaceProxy
+struct InferenceMemoryNLPInterfaceProxy : public InferenceMemoryInterfaceProxy
{
    /**
     * @brief Create and initialize an InferenceMemoryNLP object, and return a shared pointer to the result
@@ -119,19 +125,12 @@ struct InferenceMemoryNLPInterfaceProxy
                 pybind11::object input_mask,
                 pybind11::object seq_ids);

-    /**
-     * Get messages count in the inference memory object
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t count(InferenceMemoryNLP& self);
-
    /**
     * @brief : Returns token-ids for each string padded with 0s to max_length as python object
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::attribute_error
     */
    static pybind11::object get_input_ids(InferenceMemoryNLP& self);

@@ -148,6 +147,7 @@ struct InferenceMemoryNLPInterfaceProxy
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::attribute_error
     */
    static pybind11::object get_input_mask(InferenceMemoryNLP& self);

@@ -164,6 +164,7 @@ struct InferenceMemoryNLPInterfaceProxy
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::attribute_error
     */
    static pybind11::object get_seq_ids(InferenceMemoryNLP& self);

diff --git a/morpheus/_lib/include/morpheus/messages/memory/response_memory.hpp b/morpheus/_lib/include/morpheus/messages/memory/response_memory.hpp
index 1c0a12da5e..d8e87462f2 100644
--- a/morpheus/_lib/include/morpheus/messages/memory/response_memory.hpp
+++ b/morpheus/_lib/include/morpheus/messages/memory/response_memory.hpp
@@ -18,12 +18,12 @@
 #pragma once
 
 #include "morpheus/messages/memory/tensor_memory.hpp"
-#include "morpheus/objects/tensor_object.hpp"  // for TensorObject
-#include "morpheus/types.hpp"                  // for TensorMap
+#include "morpheus/types.hpp"  // for TensorMap
 
-#include
+#include  // for object
 
 #include  // for size_t
+#include  // for shared_ptr
 #include
 
 namespace morpheus {
@@ -57,7 +57,7 @@ class ResponseMemory : public TensorMemory
     ResponseMemory(size_t count, TensorMap&& tensors);
 
     /**
-     * @brief Checks if a tensor named `name` exists in `tensors`
+     * @brief Checks if a tensor named `name` exists in `tensors`. Alias for `has_tensor`.
      *
      * @param name
      * @return true
@@ -72,25 +72,17 @@
 * @brief Interface proxy, used to insulate python bindings.
 *
 */
-struct ResponseMemoryInterfaceProxy
+struct ResponseMemoryInterfaceProxy : public TensorMemoryInterfaceProxy
{
    /**
-     * @brief Get the output object
+     * @brief Create and initialize a ResponseMemory object, and return a shared pointer to the result. Each array in
+     * `tensors` should be of length `count`.
     *
-     * @param self
-     * @param name
-     * @return pybind11::object
-     */
-    static pybind11::object get_output(ResponseMemory& self, const std::string& name);
-
-    /**
-     * @brief Get the output tensor object
-     *
-     * @param self
-     * @param name
-     * @return TensorObject
+     * @param count : Length of each array in `tensors`
+     * @param tensors : Map of strings to cupy arrays
+     * @return std::shared_ptr
     */
-    static TensorObject get_output_tensor(ResponseMemory& self, const std::string& name);
+    static std::shared_ptr init(std::size_t count, pybind11::object& tensors);
};
#pragma GCC visibility pop

diff --git a/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp b/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp
index ef81feb672..15d3f1e2fb 100644
--- a/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp
+++ b/morpheus/_lib/include/morpheus/messages/memory/response_memory_probs.hpp
@@ -50,7 +50,7 @@ class ResponseMemoryProbs : public ResponseMemory
     * @param count
     * @param probs
     */
-    ResponseMemoryProbs(size_t count, TensorObject probs);
+    ResponseMemoryProbs(size_t count, TensorObject&& probs);

    /**
     * @brief Construct a new Response Memory Probs object
     *
@@ -60,9 +60,10 @@
    ResponseMemoryProbs(size_t count, TensorMap&& tensors);

    /**
-     * @brief Returns the tensor named 'probs', throws a `std::runtime_error` if it does not exist
+     * @brief Returns the tensor named 'probs'. Alias for `get_tensor("probs")`
     *
     * @return const TensorObject&
+    * @throws std::runtime_error If no tensor named "probs" exists
     */
    const TensorObject& get_probs() const;

@@ -70,8 +71,9 @@
     * @brief Update the tensor named 'probs'
     *
     * @param probs
+    * @throws std::length_error If the number of rows in `probs` does not match `count`.
     */
-    void set_probs(TensorObject probs);
+    void set_probs(TensorObject&& probs);
};

/****** ResponseMemoryProbsInterfaceProxy*******************/
@@ -79,7 +81,7 @@
/**
 * @brief Interface proxy, used to insulate python bindings
 */
-struct ResponseMemoryProbsInterfaceProxy
+struct ResponseMemoryProbsInterfaceProxy : public ResponseMemoryInterfaceProxy
{
    /**
     * @brief Create and initialize a ResponseMemoryProbs object, and return a shared pointer to the result
     *
     * @param count
     * @param probs
     * @return std::shared_ptr
     */
    static std::shared_ptr init(cudf::size_type count, pybind11::object probs);

    /**
-     * @brief Get messages count in the response memory probs object
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t count(ResponseMemoryProbs& self);
-
-    /**
-     * @brief Get the response memory probs object
+     * @brief Get the response memory probs object (alias for `get_tensor("probs")`)
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::key_error When no tensor named "probs" exists.
     */
    static pybind11::object get_probs(ResponseMemoryProbs& self);

    /**
-     * @brief Set the response memory probs object
+     * @brief Set the response memory probs object (alias for `set_tensor("probs", cupy_values)`)
     *
     * @param self
     * @param cupy_values

diff --git a/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp b/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp
index bc3a130362..64ae44e0d1 100644
--- a/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp
+++ b/morpheus/_lib/include/morpheus/messages/memory/tensor_memory.hpp
@@ -17,9 +17,14 @@
 
 #pragma once
 
-#include "morpheus/types.hpp"  // for TensorMap, TensorIndex
+#include "morpheus/objects/tensor_object.hpp"  // for TensorObject
+#include "morpheus/types.hpp"                  // for TensorMap, TensorIndex
+#include "morpheus/utilities/cupy_util.hpp"    // for CupyUtil
+
+#include  // for object
 
 #include  // for size_t
+#include  // for shared_ptr
 #include
 #include  // for pair
 #include
@@ -34,6 +39,7 @@ namespace morpheus {
 * @file
 */
 
+#pragma GCC visibility push(default)
/**
 * @brief Container for holding a collection of named `TensorObject`s in a `std::map` keyed by name.
 * Base class for `InferenceMemory` & `ResponseMemory`
@@ -79,7 +85,137 @@ class TensorMemory
     */
    TensorMap copy_tensor_ranges(const std::vector>& ranges, size_t num_selected_rows) const;
+
+    /**
+     * @brief Get the tensor object identified by `name`
+     *
+     * @param name
+     * @return TensorObject&
+     * @throws std::runtime_error If no tensor matching `name` exists
+     */
+    TensorObject& get_tensor(const std::string& name);
+
+    /**
+     * @brief Get the tensor object identified by `name`
+     *
+     * @param name
+     * @return const TensorObject&
+     * @throws std::runtime_error If no tensor matching `name` exists
+     */
+    const TensorObject& get_tensor(const std::string& name) const;
+
+    /**
+     * @brief Set the tensor object identified by `name`
+     *
+     * @param name
+     * @param tensor
+     * @throws std::length_error If the number of rows in `tensor` does not match `count`.
+     */
+    void set_tensor(const std::string& name, TensorObject&& tensor);
+
+    /**
+     * @brief Set the tensors object
+     *
+     * @param tensors
+     * @throws std::length_error If the number of rows in any of the `tensors` does not match `count`.
+     */
+    void set_tensors(TensorMap&& tensors);
+
+  protected:
+    /**
+     * @brief Checks if the number of rows in `tensor` matches `count`
+     *
+     * @param tensor
+     * @throws std::length_error If the number of rows in `tensor` does not match `count`.
+     */
+    void check_tensor_length(const TensorObject& tensor);
+
+    /**
+     * @brief Checks each tensor in `tensors`, verifying that the number of rows matches `count`
+     *
+     * @param tensors
+     * @throws std::length_error If the number of rows in any of the `tensors` does not match `count`.
+     */
+    void check_tensors_length(const TensorMap& tensors);
+
+    /**
+     * @brief Verify that a tensor identified by `name` exists, throws a `runtime_error` otherwise.
+     *
+     * @param name
+     * @throws std::runtime_error If no tensor matching `name` exists
+     */
+    void verify_tensor_exists(const std::string& name) const;
+};
+
+/****** TensorMemoryInterfaceProxy *************************/
+/**
+ * @brief Interface proxy, used to insulate python bindings.
+ */
+struct TensorMemoryInterfaceProxy
+{
+    /**
+     * @brief Create and initialize a TensorMemory object, and return a shared pointer to the result. Each array in
+     * `tensors` should be of length `count`.
+ * + * @param count : Lenght of each array in `tensors` + * @param tensors : Map of string on to cupy arrays + * @return std::shared_ptr + */ + static std::shared_ptr init(std::size_t count, pybind11::object& tensors); + + /** + * @brief Get the count object + * + * @param self + * @return std::size_t + */ + static std::size_t get_count(TensorMemory& self); + + /** + * @brief Get the tensors converted to CuPy arrays. Pybind11 will convert the std::map to a Python dict. + * + * @param self + * @return py_tensor_map_t + */ + static CupyUtil::py_tensor_map_t get_tensors(TensorMemory& self); + + /** + * @brief Set the tensors object converting a map of CuPy arrays to Tensors + * + * @param self + * @param tensors + */ + static void set_tensors(TensorMemory& self, CupyUtil::py_tensor_map_t tensors); + + /** + * @brief Get the tensor object identified by `name` + * + * @param self + * @param name + * @return pybind11::object + * @throws pybind11::key_error When no matching tensor exists. + */ + static pybind11::object get_tensor(TensorMemory& self, const std::string name); + + /** + * @brief Same as `get_tensor` but used when the method is being bound to a python property + * + * @param self + * @param name + * @return pybind11::object + * @throws pybind11::attribute_error When no matching tensor exists. + */ + static pybind11::object get_tensor_property(TensorMemory& self, const std::string name); + + /** + * @brief Set the tensor object identified by `name` + * + * @param self + * @param cupy_tensor + */ + static void set_tensor(TensorMemory& self, const std::string name, const pybind11::object& cupy_tensor); }; +#pragma GCC visibility pop /** @} */ // end of group } // namespace morpheus diff --git a/morpheus/_lib/include/morpheus/messages/multi_inference.hpp b/morpheus/_lib/include/morpheus/messages/multi_inference.hpp index e1732a83b8..460602301a 100644 --- a/morpheus/_lib/include/morpheus/messages/multi_inference.hpp +++ b/morpheus/_lib/include/morpheus/messages/multi_inference.hpp @@ -24,7 +24,6 @@ #include "morpheus/objects/tensor_object.hpp" #include -#include #include #include @@ -72,18 +71,20 @@ class MultiInferenceMessage : public DerivedMultiMessage memory, cudf::size_type offset, cudf::size_type count); - - /** - * @brief Get inference memory object shared pointer - * - * @param self - * @return std::shared_ptr - */ - static std::shared_ptr memory(MultiInferenceMessage& self); - - /** - * @brief Get message offset - * - * @param self - * @return std::size_t - */ - static std::size_t offset(MultiInferenceMessage& self); - - /** - * @brief Get messages count - * - * @param self - * @return std::size_t - */ - static std::size_t count(MultiInferenceMessage& self); - - /** - * @brief Get 'input_id' tensor as a python object, throws a `std::runtime_error` if it does not exist - * - * @param self - * @param name - * @return pybind11::object - */ - static pybind11::object get_input(MultiInferenceMessage& self, const std::string& name); - - /** - * @brief Get the shared pointer of a sliced batches based on offsets supplied. 
       Automatically calculates the correct
-     * `mess_offset` and `mess_count`
-     *
-     * @param self
-     * @param start : Start offset address
-     * @param stop : Stop offset address
-     * @return std::shared_ptr
-     */
-    static std::shared_ptr get_slice(MultiInferenceMessage& self,
-                                     std::size_t start,
-                                     std::size_t stop);
};
#pragma GCC visibility pop
/** @} */  // end of group

diff --git a/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp b/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp
index 2bdfc2c69c..531071134e 100644
--- a/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp
+++ b/morpheus/_lib/include/morpheus/messages/multi_inference_fil.hpp
@@ -23,6 +23,7 @@
 #include "morpheus/objects/tensor_object.hpp"
 
 #include
+#include  // for object
 
 #include  // for size_t
 #include
 
@@ -69,6 +70,7 @@ class MultiInferenceFILMessage : public MultiInferenceMessage
     *
     * @param name
     * @return const TensorObject
+    * @throws std::runtime_error If no tensor named "input__0" exists
     */
    const TensorObject get_input__0() const;

@@ -84,6 +86,7 @@
     *
     * @param name
     * @return const TensorObject
+    * @throws std::runtime_error If no tensor named "seq_ids" exists
     */
    const TensorObject get_seq_ids() const;

@@ -99,7 +102,7 @@
/**
 * @brief Interface proxy, used to insulate python bindings.
 */
-struct MultiInferenceFILMessageInterfaceProxy
+struct MultiInferenceFILMessageInterfaceProxy : public MultiInferenceMessageInterfaceProxy
{
    /**
     * @brief Create and initialize a MultiInferenceFILMessage, and return a shared pointer to the result
@@ -121,28 +124,22 @@
                 cudf::size_type count);

    /**
-     * @brief Returns a shared pointer of a inference memory object
+     * @brief Get 'input__0' tensor as a python object
     *
     * @param self
-     * @return std::shared_ptr
+     * @return pybind11::object
+     * @throws pybind11::attribute_error When no tensor named "input__0" exists.
     */
-    static std::shared_ptr memory(MultiInferenceFILMessage& self);
+    static pybind11::object input__0(MultiInferenceFILMessage& self);

    /**
-     * @brief Message offset in inference memory object
+     * @brief Get 'seq_ids' tensor as a python object
     *
     * @param self
-     * @return std::size_t
+     * @return pybind11::object
+     * @throws pybind11::attribute_error When no tensor named "seq_ids" exists.
     */
-    static std::size_t offset(MultiInferenceFILMessage& self);
-
-    /**
-     * @brief Message count in inference memory object
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t count(MultiInferenceFILMessage& self);
+    static pybind11::object seq_ids(MultiInferenceFILMessage& self);
};
#pragma GCC visibility pop
/** @} */  // end of group

diff --git a/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp b/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp
index a49cb86362..2ed51f63cd 100644
--- a/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp
+++ b/morpheus/_lib/include/morpheus/messages/multi_inference_nlp.hpp
@@ -70,6 +70,7 @@ class MultiInferenceNLPMessage : public MultiInferenceMessage
     *
     * @param name
     * @return const TensorObject
+    * @throws std::runtime_error If no tensor named "input_ids" exists
     */
    const TensorObject get_input_ids() const;

@@ -85,6 +86,7 @@
     *
     * @param name
     * @return const TensorObject
+    * @throws std::runtime_error If no tensor named "input_mask" exists
     */
    const TensorObject get_input_mask() const;

@@ -100,6 +102,7 @@
     *
     * @param name
     * @return const TensorObject
+    * @throws std::runtime_error If no tensor named "seq_ids" exists
     */
    const TensorObject get_seq_ids() const;

@@ -115,7 +118,7 @@
/**
 * @brief Interface proxy, used to insulate python bindings.
 */
-struct MultiInferenceNLPMessageInterfaceProxy
+struct MultiInferenceNLPMessageInterfaceProxy : public MultiInferenceMessageInterfaceProxy
{
    /**
     * @brief Create and initialize a MultiInferenceNLPMessage, and return a shared pointer to the result
@@ -137,50 +140,29 @@
                 cudf::size_type count);

    /**
-     * @brief Get inference memory object shared pointer
-     *
-     * @param self
-     * @return std::shared_ptr
-     */
-    static std::shared_ptr memory(MultiInferenceNLPMessage& self);
-
-    /**
-     * @brief Get message offset
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t offset(MultiInferenceNLPMessage& self);
-
-    /**
-     * @brief Get messages count
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t count(MultiInferenceNLPMessage& self);
-
-    /**
-     * @brief Get 'input_ids' tensor as a python object, throws a `std::runtime_error` if it does not exist
+     * @brief Get 'input_ids' tensor as a python object
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::attribute_error When no tensor named "input_ids" exists.
     */
    static pybind11::object input_ids(MultiInferenceNLPMessage& self);

    /**
-     * @brief Get 'input_mask' tensor as a python object, throws a `std::runtime_error` if it does not exist
+     * @brief Get 'input_mask' tensor as a python object
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::attribute_error When no tensor named "input_mask" exists.
     */
    static pybind11::object input_mask(MultiInferenceNLPMessage& self);

    /**
-     * @brief Get 'seq_ids' tensor as a python object, throws a `std::runtime_error` if it does not exist
+     * @brief Get 'seq_ids' tensor as a python object
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::attribute_error When no tensor named "seq_ids" exists.
     */
    static pybind11::object seq_ids(MultiInferenceNLPMessage& self);
};

diff --git a/morpheus/_lib/include/morpheus/messages/multi_response.hpp b/morpheus/_lib/include/morpheus/messages/multi_response.hpp
index a48e25597f..87a250993a 100644
--- a/morpheus/_lib/include/morpheus/messages/multi_response.hpp
+++ b/morpheus/_lib/include/morpheus/messages/multi_response.hpp
@@ -73,18 +73,20 @@ class MultiResponseMessage : public DerivedMultiMessage
-     */
-    static std::shared_ptr memory(MultiResponseMessage& self);
-
-    /**
-     * @brief Message offset in response memory probs object
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t offset(MultiResponseMessage& self);
-
-    /**
-     * @brief Messages count in response memory probs object
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t count(MultiResponseMessage& self);
-
    /**
     * @brief Returns the output tensor for a given name
     *
     * @param self
     * @param name : Tensor name
     * @return pybind11::object
+    * @throws pybind11::key_error When no matching tensor exists.
     */
    static pybind11::object get_output(MultiResponseMessage& self, const std::string& name);
};

diff --git a/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp b/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp
index 2b6b252402..5d15879504 100644
--- a/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp
+++ b/morpheus/_lib/include/morpheus/messages/multi_response_probs.hpp
@@ -90,7 +90,7 @@ class MultiResponseProbsMessage : public DerivedMultiMessage
-     */
-    static std::shared_ptr memory(MultiResponseProbsMessage& self);
-
-    /**
-     * @brief Message offset in response memory probs object
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t offset(MultiResponseProbsMessage& self);
-
-    /**
-     * @brief Messages count in response memory probs object
-     *
-     * @param self
-     * @return std::size_t
-     */
-    static std::size_t count(MultiResponseProbsMessage& self);
-
    /**
     * @brief Return the `probs` (probabilities) output tensor
     *
     * @param self
     * @return pybind11::object
+    * @throws pybind11::attribute_error When no tensor named "probs" exists.
     */
    static pybind11::object probs(MultiResponseProbsMessage& self);
};

diff --git a/morpheus/_lib/include/morpheus/messages/multi_tensor.hpp b/morpheus/_lib/include/morpheus/messages/multi_tensor.hpp
index 043fc2081b..ca146c0c52 100644
--- a/morpheus/_lib/include/morpheus/messages/multi_tensor.hpp
+++ b/morpheus/_lib/include/morpheus/messages/multi_tensor.hpp
@@ -22,6 +22,8 @@
 #include "morpheus/messages/multi.hpp"
 #include "morpheus/objects/tensor_object.hpp"
 
+#include  // for object
+
 #include
 #include
 #include
@@ -83,27 +85,30 @@ class MultiTensorMessage : public DerivedMultiMessage
+     */
+    static std::shared_ptr init(std::shared_ptr meta,
+                                std::size_t mess_offset,
+                                std::size_t mess_count,
+                                std::shared_ptr memory,
+                                std::size_t offset,
+                                std::size_t count);
+
+    /**
+     * @brief Returns a shared pointer of a tensor memory object
+     *
+     * @return std::shared_ptr
+     */
+    static std::shared_ptr memory(MultiTensorMessage& self);
+
+    /**
+     * @brief Message offset in tensor memory object
+     *
+     * @param self
+     * @return std::size_t
+     */
+    static std::size_t offset(MultiTensorMessage& self);
+
+    /**
+     * @brief Messages count in tensor memory object
+     *
+     * @param self
+     * @return std::size_t
+     */
+    static std::size_t count(MultiTensorMessage& self);
+
+    /**
+     * @brief Returns the tensor for a given name
+     *
+     * @param self
+     * @param name : Tensor name
+     * @return pybind11::object
+     * @throws pybind11::key_error When no matching tensor exists.
+     */
+    static pybind11::object get_tensor(MultiTensorMessage& self, const std::string& name);
+
+    /**
+     * @brief Same as `get_tensor` but used when the method is being bound to a python property
+     *
+     * @param self
+     * @param name
+     * @return pybind11::object
+     * @throws pybind11::attribute_error When no matching tensor exists.
+     */
+    static pybind11::object get_tensor_property(MultiTensorMessage& self, const std::string name);
+};
+
#pragma GCC visibility pop
/** @} */  // end of group
}  // namespace morpheus

diff --git a/morpheus/_lib/include/morpheus/utilities/cupy_util.hpp b/morpheus/_lib/include/morpheus/utilities/cupy_util.hpp
index 9e6da3c982..48ddc9e991 100644
--- a/morpheus/_lib/include/morpheus/utilities/cupy_util.hpp
+++ b/morpheus/_lib/include/morpheus/utilities/cupy_util.hpp
@@ -22,6 +22,9 @@
 #include
 #include
 
+#include
+#include
+
 namespace morpheus {
 /****** Component public implementations *******************/
 /****** CupyUtil****************************************/
@@ -37,22 +40,49 @@
 */
struct CupyUtil
{
+    using tensor_map_t    = std::map;
+    using py_tensor_map_t = std::map;
+
    static pybind11::object cp_module;  // handle to cupy module

    /**
-     * TODO(Documentation)
+     * @brief Import and return the cupy module. Requires GIL to have already been acquired.
+     *
+     * @return pybind11::module_
     */
    static pybind11::module_ get_cp();

    /**
-     * TODO(Documentation)
+     * @brief Convert a TensorObject to a CuPy array. Requires GIL to have already been acquired.
+     *
+     * @param tensor
+     * @return pybind11::object
     */
    static pybind11::object tensor_to_cupy(const TensorObject& tensor);

    /**
-     * TODO(Documentation)
+     * @brief Convert a CuPy array into a TensorObject. Requires GIL to have already been acquired.
+     *
+     * @param cupy_array
+     * @return TensorObject
     */
    static TensorObject cupy_to_tensor(pybind11::object cupy_array);
+
+    /**
+     * @brief Convert a map of CuPy arrays into a map of TensorObjects. Requires GIL to have already been acquired.
+     *
+     * @param cupy_tensors
+     * @return tensor_map_t
+     */
+    static tensor_map_t cupy_to_tensors(const py_tensor_map_t& cupy_tensors);
+
+    /**
+     * @brief Convert a map of TensorObjects into a map of CuPy arrays. Requires GIL to have already been acquired.
+     *
+     * @param tensors
+     * @return py_tensor_map_t
+     */
+    static py_tensor_map_t tensors_to_cupy(const tensor_map_t& tensors);
};
/** @} */  // end of group
}  // namespace morpheus

diff --git a/morpheus/_lib/src/messages/memory/inference_memory.cpp b/morpheus/_lib/src/messages/memory/inference_memory.cpp
index 180c3641a7..bb587a6282 100644
--- a/morpheus/_lib/src/messages/memory/inference_memory.cpp
+++ b/morpheus/_lib/src/messages/memory/inference_memory.cpp
@@ -19,6 +19,10 @@
 // for TensorObject
 #include "morpheus/objects/tensor_object.hpp"  // IWYU pragma: keep
+#include "morpheus/utilities/cupy_util.hpp"    // for CupyUtil::cupy_to_tensors, CupyUtil::py_tensor_map_t
+
+#include
+#include  // IWYU pragma: keep
 
 #include
 #include  // for move
@@ -35,8 +39,17 @@ bool InferenceMemory::has_input(const std::string& name) const
 }
 
 /****** InferenceMemoryInterfaceProxy *************************/
-std::size_t InferenceMemoryInterfaceProxy::get_count(InferenceMemory& self)
+std::shared_ptr InferenceMemoryInterfaceProxy::init(std::size_t count, pybind11::object& tensors)
 {
-    return self.count;
+    if (tensors.is_none())
+    {
+        return std::make_shared(count);
+    }
+    else
+    {
+        return std::make_shared(
+            count, std::move(CupyUtil::cupy_to_tensors(tensors.cast())));
+    }
 }
+
}  // namespace morpheus

diff --git a/morpheus/_lib/src/messages/memory/inference_memory_fil.cpp b/morpheus/_lib/src/messages/memory/inference_memory_fil.cpp
index a73a287c0e..123ca62d6c 100644
--- a/morpheus/_lib/src/messages/memory/inference_memory_fil.cpp
+++ b/morpheus/_lib/src/messages/memory/inference_memory_fil.cpp
@@ -18,59 +18,44 @@
 #include "morpheus/messages/memory/inference_memory_fil.hpp"
 
 #include "morpheus/messages/memory/inference_memory.hpp"
-#include "morpheus/types.hpp"  // for TensorMap
-#include "morpheus/utilities/cupy_util.hpp"
+#include "morpheus/utilities/cupy_util.hpp"  // for CupyUtil
 
 #include
 #include
-#include  // this->tensors is a map
 #include
-#include  // for runtime_error
-#include
 #include
 
 namespace morpheus {
 /****** Component public implementations *******************/
 /****** InferenceMemoryFIL****************************************/
-InferenceMemoryFIL::InferenceMemoryFIL(size_t count, TensorObject input__0, TensorObject seq_ids) :
+InferenceMemoryFIL::InferenceMemoryFIL(size_t count, TensorObject&& input__0, TensorObject&& seq_ids) :
  InferenceMemory(count)
{
-    this->tensors["input__0"] = std::move(input__0);
-    this->tensors["seq_ids"]  = std::move(seq_ids);
+    set_tensor("input__0", std::move(input__0));
+    set_tensor("seq_ids", std::move(seq_ids));
}

const TensorObject& InferenceMemoryFIL::get_input__0() const
{
-    auto found = this->tensors.find("input__0");
-    if (found == this->tensors.end())
-    {
-        throw std::runtime_error("Tensor: 'input__0' not found in memory");
-    }
-
-    return found->second;
+    return get_tensor("input__0");
}

-void InferenceMemoryFIL::set_input__0(TensorObject input__0)
+void InferenceMemoryFIL::set_input__0(TensorObject&& input__0)
{
-    this->tensors["input__0"] = std::move(input__0);
+    set_tensor("input__0", std::move(input__0));
}

const TensorObject& InferenceMemoryFIL::get_seq_ids() const
{
-    auto found = this->tensors.find("seq_ids");
-    if (found == this->tensors.end())
-    {
-        throw std::runtime_error("Tensor: 'seq_ids' not found in
memory"); - } - - return found->second; + return get_tensor("seq_ids"); } -void InferenceMemoryFIL::set_seq_ids(TensorObject seq_ids) +void InferenceMemoryFIL::set_seq_ids(TensorObject&& seq_ids) { - this->tensors["seq_ids"] = std::move(seq_ids); + set_tensor("seq_ids", std::move(seq_ids)); } + /****** InferenceMemoryFILInterfaceProxy *************************/ std::shared_ptr InferenceMemoryFILInterfaceProxy::init(cudf::size_type count, pybind11::object input__0, @@ -81,19 +66,9 @@ std::shared_ptr InferenceMemoryFILInterfaceProxy::init(cudf: count, std::move(CupyUtil::cupy_to_tensor(input__0)), std::move(CupyUtil::cupy_to_tensor(seq_ids))); } -std::size_t InferenceMemoryFILInterfaceProxy::count(InferenceMemoryFIL& self) -{ - return self.count; -} - -TensorObject InferenceMemoryFILInterfaceProxy::get_tensor(InferenceMemoryFIL& self, const std::string& name) -{ - return self.tensors[name]; -} - pybind11::object InferenceMemoryFILInterfaceProxy::get_input__0(InferenceMemoryFIL& self) { - return CupyUtil::tensor_to_cupy(self.get_input__0()); + return get_tensor_property(self, "input__0"); } void InferenceMemoryFILInterfaceProxy::set_input__0(InferenceMemoryFIL& self, pybind11::object cupy_values) @@ -103,7 +78,7 @@ void InferenceMemoryFILInterfaceProxy::set_input__0(InferenceMemoryFIL& self, py pybind11::object InferenceMemoryFILInterfaceProxy::get_seq_ids(InferenceMemoryFIL& self) { - return CupyUtil::tensor_to_cupy(self.get_seq_ids()); + return get_tensor_property(self, "seq_ids"); } void InferenceMemoryFILInterfaceProxy::set_seq_ids(InferenceMemoryFIL& self, pybind11::object cupy_values) diff --git a/morpheus/_lib/src/messages/memory/inference_memory_nlp.cpp b/morpheus/_lib/src/messages/memory/inference_memory_nlp.cpp index dde9587675..d5f82225ae 100644 --- a/morpheus/_lib/src/messages/memory/inference_memory_nlp.cpp +++ b/morpheus/_lib/src/messages/memory/inference_memory_nlp.cpp @@ -18,77 +18,56 @@ #include "morpheus/messages/memory/inference_memory_nlp.hpp" #include "morpheus/messages/memory/inference_memory.hpp" -#include "morpheus/types.hpp" // for TensorMap -#include "morpheus/utilities/cupy_util.hpp" +#include "morpheus/utilities/cupy_util.hpp" // for CupyUtil #include // for size_type #include -#include -#include // this->tensors is a map -#include // for runtime_error -#include // for move, pair +#include // for size_t +#include // for move, pair namespace morpheus { /****** Component public implementations *******************/ /****** InferenceMemoryNLP ****************************************/ InferenceMemoryNLP::InferenceMemoryNLP(std::size_t count, - TensorObject input_ids, - TensorObject input_mask, - TensorObject seq_ids) : + TensorObject&& input_ids, + TensorObject&& input_mask, + TensorObject&& seq_ids) : InferenceMemory(count) { - this->tensors["input_ids"] = std::move(input_ids); - this->tensors["input_mask"] = std::move(input_mask); - this->tensors["seq_ids"] = std::move(seq_ids); + set_tensor("input_ids", std::move(input_ids)); + set_tensor("input_mask", std::move(input_mask)); + set_tensor("seq_ids", std::move(seq_ids)); } const TensorObject& InferenceMemoryNLP::get_input_ids() const { - auto found = this->tensors.find("input_ids"); - if (found == this->tensors.end()) - { - throw std::runtime_error("Tensor: 'input_ids' not found in memory"); - } - - return found->second; + return get_tensor("input_ids"); } -void InferenceMemoryNLP::set_input_ids(TensorObject input_ids) +void InferenceMemoryNLP::set_input_ids(TensorObject&& input_ids) { - this->tensors["input_ids"] = 
std::move(input_ids);
+    set_tensor("input_ids", std::move(input_ids));
}

const TensorObject& InferenceMemoryNLP::get_input_mask() const
{
-    auto found = this->tensors.find("input_mask");
-    if (found == this->tensors.end())
-    {
-        throw std::runtime_error("Tensor: 'input_mask' not found in memory");
-    }
-
-    return found->second;
+    return get_tensor("input_mask");
}

-void InferenceMemoryNLP::set_input_mask(TensorObject input_mask)
+void InferenceMemoryNLP::set_input_mask(TensorObject&& input_mask)
{
-    this->tensors["input_mask"] = std::move(input_mask);
+    set_tensor("input_mask", std::move(input_mask));
}

const TensorObject& InferenceMemoryNLP::get_seq_ids() const
{
-    auto found = this->tensors.find("seq_ids");
-    if (found == this->tensors.end())
-    {
-        throw std::runtime_error("Tensor: 'seq_ids' not found in memory");
-    }
-
-    return found->second;
+    return get_tensor("seq_ids");
}

-void InferenceMemoryNLP::set_seq_ids(TensorObject seq_ids)
+void InferenceMemoryNLP::set_seq_ids(TensorObject&& seq_ids)
{
-    this->tensors["seq_ids"] = std::move(seq_ids);
+    set_tensor("seq_ids", std::move(seq_ids));
}

/****** InferenceMemoryNLPInterfaceProxy *************************/
@@ -104,14 +83,9 @@
        std::move(CupyUtil::cupy_to_tensor(seq_ids)));
}

-std::size_t InferenceMemoryNLPInterfaceProxy::count(InferenceMemoryNLP& self)
-{
-    return self.count;
-}
-
pybind11::object InferenceMemoryNLPInterfaceProxy::get_input_ids(InferenceMemoryNLP& self)
{
-    return CupyUtil::tensor_to_cupy(self.get_input_ids());
+    return get_tensor_property(self, "input_ids");
}

void InferenceMemoryNLPInterfaceProxy::set_input_ids(InferenceMemoryNLP& self, pybind11::object cupy_values)
@@ -121,7 +95,7 @@

pybind11::object InferenceMemoryNLPInterfaceProxy::get_input_mask(InferenceMemoryNLP& self)
{
-    return CupyUtil::tensor_to_cupy(self.get_input_mask());
+    return get_tensor_property(self, "input_mask");
}

void InferenceMemoryNLPInterfaceProxy::set_input_mask(InferenceMemoryNLP& self, pybind11::object cupy_values)
@@ -131,7 +105,7 @@

pybind11::object InferenceMemoryNLPInterfaceProxy::get_seq_ids(InferenceMemoryNLP& self)
{
-    return CupyUtil::tensor_to_cupy(self.get_seq_ids());
+    return get_tensor_property(self, "seq_ids");
}

void InferenceMemoryNLPInterfaceProxy::set_seq_ids(InferenceMemoryNLP& self, pybind11::object cupy_values)

diff --git a/morpheus/_lib/src/messages/memory/response_memory.cpp b/morpheus/_lib/src/messages/memory/response_memory.cpp
index aed947ce00..9144570ccb 100644
--- a/morpheus/_lib/src/messages/memory/response_memory.cpp
+++ b/morpheus/_lib/src/messages/memory/response_memory.cpp
@@ -19,7 +19,8 @@
 
 #include "morpheus/utilities/cupy_util.hpp"
 
-#include
+#include
+#include  // IWYU pragma: keep
 
 #include
 #include  // for move
@@ -36,25 +37,17 @@
 }
 
 /****** ResponseMemoryInterfaceProxy *************************/
-pybind11::object ResponseMemoryInterfaceProxy::get_output(ResponseMemory& self, const std::string& name)
+std::shared_ptr ResponseMemoryInterfaceProxy::init(std::size_t count, pybind11::object& tensors)
 {
-    // Directly return the tensor object
-    if (!self.has_tensor(name))
+    if (tensors.is_none())
     {
-        throw pybind11::key_error();
+        return std::make_shared(count);
     }
-
-    return CupyUtil::tensor_to_cupy(self.tensors[name]);
-}
-TensorObject ResponseMemoryInterfaceProxy::get_output_tensor(ResponseMemory& self, const std::string& name)
-{
-    // Directly return the tensor object
-    if (!self.has_tensor(name))
+    else
     {
-        throw pybind11::key_error();
+        return std::make_shared(
+            count, std::move(CupyUtil::cupy_to_tensors(tensors.cast())));
     }
-
-    return self.tensors[name];
 }
+
}  // namespace morpheus

diff --git a/morpheus/_lib/src/messages/memory/response_memory_probs.cpp b/morpheus/_lib/src/messages/memory/response_memory_probs.cpp
index 2a027311b6..26a4cbfeb8 100644
--- a/morpheus/_lib/src/messages/memory/response_memory_probs.cpp
+++ b/morpheus/_lib/src/messages/memory/response_memory_probs.cpp
@@ -24,18 +24,16 @@
 #include
 #include
-#include  // this->tensors is a map
 #include
 #include
-#include  // for runtime_error
 #include
 
 namespace morpheus {
 /****** Component public implementations *******************/
 /****** ResponseMemoryProbs****************************************/
-ResponseMemoryProbs::ResponseMemoryProbs(size_t count, TensorObject probs) : ResponseMemory(count)
+ResponseMemoryProbs::ResponseMemoryProbs(size_t count, TensorObject&& probs) : ResponseMemory(count)
{
-    this->tensors["probs"] = std::move(probs);
+    set_tensor("probs", std::move(probs));
}

ResponseMemoryProbs::ResponseMemoryProbs(size_t count, TensorMap&& tensors) : ResponseMemory(count, std::move(tensors))
@@ -45,18 +43,12 @@

const TensorObject& ResponseMemoryProbs::get_probs() const
{
-    auto found = this->tensors.find("probs");
-    if (found == this->tensors.end())
-    {
-        throw std::runtime_error("Tensor: 'probs' not found in memory");
-    }
-
-    return found->second;
+    return get_tensor("probs");
}

-void ResponseMemoryProbs::set_probs(TensorObject probs)
+void ResponseMemoryProbs::set_probs(TensorObject&& probs)
{
-    this->tensors["probs"] = std::move(probs);
+    set_tensor("probs", std::move(probs));
}

/****** ResponseMemoryProbsInterfaceProxy *************************/
@@ -67,14 +59,9 @@
    return std::make_shared(count, std::move(CupyUtil::cupy_to_tensor(probs)));
}

-std::size_t ResponseMemoryProbsInterfaceProxy::count(ResponseMemoryProbs& self)
-{
-    return self.count;
-}
-
pybind11::object ResponseMemoryProbsInterfaceProxy::get_probs(ResponseMemoryProbs& self)
{
-    return CupyUtil::tensor_to_cupy(self.get_probs());
+    return get_tensor_property(self, "probs");
}

void ResponseMemoryProbsInterfaceProxy::set_probs(ResponseMemoryProbs& self, pybind11::object cupy_values)

diff --git a/morpheus/_lib/src/messages/memory/tensor_memory.cpp b/morpheus/_lib/src/messages/memory/tensor_memory.cpp
index a43a917ff8..1ac4744d6e 100644
--- a/morpheus/_lib/src/messages/memory/tensor_memory.cpp
+++ b/morpheus/_lib/src/messages/memory/tensor_memory.cpp
@@ -15,19 +15,31 @@
 * limitations under the License.
 */
 
-#include "morpheus/messages/memory/tensor_memory.hpp"
+#include "morpheus/messages/memory/tensor_memory.hpp"  // IWYU pragma: associated
 
 #include "morpheus/objects/tensor_object.hpp"  // for TensorObject
+#include "morpheus/utilities/cupy_util.hpp"    // for CupyUtil
+#include "morpheus/utilities/string_util.hpp"  // for MORPHEUS_CONCAT_STR
+
+#include
+#include  // for attribute_error, key_error
+#include  // IWYU pragma: keep
 
 #include
+#include  // needed by MORPHEUS_CONCAT_STR
+#include  // for std::length_error
 #include
 #include
+// IWYU pragma: no_include
 
 namespace morpheus {
 /****** Component public implementations *******************/
 /****** TensorMemory****************************************/
 TensorMemory::TensorMemory(size_t count) : count(count) {}
-TensorMemory::TensorMemory(size_t count, TensorMap&& tensors) : count(count), tensors(std::move(tensors)) {}
+TensorMemory::TensorMemory(size_t count, TensorMap&& tensors) : count(count), tensors(std::move(tensors))
+{
+    check_tensors_length(this->tensors);
+}
 
 bool TensorMemory::has_tensor(const std::string& name) const
 {
@@ -46,4 +58,113 @@ TensorMap TensorMemory::copy_tensor_ranges(const std::vector
+
+void TensorMemory::check_tensor_length(const TensorObject& tensor)
+{
+    if (tensor.shape(0) != this->count)
+    {
+        throw std::length_error{MORPHEUS_CONCAT_STR("The number of rows in tensor "
+                                                    << tensor.shape(0) << " does not match TensorMemory.count of "
+                                                    << this->count)};
+    }
+}
+
+void TensorMemory::verify_tensor_exists(const std::string& name) const
+{
+    if (!has_tensor(name))
+    {
+        throw std::runtime_error(MORPHEUS_CONCAT_STR("Tensor: '" << name << "' not found in memory"));
+    }
+}
+
+const TensorObject& TensorMemory::get_tensor(const std::string& name) const
+{
+    verify_tensor_exists(name);
+    return tensors.at(name);
+}
+
+TensorObject& TensorMemory::get_tensor(const std::string& name)
+{
+    verify_tensor_exists(name);
+    return tensors[name];
+}
+
+void TensorMemory::set_tensor(const std::string& name, TensorObject&& tensor)
+{
+    check_tensor_length(tensor);
+    this->tensors.insert_or_assign(name, std::move(tensor));
+}
+
+void TensorMemory::check_tensors_length(const TensorMap& tensors)
+{
+    for (const auto& p : tensors)
+    {
+        check_tensor_length(p.second);
+    }
+}
+
+void TensorMemory::set_tensors(TensorMap&& tensors)
+{
+    check_tensors_length(tensors);
+    this->tensors = std::move(tensors);
+}
+
+/****** TensorMemoryInterfaceProxy *************************/
+std::shared_ptr TensorMemoryInterfaceProxy::init(std::size_t count, pybind11::object& tensors)
+{
+    if (tensors.is_none())
+    {
+        return std::make_shared(count);
+    }
+    else
+    {
+        return std::make_shared(
+            count, std::move(CupyUtil::cupy_to_tensors(tensors.cast())));
+    }
+}
+
+std::size_t TensorMemoryInterfaceProxy::get_count(TensorMemory& self)
+{
+    return self.count;
+}
+
+CupyUtil::py_tensor_map_t TensorMemoryInterfaceProxy::get_tensors(TensorMemory& self)
+{
+    return CupyUtil::tensors_to_cupy(self.tensors);
+}
+
+void TensorMemoryInterfaceProxy::set_tensors(TensorMemory& self, CupyUtil::py_tensor_map_t tensors)
+{
+    self.set_tensors(CupyUtil::cupy_to_tensors(tensors));
+}
+
+pybind11::object TensorMemoryInterfaceProxy::get_tensor(TensorMemory& self, const std::string name)
+{
+    try
+    {
+        auto tensor = self.get_tensor(name);
+        return CupyUtil::tensor_to_cupy(tensor);
+    } catch (const std::runtime_error& e)
+    {
+        throw pybind11::key_error{e.what()};
+    }
+}
+
+pybind11::object TensorMemoryInterfaceProxy::get_tensor_property(TensorMemory& self, const std::string name)
+{
+    try
+    {
+        return get_tensor(self, std::move(name));
+    } catch (const pybind11::key_error& e)
+    {
+        throw
pybind11::attribute_error{e.what()};
+    }
+}
+
+void TensorMemoryInterfaceProxy::set_tensor(TensorMemory& self,
+                                            const std::string name,
+                                            const pybind11::object& cupy_tensor)
+{
+    self.set_tensor(name, CupyUtil::cupy_to_tensor(cupy_tensor));
+}
+
}  // namespace morpheus

diff --git a/morpheus/_lib/src/messages/multi_inference.cpp b/morpheus/_lib/src/messages/multi_inference.cpp
index ea85e3d643..fb1be7fa4f 100644
--- a/morpheus/_lib/src/messages/multi_inference.cpp
+++ b/morpheus/_lib/src/messages/multi_inference.cpp
@@ -20,11 +20,8 @@
 #include "morpheus/messages/memory/inference_memory.hpp"
 #include "morpheus/messages/meta.hpp"
 #include "morpheus/messages/multi.hpp"
-#include "morpheus/utilities/cupy_util.hpp"
 
 #include
-#include
-#include
 #include
 #include
 
@@ -70,32 +67,4 @@
        std::move(meta), mess_offset, mess_count, std::move(memory), offset, count);
}

-std::shared_ptr MultiInferenceMessageInterfaceProxy::memory(MultiInferenceMessage& self)
-{
-    DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr);
-    return std::static_pointer_cast(self.memory);
-}
-
-std::size_t MultiInferenceMessageInterfaceProxy::offset(MultiInferenceMessage& self)
-{
-    return self.offset;
-}
-
-std::size_t MultiInferenceMessageInterfaceProxy::count(MultiInferenceMessage& self)
-{
-    return self.count;
-}
-
-pybind11::object MultiInferenceMessageInterfaceProxy::get_input(MultiInferenceMessage& self, const std::string& name)
-{
-    const auto& py_tensor = CupyUtil::tensor_to_cupy(self.get_input(name));
-    return py_tensor;
-}
-
-std::shared_ptr MultiInferenceMessageInterfaceProxy::get_slice(MultiInferenceMessage& self,
-                                                               std::size_t start,
-                                                               std::size_t stop)
-{
-    return self.get_slice(start, stop);
-}
}  // namespace morpheus

diff --git a/morpheus/_lib/src/messages/multi_inference_fil.cpp b/morpheus/_lib/src/messages/multi_inference_fil.cpp
index 683c4c3695..05b3348dd4 100644
--- a/morpheus/_lib/src/messages/multi_inference_fil.cpp
+++ b/morpheus/_lib/src/messages/multi_inference_fil.cpp
@@ -22,7 +22,6 @@
 #include "morpheus/messages/multi_inference.hpp"
 
 #include
-#include
 #include
 #include
 
@@ -58,6 +57,7 @@
void MultiInferenceFILMessage::set_seq_ids(const TensorObject& seq_ids)
{
    this->set_input("seq_ids", seq_ids);
}
+
/****** MultiInferenceFILMessageInterfaceProxy *************************/
std::shared_ptr MultiInferenceFILMessageInterfaceProxy::init(
    std::shared_ptr meta,
@@ -71,22 +71,16 @@
        std::move(meta), mess_offset, mess_count, std::move(memory), offset, count);
}

-std::shared_ptr MultiInferenceFILMessageInterfaceProxy::memory(
-    MultiInferenceFILMessage& self)
+pybind11::object MultiInferenceFILMessageInterfaceProxy::input__0(MultiInferenceFILMessage& self)
{
-    DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr);
-    return std::static_pointer_cast(self.memory);
+    return get_tensor_property(self, "input__0");
}

-std::size_t MultiInferenceFILMessageInterfaceProxy::offset(MultiInferenceFILMessage& self)
+pybind11::object MultiInferenceFILMessageInterfaceProxy::seq_ids(MultiInferenceFILMessage& self)
{
-    return self.offset;
+    return get_tensor_property(self, "seq_ids");
}

-std::size_t MultiInferenceFILMessageInterfaceProxy::count(MultiInferenceFILMessage& self)
-{
-    return self.count;
-}
}  // namespace morpheus
// Created by drobison on 3/17/22.
//

diff --git a/morpheus/_lib/src/messages/multi_inference_nlp.cpp b/morpheus/_lib/src/messages/multi_inference_nlp.cpp
index 8365bdd4af..72cd587a9b 100644
--- a/morpheus/_lib/src/messages/multi_inference_nlp.cpp
+++ b/morpheus/_lib/src/messages/multi_inference_nlp.cpp
@@ -20,10 +20,8 @@
 #include "morpheus/messages/memory/inference_memory.hpp"
 #include "morpheus/messages/meta.hpp"
 #include "morpheus/messages/multi_inference.hpp"
-#include "morpheus/utilities/cupy_util.hpp"
 
 #include
-#include
 #include
 #include
 
@@ -84,44 +82,18 @@
        std::move(meta), mess_offset, mess_count, std::move(memory), offset, count);
}

-std::shared_ptr MultiInferenceNLPMessageInterfaceProxy::memory(
-    MultiInferenceNLPMessage& self)
-{
-    DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr);
-    return std::static_pointer_cast(self.memory);
-}
-
-std::size_t MultiInferenceNLPMessageInterfaceProxy::offset(MultiInferenceNLPMessage& self)
-{
-    return self.offset;
-}
-
-std::size_t MultiInferenceNLPMessageInterfaceProxy::count(MultiInferenceNLPMessage& self)
-{
-    return self.count;
-}
-
pybind11::object MultiInferenceNLPMessageInterfaceProxy::input_ids(MultiInferenceNLPMessage& self)
{
-    // Get and convert
-    auto tensor = self.get_input_ids();
-
-    return CupyUtil::tensor_to_cupy(tensor);
+    return get_tensor_property(self, "input_ids");
}

pybind11::object MultiInferenceNLPMessageInterfaceProxy::input_mask(MultiInferenceNLPMessage& self)
{
-    // Get and convert
-    auto tensor = self.get_input_mask();
-
-    return CupyUtil::tensor_to_cupy(tensor);
+    return get_tensor_property(self, "input_mask");
}

pybind11::object MultiInferenceNLPMessageInterfaceProxy::seq_ids(MultiInferenceNLPMessage& self)
{
-    // Get and convert
-    auto tensor = self.get_seq_ids();
-
-    return CupyUtil::tensor_to_cupy(tensor);
+    return get_tensor_property(self, "seq_ids");
}
}  // namespace morpheus

diff --git a/morpheus/_lib/src/messages/multi_response.cpp b/morpheus/_lib/src/messages/multi_response.cpp
index 9b7008c565..ecf2a8ca43 100644
--- a/morpheus/_lib/src/messages/multi_response.cpp
+++ b/morpheus/_lib/src/messages/multi_response.cpp
@@ -21,11 +21,8 @@
 #include "morpheus/messages/meta.hpp"
 #include "morpheus/messages/multi.hpp"
 #include "morpheus/objects/tensor_object.hpp"
-#include "morpheus/utilities/cupy_util.hpp"
 
 #include
-#include
-#include
 #include
 #include
 
@@ -70,27 +67,4 @@
        std::move(meta), mess_offset, mess_count, std::move(memory), offset, count);
}

-std::shared_ptr MultiResponseMessageInterfaceProxy::memory(MultiResponseMessage& self)
-{
-    DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr);
-
-    return std::static_pointer_cast(self.memory);
-}
-
-std::size_t MultiResponseMessageInterfaceProxy::offset(MultiResponseMessage& self)
-{
-    return self.offset;
-}
-
-std::size_t MultiResponseMessageInterfaceProxy::count(MultiResponseMessage& self)
-{
-    return self.count;
-}
-
-pybind11::object MultiResponseMessageInterfaceProxy::get_output(MultiResponseMessage& self, const std::string& name)
-{
-    auto tensor = self.get_output(name);
-
-    return CupyUtil::tensor_to_cupy(tensor);
-}
}  // namespace morpheus

diff --git a/morpheus/_lib/src/messages/multi_response_probs.cpp b/morpheus/_lib/src/messages/multi_response_probs.cpp
index 6cbfeba8a3..5498143a34 100644
--- a/morpheus/_lib/src/messages/multi_response_probs.cpp
+++ b/morpheus/_lib/src/messages/multi_response_probs.cpp
@@ -18,10 +18,8 @@
 #include
"morpheus/messages/multi_response_probs.hpp" #include "morpheus/messages/meta.hpp" -#include "morpheus/utilities/cupy_util.hpp" #include -#include #include #include @@ -65,29 +63,8 @@ std::shared_ptr MultiResponseProbsMessageInterfacePro std::move(meta), mess_offset, mess_count, std::move(memory), offset, count); } -std::shared_ptr MultiResponseProbsMessageInterfaceProxy::memory( - MultiResponseProbsMessage& self) -{ - DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); - - return std::static_pointer_cast(self.memory); -} - -std::size_t MultiResponseProbsMessageInterfaceProxy::offset(MultiResponseProbsMessage& self) -{ - return self.offset; -} - -std::size_t MultiResponseProbsMessageInterfaceProxy::count(MultiResponseProbsMessage& self) -{ - return self.count; -} - pybind11::object MultiResponseProbsMessageInterfaceProxy::probs(MultiResponseProbsMessage& self) { - // Get and convert - auto tensor = self.get_probs(); - - return CupyUtil::tensor_to_cupy(tensor); + return get_tensor_property(self, "probs"); } } // namespace morpheus diff --git a/morpheus/_lib/src/messages/multi_tensor.cpp b/morpheus/_lib/src/messages/multi_tensor.cpp index f91618a468..237adde5a5 100644 --- a/morpheus/_lib/src/messages/multi_tensor.cpp +++ b/morpheus/_lib/src/messages/multi_tensor.cpp @@ -17,13 +17,15 @@ #include "morpheus/messages/multi_tensor.hpp" -#include "morpheus/types.hpp" // for TensorIndex, TensorMap +#include "morpheus/types.hpp" // for TensorIndex, TensorMap +#include "morpheus/utilities/cupy_util.hpp" // for CupyUtil::tensor_to_cupy #include // for cudf::size_type> #include +#include // for key_error -#include // for int32_t -#include // needed for logging +#include // for int32_t +#include // for runtime_error namespace morpheus { /****** Component public implementations *******************/ @@ -52,16 +54,16 @@ TensorObject MultiTensorMessage::get_tensor(const std::string& name) TensorObject MultiTensorMessage::get_tensor_impl(const std::string& name) const { - CHECK(this->memory->has_tensor(name)) << "Cound not find tensor: " << name; + auto& tensor = this->memory->get_tensor(name); // check if we are getting the entire input if (this->offset == 0 && this->count == this->memory->count) { - return this->memory->tensors[name]; + return tensor; } - return this->memory->tensors[name].slice({static_cast(this->offset), 0}, - {static_cast(this->offset + this->count), -1}); + return tensor.slice({static_cast(this->offset), 0}, + {static_cast(this->offset + this->count), -1}); } void MultiTensorMessage::set_tensor(const std::string& name, const TensorObject& value) @@ -119,4 +121,56 @@ std::shared_ptr MultiTensorMessage::copy_input_ranges( return std::make_shared(num_selected_rows, std::move(tensors)); } +/****** MultiTensorMessageInterfaceProxy *************************/ +std::shared_ptr MultiTensorMessageInterfaceProxy::init(std::shared_ptr meta, + std::size_t mess_offset, + std::size_t mess_count, + std::shared_ptr memory, + std::size_t offset, + std::size_t count) +{ + return std::make_shared( + std::move(meta), mess_offset, mess_count, std::move(memory), offset, count); +} + +std::shared_ptr MultiTensorMessageInterfaceProxy::memory(MultiTensorMessage& self) +{ + DCHECK(std::dynamic_pointer_cast(self.memory) != nullptr); + + return std::static_pointer_cast(self.memory); +} + +std::size_t MultiTensorMessageInterfaceProxy::offset(MultiTensorMessage& self) +{ + return self.offset; +} + +std::size_t MultiTensorMessageInterfaceProxy::count(MultiTensorMessage& self) +{ + return self.count; +} + 
+pybind11::object MultiTensorMessageInterfaceProxy::get_tensor(MultiTensorMessage& self, const std::string& name) +{ + try + { + auto tensor = self.get_tensor(name); + return CupyUtil::tensor_to_cupy(tensor); + } catch (const std::runtime_error& e) + { + throw pybind11::key_error{e.what()}; + } +} + +pybind11::object MultiTensorMessageInterfaceProxy::get_tensor_property(MultiTensorMessage& self, const std::string name) +{ + try + { + return get_tensor(self, std::move(name)); + } catch (const pybind11::key_error& e) + { + throw pybind11::attribute_error{e.what()}; + } +} + } // namespace morpheus diff --git a/morpheus/_lib/src/objects/mutable_table_ctx_mgr.cpp b/morpheus/_lib/src/objects/mutable_table_ctx_mgr.cpp index 1f85546a80..cda9deb98e 100644 --- a/morpheus/_lib/src/objects/mutable_table_ctx_mgr.cpp +++ b/morpheus/_lib/src/objects/mutable_table_ctx_mgr.cpp @@ -19,8 +19,8 @@ #include "morpheus/utilities/string_util.hpp" -#include #include +#include #include diff --git a/morpheus/_lib/src/python_modules/messages.cpp b/morpheus/_lib/src/python_modules/messages.cpp index e6363f9808..ddce249116 100644 --- a/morpheus/_lib/src/python_modules/messages.cpp +++ b/morpheus/_lib/src/python_modules/messages.cpp @@ -28,6 +28,7 @@ #include "morpheus/messages/multi_inference_nlp.hpp" #include "morpheus/messages/multi_response.hpp" #include "morpheus/messages/multi_response_probs.hpp" +#include "morpheus/messages/multi_tensor.hpp" #include "morpheus/objects/data_table.hpp" #include "morpheus/objects/mutable_table_ctx_mgr.hpp" #include "morpheus/utilities/cudf_util.hpp" @@ -38,7 +39,6 @@ #include #include #include -#include #include // IWYU pragma: keep #include #include @@ -84,6 +84,7 @@ PYBIND11_MODULE(messages, m) mrc::pymrc::PortBuilderUtil::register_port_util>(); mrc::pymrc::PortBuilderUtil::register_port_util>(); + mrc::pymrc::PortBuilderUtil::register_port_util>(); mrc::pymrc::PortBuilderUtil::register_port_util>(); mrc::pymrc::PortBuilderUtil::register_port_util>(); mrc::pymrc::PortBuilderUtil::register_port_util>(); @@ -91,30 +92,98 @@ PYBIND11_MODULE(messages, m) mrc::pymrc::PortBuilderUtil::register_port_util>(); // EdgeConnectors for derived classes of MultiMessage to MultiMessage + mrc::edge::EdgeConnector, + std::shared_ptr>::register_converter(); + + mrc::edge::EdgeConnector, + std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, + std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, + std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, + std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, + std::shared_ptr>::register_converter(); + mrc::edge::EdgeConnector, std::shared_ptr>::register_converter(); + // Tensor Memory classes + py::class_>(m, "TensorMemory") + .def(py::init<>(&TensorMemoryInterfaceProxy::init), py::arg("count"), py::arg("tensors") = py::none()) + .def_readonly("count", &TensorMemory::count) + .def("get_tensors", &TensorMemoryInterfaceProxy::get_tensors, py::return_value_policy::move) + .def("set_tensors", &TensorMemoryInterfaceProxy::set_tensors, 
py::arg("tensors")) + .def("get_tensor", &TensorMemoryInterfaceProxy::get_tensor, py::arg("name"), py::return_value_policy::move) + .def("set_tensor", &TensorMemoryInterfaceProxy::set_tensor, py::arg("name"), py::arg("tensor")); + + py::class_>(m, "InferenceMemory") + .def(py::init<>(&InferenceMemoryInterfaceProxy::init), py::arg("count"), py::arg("tensors") = py::none()) + .def("get_input", &InferenceMemoryInterfaceProxy::get_tensor, py::arg("name"), py::return_value_policy::move) + .def("set_input", &InferenceMemoryInterfaceProxy::set_tensor, py::arg("name"), py::arg("tensor")); + + py::class_>(m, "InferenceMemoryFIL") + .def(py::init<>(&InferenceMemoryFILInterfaceProxy::init), + py::arg("count"), + py::arg("input__0"), + py::arg("seq_ids")) + .def_property("input__0", + &InferenceMemoryFILInterfaceProxy::get_input__0, + &InferenceMemoryFILInterfaceProxy::set_input__0) + .def_property( + "seq_ids", &InferenceMemoryFILInterfaceProxy::get_seq_ids, &InferenceMemoryFILInterfaceProxy::set_seq_ids); + + py::class_>(m, "InferenceMemoryNLP") + .def(py::init<>(&InferenceMemoryNLPInterfaceProxy::init), + py::arg("count"), + py::arg("input_ids"), + py::arg("input_mask"), + py::arg("seq_ids")) + .def_property("input_ids", + &InferenceMemoryNLPInterfaceProxy::get_input_ids, + &InferenceMemoryNLPInterfaceProxy::set_input_ids) + .def_property("input_mask", + &InferenceMemoryNLPInterfaceProxy::get_input_mask, + &InferenceMemoryNLPInterfaceProxy::set_input_mask) + .def_property( + "seq_ids", &InferenceMemoryNLPInterfaceProxy::get_seq_ids, &InferenceMemoryNLPInterfaceProxy::set_seq_ids); + + py::class_>(m, "ResponseMemory") + .def(py::init<>(&ResponseMemoryInterfaceProxy::init), py::arg("count"), py::arg("tensors") = py::none()) + .def("get_output", &ResponseMemoryInterfaceProxy::get_tensor, py::arg("name"), py::return_value_policy::move) + .def("set_output", &ResponseMemoryInterfaceProxy::set_tensor, py::arg("name"), py::arg("tensor")); + + py::class_>(m, "ResponseMemoryProbs") + .def(py::init<>(&ResponseMemoryProbsInterfaceProxy::init), py::arg("count"), py::arg("probs")) + .def_property( + "probs", &ResponseMemoryProbsInterfaceProxy::get_probs, &ResponseMemoryProbsInterfaceProxy::set_probs); + // Context manager for Mutable Dataframes. 
Attempting to use it outside of a with block will raise an exception py::class_>(m, "MutableTableCtxMgr") .def("__enter__", &MutableTableCtxMgr::enter, py::return_value_policy::reference) @@ -124,6 +193,7 @@ PYBIND11_MODULE(messages, m) .def("__setattr__", &MutableTableCtxMgr::throw_usage_error) .def("__setitem__", &MutableTableCtxMgr::throw_usage_error); + // Message classes py::class_>(m, "MessageMeta") .def(py::init<>(&MessageMetaInterfaceProxy::init_python), py::arg("df")) .def_property_readonly("count", &MessageMetaInterfaceProxy::count) @@ -159,37 +229,18 @@ PYBIND11_MODULE(messages, m) py::return_value_policy::move) .def("get_meta_list", &MultiMessageInterfaceProxy::get_meta_list, py::return_value_policy::move); - py::class_>(m, "InferenceMemory") - .def_property_readonly("count", &InferenceMemoryInterfaceProxy::get_count); - - py::class_>(m, "InferenceMemoryNLP") - .def(py::init<>(&InferenceMemoryNLPInterfaceProxy::init), - py::arg("count"), - py::arg("input_ids"), - py::arg("input_mask"), - py::arg("seq_ids")) - .def_property_readonly("count", &InferenceMemoryNLPInterfaceProxy::count) - .def_property("input_ids", - &InferenceMemoryNLPInterfaceProxy::get_input_ids, - &InferenceMemoryNLPInterfaceProxy::set_input_ids) - .def_property("input_mask", - &InferenceMemoryNLPInterfaceProxy::get_input_mask, - &InferenceMemoryNLPInterfaceProxy::set_input_mask) - .def_property( - "seq_ids", &InferenceMemoryNLPInterfaceProxy::get_seq_ids, &InferenceMemoryNLPInterfaceProxy::set_seq_ids); - - py::class_>(m, "InferenceMemoryFIL") - .def(py::init<>(&InferenceMemoryFILInterfaceProxy::init), - py::arg("count"), - py::arg("input__0"), - py::arg("seq_ids")) - .def_property_readonly("count", &InferenceMemoryFILInterfaceProxy::count) - .def("get_tensor", &InferenceMemoryFILInterfaceProxy::get_tensor) - .def_property("input__0", - &InferenceMemoryFILInterfaceProxy::get_input__0, - &InferenceMemoryFILInterfaceProxy::set_input__0) - .def_property( - "seq_ids", &InferenceMemoryFILInterfaceProxy::get_seq_ids, &InferenceMemoryFILInterfaceProxy::set_seq_ids); + py::class_>(m, "MultiTensorMessage") + .def(py::init<>(&MultiTensorMessageInterfaceProxy::init), + py::arg("meta"), + py::arg("mess_offset"), + py::arg("mess_count"), + py::arg("memory"), + py::arg("offset"), + py::arg("count")) + .def_property_readonly("memory", &MultiTensorMessageInterfaceProxy::memory) + .def_property_readonly("offset", &MultiTensorMessageInterfaceProxy::offset) + .def_property_readonly("count", &MultiTensorMessageInterfaceProxy::count) + .def("get_tensor", &MultiTensorMessageInterfaceProxy::get_tensor); py::class_>(m, "MultiInferenceMessage") .def(py::init<>(&MultiInferenceMessageInterfaceProxy::init), @@ -199,11 +250,7 @@ PYBIND11_MODULE(messages, m) py::arg("memory"), py::arg("offset"), py::arg("count")) - .def_property_readonly("memory", &MultiInferenceMessageInterfaceProxy::memory) - .def_property_readonly("offset", &MultiInferenceMessageInterfaceProxy::offset) - .def_property_readonly("count", &MultiInferenceMessageInterfaceProxy::count) - .def("get_input", &MultiInferenceMessageInterfaceProxy::get_input) - .def("get_slice", &MultiInferenceMessageInterfaceProxy::get_slice, py::return_value_policy::reference_internal); + .def("get_input", &MultiInferenceMessageInterfaceProxy::get_tensor); py::class_>( m, "MultiInferenceNLPMessage") @@ -214,9 +261,6 @@ PYBIND11_MODULE(messages, m) py::arg("memory"), py::arg("offset"), py::arg("count")) - .def_property_readonly("memory", &MultiInferenceNLPMessageInterfaceProxy::memory) - 
.def_property_readonly("offset", &MultiInferenceNLPMessageInterfaceProxy::offset) - .def_property_readonly("count", &MultiInferenceNLPMessageInterfaceProxy::count) .def_property_readonly("input_ids", &MultiInferenceNLPMessageInterfaceProxy::input_ids) .def_property_readonly("input_mask", &MultiInferenceNLPMessageInterfaceProxy::input_mask) .def_property_readonly("seq_ids", &MultiInferenceNLPMessageInterfaceProxy::seq_ids); @@ -230,25 +274,8 @@ PYBIND11_MODULE(messages, m) py::arg("memory"), py::arg("offset"), py::arg("count")) - .def_property_readonly("memory", &MultiInferenceFILMessageInterfaceProxy::memory) - .def_property_readonly("offset", &MultiInferenceFILMessageInterfaceProxy::offset) - .def_property_readonly("count", &MultiInferenceFILMessageInterfaceProxy::count); - - py::class_>(m, "TensorMemory") - .def_readonly("count", &TensorMemory::count); - - py::class_>(m, "ResponseMemory") - .def_readonly("count", &ResponseMemory::count) - .def("get_output", &ResponseMemoryInterfaceProxy::get_output, py::return_value_policy::reference_internal) - .def("get_output_tensor", - &ResponseMemoryInterfaceProxy::get_output_tensor, - py::return_value_policy::reference_internal); - - py::class_>(m, "ResponseMemoryProbs") - .def(py::init<>(&ResponseMemoryProbsInterfaceProxy::init), py::arg("count"), py::arg("probs")) - .def_property_readonly("count", &ResponseMemoryProbsInterfaceProxy::count) - .def_property( - "probs", &ResponseMemoryProbsInterfaceProxy::get_probs, &ResponseMemoryProbsInterfaceProxy::set_probs); + .def_property_readonly("input__0", &MultiInferenceFILMessageInterfaceProxy::input__0) + .def_property_readonly("seq_ids", &MultiInferenceFILMessageInterfaceProxy::seq_ids); py::class_>(m, "MultiResponseMessage") .def(py::init<>(&MultiResponseMessageInterfaceProxy::init), @@ -258,10 +285,7 @@ PYBIND11_MODULE(messages, m) py::arg("memory"), py::arg("offset"), py::arg("count")) - .def_property_readonly("memory", &MultiResponseMessageInterfaceProxy::memory) - .def_property_readonly("offset", &MultiResponseMessageInterfaceProxy::offset) - .def_property_readonly("count", &MultiResponseMessageInterfaceProxy::count) - .def("get_output", &MultiResponseMessageInterfaceProxy::get_output); + .def("get_output", &MultiResponseMessageInterfaceProxy::get_tensor); py::class_>( m, "MultiResponseProbsMessage") @@ -272,9 +296,6 @@ PYBIND11_MODULE(messages, m) py::arg("memory"), py::arg("offset"), py::arg("count")) - .def_property_readonly("memory", &MultiResponseProbsMessageInterfaceProxy::memory) - .def_property_readonly("offset", &MultiResponseProbsMessageInterfaceProxy::offset) - .def_property_readonly("count", &MultiResponseProbsMessageInterfaceProxy::count) .def_property_readonly("probs", &MultiResponseProbsMessageInterfaceProxy::probs); #ifdef VERSION_INFO diff --git a/morpheus/_lib/src/stages/preprocess_fil.cpp b/morpheus/_lib/src/stages/preprocess_fil.cpp index 709034e7c1..be55edfaba 100644 --- a/morpheus/_lib/src/stages/preprocess_fil.cpp +++ b/morpheus/_lib/src/stages/preprocess_fil.cpp @@ -113,7 +113,8 @@ PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator() 0); // Build the results - auto memory = std::make_shared(x->mess_count, input__0, seg_ids); + auto memory = + std::make_shared(x->mess_count, std::move(input__0), std::move(seg_ids)); auto next = std::make_shared( x->meta, x->mess_offset, x->mess_count, std::move(memory), 0, memory->count); diff --git a/morpheus/_lib/src/utilities/cupy_util.cpp b/morpheus/_lib/src/utilities/cupy_util.cpp index 15d0c22462..f2c0615b14 100644 
--- a/morpheus/_lib/src/utilities/cupy_util.cpp +++ b/morpheus/_lib/src/utilities/cupy_util.cpp @@ -36,6 +36,7 @@ #include // for uintptr_t #include // for make_shared #include // for string +#include // for move #include // for vector namespace morpheus { @@ -129,4 +130,26 @@ TensorObject CupyUtil::cupy_to_tensor(pybind11::object cupy_array) return tensor; } + +TensorMap CupyUtil::cupy_to_tensors(const py_tensor_map_t& cupy_tensors) +{ + tensor_map_t tensors; + for (const auto& tensor : cupy_tensors) + { + tensors[tensor.first] = std::move(cupy_to_tensor(tensor.second)); + } + + return tensors; +} + +CupyUtil::py_tensor_map_t CupyUtil::tensors_to_cupy(const tensor_map_t& tensors) +{ + py_tensor_map_t cupy_tensors; + for (const auto& tensor : tensors) + { + cupy_tensors[tensor.first] = std::move(tensor_to_cupy(tensor.second)); + } + + return cupy_tensors; +} } // namespace morpheus diff --git a/morpheus/messages/__init__.py b/morpheus/messages/__init__.py index 58d7c1c85f..cfdf70cff8 100644 --- a/morpheus/messages/__init__.py +++ b/morpheus/messages/__init__.py @@ -18,24 +18,25 @@ # Import order is very important here. Import base classes before child ones # isort: off +from morpheus.messages.memory.inference_memory import InferenceMemory +from morpheus.messages.memory.inference_memory import InferenceMemoryAE +from morpheus.messages.memory.inference_memory import InferenceMemoryFIL +from morpheus.messages.memory.inference_memory import InferenceMemoryNLP +from morpheus.messages.memory.response_memory import ResponseMemory +from morpheus.messages.memory.response_memory import ResponseMemoryAE +from morpheus.messages.memory.response_memory import ResponseMemoryProbs from morpheus.messages.message_base import MessageBase from morpheus.messages.message_meta import MessageMeta from morpheus.messages.message_meta import UserMessageMeta from morpheus.messages.multi_message import MultiMessage +from morpheus.messages.multi_tensor_message import MultiTensorMessage from morpheus.messages.multi_ae_message import MultiAEMessage -from morpheus.messages.multi_inference_message import InferenceMemory -from morpheus.messages.multi_inference_message import InferenceMemoryAE -from morpheus.messages.multi_inference_message import InferenceMemoryFIL -from morpheus.messages.multi_inference_message import InferenceMemoryNLP from morpheus.messages.multi_inference_message import MultiInferenceFILMessage from morpheus.messages.multi_inference_message import MultiInferenceMessage from morpheus.messages.multi_inference_message import MultiInferenceNLPMessage from morpheus.messages.multi_response_message import MultiResponseAEMessage from morpheus.messages.multi_response_message import MultiResponseMessage from morpheus.messages.multi_response_message import MultiResponseProbsMessage -from morpheus.messages.multi_response_message import ResponseMemory -from morpheus.messages.multi_response_message import ResponseMemoryAE -from morpheus.messages.multi_response_message import ResponseMemoryProbs __all__ = [ "InferenceMemory", @@ -52,6 +53,7 @@ "MultiResponseAEMessage", "MultiResponseMessage", "MultiResponseProbsMessage", + "MultiTensorMessage", "ResponseMemory", "ResponseMemoryAE", "ResponseMemoryProbs", diff --git a/morpheus/messages/memory/__init__.py b/morpheus/messages/memory/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/morpheus/messages/memory/inference_memory.py b/morpheus/messages/memory/inference_memory.py new file mode 100644 index 0000000000..58a8a56999 --- /dev/null +++ 
b/morpheus/messages/memory/inference_memory.py @@ -0,0 +1,138 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses + +import cupy as cp + +import morpheus._lib.messages as _messages +from morpheus.messages.data_class_prop import DataClassProp +from morpheus.messages.memory.tensor_memory import TensorMemory + + +@dataclasses.dataclass(init=False) +class InferenceMemory(TensorMemory, cpp_class=_messages.InferenceMemory): + """ + This is a base container class for data that will be used for inference stages. This class is designed to + hold generic tensor data in cupy arrays. + """ + + def get_input(self, name: str): + """ + Get the tensor stored in the container identified by `name`. Alias for `InferenceMemory.get_tensor`. + + Parameters + ---------- + name : str + Key used to do lookup in inputs dict of the container. + + Returns + ------- + cupy.ndarray + Inputs corresponding to name. + + Raises + ------ + KeyError + If input name does not exist in the container. + """ + return self.get_tensor(name) + + def set_input(self, name: str, tensor: cp.ndarray): + """ + Update the input tensor identified by `name`. Alias for `InferenceMemory.set_tensor`. + + Parameters + ---------- + name : str + Key used to do lookup in inputs dict of the container. + tensor : cupy.ndarray + Tensor as a CuPy array. + """ + self.set_tensor(name, tensor) + + +@dataclasses.dataclass(init=False) +class InferenceMemoryNLP(InferenceMemory, cpp_class=_messages.InferenceMemoryNLP): + """ + This is a container class for data that needs to be submitted to the inference server for NLP category + use cases. + + Parameters + ---------- + input_ids : cupy.ndarray + The token-ids for each string padded with 0s to max_length. + input_mask : cupy.ndarray + The mask for token-ids result where corresponding positions identify valid token-id values. + seq_ids : cupy.ndarray + Ids used to index from an inference input to a message. Necessary since there can be more inference + inputs than messages (i.e., if some messages get broken into multiple inference requests).
+ + """ + input_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(InferenceMemory._get_tensor_prop, + InferenceMemory.set_input) + input_mask: dataclasses.InitVar[cp.ndarray] = DataClassProp(InferenceMemory._get_tensor_prop, + InferenceMemory.set_input) + seq_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(InferenceMemory._get_tensor_prop, + InferenceMemory.set_input) + + def __init__(self, count: int, input_ids: cp.ndarray, input_mask: cp.ndarray, seq_ids: cp.ndarray): + super().__init__(count, tensors={'input_ids': input_ids, 'input_mask': input_mask, 'seq_ids': seq_ids}) + + +@dataclasses.dataclass(init=False) +class InferenceMemoryFIL(InferenceMemory, cpp_class=_messages.InferenceMemoryFIL): + """ + This is a container class for data that needs to be submitted to the inference server for FIL category + usecases. + + Parameters + ---------- + input__0 : cupy.ndarray + Inference input. + seq_ids : cupy.ndarray + Ids used to index from an inference input to a message. Necessary since there can be more inference + inputs than messages (i.e., if some messages get broken into multiple inference requests). + + """ + input__0: dataclasses.InitVar[cp.ndarray] = DataClassProp(InferenceMemory._get_tensor_prop, + InferenceMemory.set_input) + seq_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(InferenceMemory._get_tensor_prop, + InferenceMemory.set_input) + + def __init__(self, count: int, input__0: cp.ndarray, seq_ids: cp.ndarray): + super().__init__(count, tensors={'input__0': input__0, 'seq_ids': seq_ids}) + + +@dataclasses.dataclass(init=False) +class InferenceMemoryAE(InferenceMemory, cpp_class=None): + """ + This is a container class for data that needs to be submitted to the inference server for auto encoder usecases. + + Parameters + ---------- + input : cupy.ndarray + Inference input. + seq_ids : cupy.ndarray + Ids used to index from an inference input to a message. Necessary since there can be more inference + inputs than messages (i.e., if some messages get broken into multiple inference requests). + """ + + input: dataclasses.InitVar[cp.ndarray] = DataClassProp(InferenceMemory._get_tensor_prop, InferenceMemory.set_input) + seq_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(InferenceMemory._get_tensor_prop, + InferenceMemory.set_input) + + def __init__(self, count: int, input: cp.ndarray, seq_ids: cp.ndarray): + super().__init__(count, tensors={'input': input, 'seq_ids': seq_ids}) diff --git a/morpheus/messages/memory/response_memory.py b/morpheus/messages/memory/response_memory.py new file mode 100644 index 0000000000..c120e63191 --- /dev/null +++ b/morpheus/messages/memory/response_memory.py @@ -0,0 +1,101 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import dataclasses + +import cupy as cp + +import morpheus._lib.messages as _messages +from morpheus.messages.data_class_prop import DataClassProp +from morpheus.messages.memory.tensor_memory import TensorMemory + + +@dataclasses.dataclass(init=False) +class ResponseMemory(TensorMemory, cpp_class=_messages.ResponseMemory): + """Output memory block holding the results of inference.""" + + def get_output(self, name: str): + """ + Get the Tensor stored in the container identified by `name`. Alias for `ResponseMemory.get_tensor`. + + Parameters + ---------- + name : str + Key used to do lookup in tensors dict of message container. + + Returns + ------- + cupy.ndarray + Tensors corresponding to name. + + Raises + ------ + KeyError + If output name does not exist in message container. + + """ + return self.get_tensor(name) + + def set_output(self, name: str, tensor: cp.ndarray): + """ + Update the output tensor identified by `name`. Alias for `ResponseMemory.set_tensor`. + + Parameters + ---------- + name : str + Key used to do lookup in tensors dict of the container. + tensor : cupy.ndarray + Tensor as a CuPy array. + + Raises + ------ + ValueError + If the number of rows in `tensor` does not match `count`. + """ + self.set_tensor(name, tensor) + + +@dataclasses.dataclass(init=False) +class ResponseMemoryProbs(ResponseMemory, cpp_class=_messages.ResponseMemoryProbs): + """ + Subclass of `ResponseMemory` containing an output tensor named 'probs'. + + Parameters + ---------- + probs : cupy.ndarray + Probabilities tensor. + """ + probs: dataclasses.InitVar[cp.ndarray] = DataClassProp(ResponseMemory._get_tensor_prop, ResponseMemory.set_output) + + def __init__(self, count: int, probs: cp.ndarray): + super().__init__(count, tensors={'probs': probs}) + + +@dataclasses.dataclass(init=False) +class ResponseMemoryAE(ResponseMemoryProbs, cpp_class=None): + """ + Subclass of `ResponseMemoryProbs` specific to the AutoEncoder pipeline. + + Parameters + ---------- + user_id : str + User id the inference was performed against. + + explain_df : pd.DataFrame + Explainability DataFrame; for each feature a column will exist with a name in the form of: `{feature}_z_loss` + containing the loss z-score along with `max_abs_z` and `mean_abs_z` columns. + """ + user_id = "" + explain_df = None diff --git a/morpheus/messages/memory/tensor_memory.py new file mode 100644 index 0000000000..93572a8442 --- /dev/null +++ b/morpheus/messages/memory/tensor_memory.py @@ -0,0 +1,148 @@ +# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
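The response-side containers follow the same pattern; a small sketch (the probabilities are made up):

```python
import cupy as cp

from morpheus.messages import ResponseMemoryProbs

probs = cp.array([[0.1, 0.9], [0.8, 0.2]])  # hypothetical output probabilities
memory = ResponseMemoryProbs(count=2, probs=probs)

# 'probs' is reachable as a property, via the get_output alias, and via get_tensor
assert (memory.probs == probs).all()
assert (memory.get_output("probs") == memory.get_tensor("probs")).all()
```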
+ +import dataclasses +import typing + +import cupy as cp + +import morpheus._lib.messages as _messages +from morpheus.messages.message_base import MessageData + + +@dataclasses.dataclass(init=False) +class TensorMemory(MessageData, cpp_class=_messages.TensorMemory): + """ + This is a base container class for data that will be used for inference stages. This class is designed to + hold generic tensor data in cupy arrays. + + Parameters + ---------- + count : int + Length of each tensor contained in `tensors`. + tensors : typing.Dict[str, cupy.ndarray] + Collection of tensors uniquely identified by a name. + + """ + count: int + + def __init__(self, count: int, tensors: typing.Dict[str, cp.ndarray] = None): + self.count = count + + if tensors is None: + tensors = {} + else: + self._check_tensors(tensors) + + self._tensors = tensors + + def _check_tensors(self, tensors: typing.Dict[str, cp.ndarray]): + for tensor in tensors.values(): + self._check_tensor(tensor) + + def _check_tensor(self, tensor: cp.ndarray): + if (tensor.shape[0] != self.count): + class_name = type(self).__name__ + raise ValueError( + f"The number of rows in tensor {tensor.shape[0]} does not match {class_name}.count of {self.count}") + + def get_tensors(self): + """ + Get the tensors contained by this instance. It is important to note that when C++ execution is enabled the + returned tensors will be a Python copy of the tensors stored in the C++ object. As such any changes made to the + tensors will need to be updated with a call to `set_tensors`. + + Returns + ------- + typing.Dict[str, cp.ndarray] + """ + return self._tensors + + def set_tensors(self, tensors: typing.Dict[str, cp.ndarray]): + """ + Overwrite the tensors stored by this instance. If the length of the tensors has changed, then the `count` + property should also be updated. + + Parameters + ---------- + tensors : typing.Dict[str, cupy.ndarray] + Collection of tensors uniquely identified by a name. + """ + self._check_tensors(tensors) + self._tensors = tensors + + def get_tensor(self, name: str): + """ + Get the Tensor stored in the container identified by `name`. + + Parameters + ---------- + name : str + Tensor key name. + + Returns + ------- + cupy.ndarray + Tensor. + + Raises + ------ + KeyError + If tensor name does not exist in the container. + """ + return self._tensors[name] + + def _get_tensor_prop(self, name: str): + """ + This method is intended to be used by property methods in subclasses. + + Parameters + ---------- + name : str + Tensor key name. + + Returns + ------- + cupy.ndarray + Tensor. + + Raises + ------ + AttributeError + If tensor name does not exist in the container. + """ + try: + return self._tensors[name] + except KeyError: + raise AttributeError + + def set_tensor(self, name: str, tensor: cp.ndarray): + """ + Update the tensor identified by `name`. + + Parameters + ---------- + name : str + Tensor key name. + tensor : cupy.ndarray + Tensor as a CuPy array.
+ + Raises + ------ + ValueError + If the number of rows in `tensor` does not match `count` + """ + # Ensure that we have 2D array here (`ensure_2d` inserts the wrong axis) + reshaped_tensor = tensor if tensor.ndim == 2 else cp.reshape(tensor, (tensor.shape[0], -1)) + self._check_tensor(reshaped_tensor) + self._tensors[name] = reshaped_tensor diff --git a/morpheus/messages/multi_ae_message.py b/morpheus/messages/multi_ae_message.py index d168053bf9..294c09fac5 100644 --- a/morpheus/messages/multi_ae_message.py +++ b/morpheus/messages/multi_ae_message.py @@ -60,7 +60,7 @@ def get_slice(self, start, stop): train_scores_mean=self.train_scores_mean, train_scores_std=self.train_scores_std) - def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]], num_selected_rows: int = None): + def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]): """ Perform a copy of the current message instance for the given `ranges` of rows. @@ -71,22 +71,15 @@ def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]], num_selected_ The final output is exclusive of the `stop_row`, i.e. `[start_row, stop_row)`. For example to copy rows 1-2 & 5-7 `ranges=[(1, 3), (5, 8)]` - num_selected_rows : typing.Union[None, int] - Optional specify the number of rows selected by `ranges`, otherwise this is computed by the result. - Returns ------- `MultiAEMessage` """ sliced_rows = self.copy_meta_ranges(ranges) - - if num_selected_rows is None: - num_selected_rows = len(sliced_rows) - return MultiAEMessage(meta=UserMessageMeta(sliced_rows, user_id=self.meta.user_id), mess_offset=0, - mess_count=num_selected_rows, + mess_count=len(sliced_rows), model=self.model, train_scores_mean=self.train_scores_mean, train_scores_std=self.train_scores_std) diff --git a/morpheus/messages/multi_inference_message.py b/morpheus/messages/multi_inference_message.py index 06f3644004..5488530688 100644 --- a/morpheus/messages/multi_inference_message.py +++ b/morpheus/messages/multi_inference_message.py @@ -14,147 +14,16 @@ # limitations under the License. import dataclasses -import typing - -import cupy as cp import morpheus._lib.messages as _messages -from morpheus.messages.data_class_prop import DataClassProp -from morpheus.messages.multi_message import MultiMessage -from morpheus.messages.tensor_memory import TensorMemory +from morpheus.messages.multi_tensor_message import MultiTensorMessage @dataclasses.dataclass -class InferenceMemory(TensorMemory, cpp_class=_messages.InferenceMemory): - """ - This is a base container class for data that will be used for inference stages. This class is designed to - hold generic tensor data in cupy arrays. - """ - - -def get_input(instance, name: str): - """ - Getter function used with DataClassProp for getting inference input from message containers derived - from InferenceMemory. - - Parameters - ---------- - instance : `InferenceMemory` - Message container holding inputs. - name : str - Key used to do lookup in inputs dict of message container. - - Returns - ------- - cupy.ndarray - Inputs corresponding to name. - - Raises - ------ - AttributeError - If input name does not exist in message container. +class MultiInferenceMessage(MultiTensorMessage, cpp_class=_messages.MultiInferenceMessage): """ - if (name not in instance.tensors): - raise AttributeError - - return instance.tensors[name] - - -def set_input(instance, name: str, value): - """ - Setter function used with DataClassProp for setting inference input in message containers derived - from InferenceMemory. 
- - Parameters - ---------- - instance : `InferenceMemory` - Message container holding inputs. - name : str - Key used to do lookup in inputs dict of message container. - value : cupy.ndarray - Value to set for input. - """ - # Ensure that we have 2D array here (`ensure_2d` inserts the wrong axis) - instance.tensors[name] = value if value.ndim == 2 else cp.reshape(value, (value.shape[0], -1)) - - -@dataclasses.dataclass -class InferenceMemoryNLP(InferenceMemory, cpp_class=_messages.InferenceMemoryNLP): - """ - This is a container class for data that needs to be submitted to the inference server for NLP category - usecases. - - Parameters - ---------- - input_ids : cupy.ndarray - The token-ids for each string padded with 0s to max_length. - input_mask : cupy.ndarray - The mask for token-ids result where corresponding positions identify valid token-id values. - seq_ids : cupy.ndarray - Ids used to index from an inference input to a message. Necessary since there can be more inference - inputs than messages (i.e., if some messages get broken into multiple inference requests). - - """ - input_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_input, set_input) - input_mask: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_input, set_input) - seq_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_input, set_input) - - def __post_init__(self, input_ids, input_mask, seq_ids): - self.input_ids = input_ids - self.input_mask = input_mask - self.seq_ids = seq_ids - - -@dataclasses.dataclass -class InferenceMemoryFIL(InferenceMemory, cpp_class=_messages.InferenceMemoryFIL): - """ - This is a container class for data that needs to be submitted to the inference server for FIL category - usecases. - - Parameters - ---------- - input__0 : cupy.ndarray - Inference input. - seq_ids : cupy.ndarray - Ids used to index from an inference input to a message. Necessary since there can be more inference - inputs than messages (i.e., if some messages get broken into multiple inference requests). - - """ - input__0: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_input, set_input) - seq_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_input, set_input) - - def __post_init__(self, input__0, seq_ids): - self.input__0 = input__0 - self.seq_ids = seq_ids - - -@dataclasses.dataclass -class InferenceMemoryAE(InferenceMemory, cpp_class=None): - """ - This is a container class for data that needs to be submitted to the inference server for auto encoder usecases. - - Parameters - ---------- - input : cupy.ndarray - Inference input. - seq_ids : cupy.ndarray - Ids used to index from an inference input to a message. Necessary since there can be more inference - inputs than messages (i.e., if some messages get broken into multiple inference requests). - """ - - input: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_input, set_input) - seq_ids: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_input, set_input) - - def __post_init__(self, input, seq_ids): - self.input = input - self.seq_ids = seq_ids - - -@dataclasses.dataclass -class MultiInferenceMessage(MultiMessage, cpp_class=_messages.MultiInferenceMessage): - """ - This is a container class that holds the TensorMemory container and the metadata of the data contained - within it. Builds on top of the `MultiMessage` class to add additional data for inferencing. + This is a container class that holds the InferenceMemory container and the metadata of the data contained + within it. 
Builds on top of the `MultiTensorMessage` class to add additional data for inferencing. This class requires two separate memory blocks for a batch. One for the message metadata (i.e., start time, IP address, etc.) and another for the raw inference inputs (i.e., input_ids, seq_ids). Since there can be @@ -162,25 +31,12 @@ class MultiInferenceMessage(MultiMessage, cpp_class=_messages.MultiInferenceMess inference requests) this class stores two different offset and count values. `mess_offset` and `mess_count` refer to the offset and count in the message metadata batch and `offset` and `count` index into the inference batch data. - - Parameters - ---------- - memory : `TensorMemory` - Inference memory. - offset : int - Message offset in inference memory instance. - count : int - Message count in inference memory instance. - """ - memory: TensorMemory = dataclasses.field(repr=False) - offset: int - count: int @property def inputs(self): """ - Get inputs stored in the TensorMemory container. + Get inputs stored in the InferenceMemory container. Alias for `MultiInferenceMessage.tensors`. Returns ------- @@ -188,8 +44,7 @@ def inputs(self): Inference inputs. """ - - return {key: self.get_input(key) for key in self.memory.tensors.keys()} + return self.tensors def __getstate__(self): return self.__dict__ @@ -197,18 +52,9 @@ def __getstate__(self): def __setstate__(self, d): self.__dict__ = d - def __getattr__(self, name: str) -> typing.Any: - - input_val = self.memory.tensors.get(name, None) - - if (input_val is not None): - return input_val[self.offset:self.offset + self.count, :] - - raise AttributeError - def get_input(self, name: str): """ - Get input stored in the TensorMemory container. + Get input stored in the InferenceMemory container. Alias for `MultiInferenceMessage.get_tensor`. Parameters ---------- @@ -220,9 +66,12 @@ def get_input(self, name: str): cupy.ndarray Inference input. + Raises + ------ + KeyError + When no matching input tensor exists. """ - - return self.memory.tensors[name][self.offset:self.offset + self.count, :] + return self.get_tensor(name) def get_slice(self, start, stop): """ @@ -271,7 +120,7 @@ def input_ids(self): """ - return self.get_input("input_ids") + return self._get_tensor_prop("input_ids") @property def input_mask(self): @@ -285,7 +134,7 @@ def input_mask(self): """ - return self.get_input("input_mask") + return self._get_tensor_prop("input_mask") @property def seq_ids(self): @@ -300,7 +149,7 @@ def seq_ids(self): """ - return self.get_input("seq_ids") + return self._get_tensor_prop("seq_ids") @dataclasses.dataclass @@ -322,7 +171,7 @@ def input__0(self): """ - return self.get_input("input__0") + return self._get_tensor_prop("input__0") @property def seq_ids(self): @@ -336,4 +185,4 @@ def seq_ids(self): """ - return self.get_input("seq_ids") + return self._get_tensor_prop("seq_ids") diff --git a/morpheus/messages/multi_message.py b/morpheus/messages/multi_message.py index d9e944e309..f49f45d499 100644 --- a/morpheus/messages/multi_message.py +++ b/morpheus/messages/multi_message.py @@ -215,7 +215,7 @@ def copy_meta_ranges(self, return df.loc[mask, :] - def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]], num_selected_rows: int = None): + def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]): """ Perform a copy of the current message instance for the given `ranges` of rows. 
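As with `MultiAEMessage.copy_ranges` above, the `num_selected_rows` parameter is dropped here and the count is always derived from the sliced rows; a sketch with a hypothetical DataFrame:

```python
import cudf

from morpheus.messages import MessageMeta, MultiMessage

df = cudf.DataFrame({"v": list(range(8))})  # hypothetical eight-row batch
mm = MultiMessage(meta=MessageMeta(df), mess_offset=0, mess_count=8)

# Copy rows 1-2 and 5-7 (stop indices are exclusive)
copied = mm.copy_ranges([(1, 3), (5, 8)])
assert copied.mess_count == 5  # computed from the result rather than passed in
```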
@@ -225,16 +225,9 @@ def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]], num_selected_ Rows to include in the copy in the form of `[(`start_row`, `stop_row`),...]` The `stop_row` isn't included. For example to copy rows 1-2 & 5-7 `ranges=[(1, 3), (5, 8)]` - num_selected_rows : typing.Union[None, int] - Optional specify the number of rows selected by `ranges`, otherwise this is computed by the result. - Returns ------- `MultiMessage` """ sliced_rows = self.copy_meta_ranges(ranges) - - if num_selected_rows is None: - num_selected_rows = len(sliced_rows) - - return MultiMessage(meta=MessageMeta(sliced_rows), mess_offset=0, mess_count=num_selected_rows) + return MultiMessage(meta=MessageMeta(sliced_rows), mess_offset=0, mess_count=len(sliced_rows)) diff --git a/morpheus/messages/multi_response_message.py b/morpheus/messages/multi_response_message.py index be8008b827..5b363c2733 100644 --- a/morpheus/messages/multi_response_message.py +++ b/morpheus/messages/multi_response_message.py @@ -16,149 +16,20 @@ import dataclasses import typing -import cupy as cp - import morpheus._lib.messages as _messages -from morpheus.messages.data_class_prop import DataClassProp -from morpheus.messages.message_meta import MessageMeta -from morpheus.messages.multi_message import MultiMessage -from morpheus.messages.tensor_memory import TensorMemory - - -def get_output(instance: "ResponseMemory", name: str): - """ - Getter function used with DataClassProp for getting inference output from message containers derived - from ResponseMemory. - - Parameters - ---------- - instance : `ResponseMemory` - Message container holding tensors. - name : str - Key used to do lookup in tensors dict of message container. - - Returns - ------- - cupy.ndarray - Tensors corresponding to name. - - Raises - ------ - AttributeError - If output name does not exist in message container. - - """ - - if (name not in instance.tensors): - raise AttributeError - - return instance.tensors[name] - - -def set_output(instance: "ResponseMemory", name: str, value): - """ - Setter function used with DataClassProp for setting output in message containers derived - from ResponseMemory. - - Parameters - ---------- - instance : `ResponseMemory` - Message container holding tensors. - name : str - Key used to do lookup in tensors dict of message container. - value : cupy.ndarray - Value to set for input. - """ - - # Ensure that we have 2D array here (`ensure_2d` inserts the wrong axis) - instance.tensors[name] = value if value.ndim == 2 else cp.reshape(value, (value.shape[0], -1)) - - -@dataclasses.dataclass -class ResponseMemory(TensorMemory, cpp_class=_messages.ResponseMemory): - """Output memory block holding the results of inference.""" - - def get_output(self, name: str): - """ - Return the output tensor specified by `name`. - - Parameters - ---------- - name : str - Name of output tensor. - - Returns - ------- - cupy.ndarray - Tensor corresponding to name. - """ - return self.tensors[name] +from morpheus.messages.multi_tensor_message import MultiTensorMessage @dataclasses.dataclass -class ResponseMemoryProbs(ResponseMemory, cpp_class=_messages.ResponseMemoryProbs): - """ - Subclass of `ResponseMemory` containng an output tensor named 'probs'. 
- - Parameters - ---------- - probs : cupy.ndarray - Probabilities tensor - """ - probs: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_output, set_output) - - def __post_init__(self, probs): - self.probs = probs - - -@dataclasses.dataclass -class ResponseMemoryAE(ResponseMemory, cpp_class=None): - """ - Subclass of `ResponseMemory` specific to the AutoEncoder pipeline. - - Parameters - ---------- - probs : cupy.ndarray - Probabilities tensor - - user_id : str - User id the inference was performed against. - - explain_df : pd.Dataframe - Explainability Dataframe, for each feature a column will exist with a name in the form of: `{feature}_z_loss` - containing the loss z-score along with `max_abs_z` and `mean_abs_z` columns - """ - probs: dataclasses.InitVar[cp.ndarray] = DataClassProp(get_output, set_output) - user_id = "" - explain_df = None - - def __post_init__(self, probs): - self.probs = probs - - -@dataclasses.dataclass -class MultiResponseMessage(MultiMessage, cpp_class=_messages.MultiResponseMessage): +class MultiResponseMessage(MultiTensorMessage, cpp_class=_messages.MultiResponseMessage): """ This class contains several inference responses as well as the corresponding message metadata. - - Parameters - ---------- - memory : `TensorMemory` - This is a response container instance for triton inference requests. - offset : int - Offset of each response message into the `TensorMemory` block. - count : int - Inference results size of all responses. - """ - memory: TensorMemory = dataclasses.field(repr=False) - offset: int - count: int @property def outputs(self): """ - Get outputs stored in the TensorMemory container. + Get outputs stored in the ResponseMemory container. Alias for `MultiResponseMessage.tensors`. Returns ------- @@ -166,21 +37,11 @@ def outputs(self): Inference outputs. """ - - return {key: self.get_output(key) for key in self.memory.tensors.keys()} - - def __getattr__(self, name: str) -> typing.Any: - - output_val = self.memory.tensors.get(name, None) - - if (output_val is not None): - return output_val[self.offset:self.offset + self.count, :] - - raise AttributeError + return self.tensors def get_output(self, name: str): """ - Get output stored in the TensorMemory container. + Get output stored in the ResponseMemory container. Alias for `MultiResponseMessage.get_tensor`. Parameters ---------- @@ -193,12 +54,12 @@ def get_output(self, name: str): Inference output. """ - - return self.memory.tensors[name][self.offset:self.offset + self.count, :] + return self.get_tensor(name) def copy_output_ranges(self, ranges, mask=None): """ Perform a copy of the underlying output tensors for the given `ranges` of rows. + Alias for `MultiResponseMessage.copy_tensor_ranges`. Parameters ---------- @@ -215,14 +76,9 @@ def copy_output_ranges(self, ranges, mask=None): ------- typing.Dict[str, cupy.ndarray] """ - if mask is None: - mask = self._ranges_to_mask(self.get_meta(), ranges=ranges) - - # The outputs property method returns a copy with the offsets applied - outputs = self.outputs - return {key: output[mask] for (key, output) in outputs.items()} + return self.copy_tensor_ranges(ranges, mask=mask) - def copy_ranges(self, ranges): + def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]): """ Perform a copy of the current message, dataframe and tensors for the given `ranges` of rows.
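With the tensor plumbing consolidated on `MultiTensorMessage`, `get_output` and `outputs` are now thin aliases; a sketch of the surviving surface (the data is hypothetical):

```python
import cupy as cp
import cudf

from morpheus.messages import MessageMeta, MultiResponseMessage, ResponseMemory

df = cudf.DataFrame({"v": [0, 1, 2]})  # hypothetical
memory = ResponseMemory(count=3, tensors={"probs": cp.zeros((3, 2))})

msg = MultiResponseMessage(meta=MessageMeta(df),
                           mess_offset=0,
                           mess_count=3,
                           memory=memory,
                           offset=0,
                           count=3)

assert (msg.get_output("probs") == msg.get_tensor("probs")).all()
assert list(msg.outputs.keys()) == ["probs"]
```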
@@ -235,15 +91,13 @@ def copy_ranges(self, ranges): ------- `MultiResponseMessage` """ - mask = self._ranges_to_mask(self.get_meta(), ranges) - sliced_rows = self.copy_meta_ranges(ranges, mask=mask) - sliced_count = len(sliced_rows) - sliced_outputs = self.copy_output_ranges(ranges, mask=mask) - - mem = TensorMemory(count=sliced_count) - mem.outputs = sliced_outputs - - return MultiResponseMessage(MessageMeta(sliced_rows), 0, sliced_count, mem, 0, sliced_count) + m = super().copy_ranges(ranges) + return MultiResponseMessage(meta=m.meta, + mess_offset=m.mess_offset, + mess_count=m.mess_count, + memory=m.memory, + offset=m.offset, + count=m.count) def get_slice(self, start, stop): """ @@ -292,7 +146,7 @@ def probs(self): """ - return self.get_output("probs") + return self._get_tensor_prop("probs") @dataclasses.dataclass @@ -304,7 +158,7 @@ class MultiResponseAEMessage(MultiResponseProbsMessage, cpp_class=None): user_id: str = None - def copy_ranges(self, ranges): + def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]): """ Perform a copy of the current message, dataframe and tensors for the given `ranges` of rows. diff --git a/morpheus/messages/multi_tensor_message.py new file mode 100644 index 0000000000..2da7799479 --- /dev/null +++ b/morpheus/messages/multi_tensor_message.py @@ -0,0 +1,176 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import dataclasses +import typing + +import morpheus._lib.messages as _messages +from morpheus.messages.memory.tensor_memory import TensorMemory +from morpheus.messages.message_meta import MessageMeta +from morpheus.messages.multi_message import MultiMessage + + +@dataclasses.dataclass +class MultiTensorMessage(MultiMessage, cpp_class=_messages.MultiTensorMessage): + """ + This class contains tensor data as well as the corresponding message metadata. + + Parameters + ---------- + memory : `TensorMemory` + Container holding generic tensor data in cupy arrays. + offset : int + Offset of each message into the `TensorMemory` block. + count : int + Number of rows in the `TensorMemory` block. + + """ + memory: TensorMemory = dataclasses.field(repr=False) + offset: int + count: int + + @property + def tensors(self): + """ + Get tensors stored in the TensorMemory container sliced according to `offset` and `count`. + + Returns + ------- + typing.Dict[str, cupy.ndarray] + Inference tensors. + + """ + tensors = self.memory.get_tensors() + return {key: self.get_tensor(key) for key in tensors.keys()} + + def __getattr__(self, name: str) -> typing.Any: + return self._get_tensor_prop(name) + + def get_tensor(self, name: str): + """ + Get tensor stored in the TensorMemory container. + + Parameters + ---------- + name : str + Tensor key name. + + Returns + ------- + cupy.ndarray + Inference tensor.
+ + """ + return self.memory.get_tensor(name)[self.offset:self.offset + self.count, :] + + def _get_tensor_prop(self, name: str): + """ + This method is intended to be used by propery methods in subclasses + + Parameters + ---------- + name : str + Tensor key name. + + Returns + ------- + cupy.ndarray + Tensor. + + Raises + ------ + AttributeError + If tensor name does not exist in the container. + """ + try: + return self.get_tensor(name) + except KeyError: + raise AttributeError(f'No attribute named "{name}" exists') + + def copy_tensor_ranges(self, ranges, mask=None): + """ + Perform a copy of the underlying tensor tensors for the given `ranges` of rows. + + Parameters + ---------- + ranges : typing.List[typing.Tuple[int, int]] + Rows to include in the copy in the form of `[(`start_row`, `stop_row`),...]` + The `stop_row` isn't included. For example to copy rows 1-2 & 5-7 `ranges=[(1, 3), (5, 8)]` + + mask : typing.Union[None, cupy.ndarray, numpy.ndarray] + Optionally specify rows as a cupy array (when using cudf Dataframes) or a numpy array (when using pandas + Dataframes) of booleans. When not-None `ranges` will be ignored. This is useful as an optimization as this + avoids needing to generate the mask on it's own. + + Returns + ------- + typing.Dict[str, cupy.ndarray] + """ + if mask is None: + mask = self._ranges_to_mask(self.get_meta(), ranges=ranges) + + # The tensors property method returns a copy with the offsets applied + tensors = self.tensors + return {key: tensor[mask] for (key, tensor) in tensors.items()} + + def copy_ranges(self, ranges: typing.List[typing.Tuple[int, int]]): + """ + Perform a copy of the current message, dataframe and tensors for the given `ranges` of rows. + + Parameters + ---------- + ranges : typing.List[typing.Tuple[int, int]] + Rows to include in the copy in the form of `[(`start_row`, `stop_row`),...]` + The `stop_row` isn't included. For example to copy rows 1-2 & 5-7 `ranges=[(1, 3), (5, 8)]` + + ------- + `MultiTensorMessage` + """ + mask = self._ranges_to_mask(self.get_meta(), ranges) + sliced_rows = self.copy_meta_ranges(ranges, mask=mask) + sliced_count = len(sliced_rows) + sliced_tensors = self.copy_tensor_ranges(ranges, mask=mask) + + mem = TensorMemory(count=sliced_count) + mem.tensors = sliced_tensors + + return MultiTensorMessage(MessageMeta(sliced_rows), 0, sliced_count, mem, 0, sliced_count) + + def get_slice(self, start, stop): + """ + Perform a slice of the current message from `start`:`stop` (excluding `stop`) + + For example to slice from rows 1-3 use `m.get_slice(1, 4)`. The returned `MultiTensorMessage` will contain + references to the same underlying Dataframe and tensor tensors, and this calling this method is reletively low + cost compared to `MultiTensorMessage.copy_ranges` + + Parameters + ---------- + start : int + Starting row of the slice + + stop : int + Stop of the slice + + ------- + `MultiTensorMessage` + """ + mess_count = stop - start + return MultiTensorMessage(meta=self.meta, + mess_offset=self.mess_offset + start, + mess_count=mess_count, + memory=self.memory, + offset=self.offset + start, + count=mess_count) diff --git a/morpheus/messages/tensor_memory.py b/morpheus/messages/tensor_memory.py deleted file mode 100644 index 8049380b9f..0000000000 --- a/morpheus/messages/tensor_memory.py +++ /dev/null @@ -1,41 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
-# SPDX-License-Identifier: Apache-2.0
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import dataclasses
-import typing
-
-import cupy as cp
-
-import morpheus._lib.messages as _messages
-from morpheus.messages.message_base import MessageData
-
-
-@dataclasses.dataclass
-class TensorMemory(MessageData, cpp_class=_messages.TensorMemory):
-    """
-    This is a base container class for data that will be used for inference stages. This class is designed to
-    hold generic tensor data in cupy arrays.
-
-    Parameters
-    ----------
-    count : int
-        Number of inference inputs.
-    inputs : typing.Dict[str, cupy.ndarray]
-        Inference inputs to model.
-
-    """
-    count: int
-
-    tensors: typing.Dict[str, cp.ndarray] = dataclasses.field(default_factory=dict, init=False)
diff --git a/tests/conftest.py b/tests/conftest.py
index 75da6637fc..54ef7bdfe3 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -377,6 +377,7 @@ def _camouflage_is_running():
 
     # Actually launch camoflague
     if launch_camouflage:
+        popen = None
         try:
             popen = subprocess.Popen(["camouflage", "--config", "config.yml"],
                                      cwd=root_dir,
@@ -402,20 +403,20 @@ def _camouflage_is_running():
             logger.exception("Error launching camouflage")
             raise
         finally:
-
-            logger.info("Killing camouflage with pid {}".format(popen.pid))
-
-            elapsed_time = 0.0
-            sleep_time = 0.1
-            stopped = False
-
-            # It takes a little while to shutdown
-            while not stopped and elapsed_time < shutdown_timeout:
-                popen.kill()
-                stopped = (popen.poll() is not None)
-                if not stopped:
-                    time.sleep(sleep_time)
-                    elapsed_time += sleep_time
+            if popen is not None:
+                logger.info("Killing camouflage with pid {}".format(popen.pid))
+
+                elapsed_time = 0.0
+                sleep_time = 0.1
+                stopped = False
+
+                # It takes a little while to shutdown
+                while not stopped and elapsed_time < shutdown_timeout:
+                    popen.kill()
+                    stopped = (popen.poll() is not None)
+                    if not stopped:
+                        time.sleep(sleep_time)
+                        elapsed_time += sleep_time
 
     else:
diff --git a/tests/test_inference_stage.py b/tests/test_inference_stage.py
index e8b0f1a299..eb1dffb33f 100755
--- a/tests/test_inference_stage.py
+++ b/tests/test_inference_stage.py
@@ -288,7 +288,7 @@ def test_convert_response_errors():
     mm2.count.side_effect = [2, 1]
     mm2.mess_count.side_effect = [2, 1, 1]
 
-    pytest.raises(AssertionError, inference_stage.InferenceStage._convert_response, ([mm1, mm2], [out_msg1, out_msg2]))
+    pytest.raises(ValueError, inference_stage.InferenceStage._convert_response, ([mm1, mm2], [out_msg1, out_msg2]))
 
 
 @pytest.mark.use_python
@@ -303,17 +303,16 @@ def test_convert_one_response(config):
     assert mpm.meta == inf.meta
     assert mpm.mess_offset == 0
     assert mpm.mess_count == 1
-    assert mpm.memory == mem
     assert mpm.offset == 0
     assert mpm.count == 1
     assert mem.get_output('probs').tolist() == [[1.0, 2.0, 3.0]]
 
     # Test for the second branch
-    inf.mess_count = 2
+    inf.count = 2
     inf.seq_ids = cp.array([[0], [1]])
-    res = ResponseMemoryProbs(count=1, probs=cp.array([[0, 0.6, 0.7], [5.6, 4.4, 9.2]]))
+    res = ResponseMemoryProbs(count=2, probs=cp.array([[0, 0.6, 0.7], [5.6, 4.4, 9.2]]))
 
-    mem = ResponseMemoryProbs(1, probs=cp.array([[0.1, 0.5, 0.8], [4.5, 6.7, 8.9]]))
+    mem = ResponseMemoryProbs(2, probs=cp.array([[0.1, 0.5, 0.8], [4.5, 6.7, 8.9]]))
 
     mpm = inference_stage.InferenceStage._convert_one_response(mem, inf, res)
     assert mem.get_output('probs').tolist() == [[0.1, 0.6, 0.8], [5.6, 6.7, 9.2]]
diff --git a/tests/test_messages.py b/tests/test_messages.py
index 73e9c92777..6c82a37dcb 100644
--- a/tests/test_messages.py
+++ b/tests/test_messages.py
@@ -22,6 +22,7 @@
 import morpheus._lib.messages as _messages
 import morpheus.config
 from morpheus import messages
+from morpheus.messages.memory import tensor_memory
 
 
 def check_message(python_type: type, cpp_type: type, should_be_cpp: bool, no_cpp_class: bool, args: tuple):
@@ -49,10 +50,8 @@ def check_all_messages(should_be_cpp: bool, no_cpp_class: bool):
 
     check_message(messages.MultiMessage, _messages.MultiMessage, should_be_cpp, no_cpp_class, (None, 0, 1))
 
-    assert messages.InferenceMemory._cpp_class is None if no_cpp_class else _messages.InferenceMemory
-    # C++ impl for InferenceMemory doesn't have a constructor
-    if (should_be_cpp):
-        pytest.raises(TypeError, messages.InferenceMemory, 1)
+    check_message(tensor_memory.TensorMemory, _messages.TensorMemory, should_be_cpp, no_cpp_class, (1, ))
+    check_message(messages.InferenceMemory, _messages.InferenceMemory, should_be_cpp, no_cpp_class, (1, ))
 
     cp_array = cp.zeros((1, 2))
 
@@ -69,6 +68,11 @@ def check_all_messages(should_be_cpp: bool, no_cpp_class: bool):
     # No C++ impl, should always get the Python class
     check_message(messages.InferenceMemoryAE, None, should_be_cpp, no_cpp_class, (1, cp_array, cp_array))
 
+    check_message(messages.MultiTensorMessage,
+                  _messages.MultiTensorMessage,
+                  should_be_cpp,
+                  no_cpp_class, (None, 0, 1, None, 0, 1))
+
     check_message(messages.MultiInferenceMessage,
                   _messages.MultiInferenceMessage,
                   should_be_cpp,
@@ -84,10 +88,7 @@ def check_all_messages(should_be_cpp: bool, no_cpp_class: bool):
                   should_be_cpp,
                   no_cpp_class, (None, 0, 1, None, 0, 1))
 
-    assert messages.ResponseMemory._cpp_class is None if no_cpp_class else _messages.ResponseMemory
-    # C++ impl doesn't have a constructor
-    if (should_be_cpp):
-        pytest.raises(TypeError, messages.ResponseMemory, 1)
+    check_message(messages.ResponseMemory, _messages.ResponseMemory, should_be_cpp, no_cpp_class, (1, ))
 
     check_message(messages.ResponseMemoryProbs,
                   _messages.ResponseMemoryProbs,
diff --git a/tests/test_tensor_memory.py b/tests/test_tensor_memory.py
new file mode 100644
index 0000000000..51e8232990
--- /dev/null
+++ b/tests/test_tensor_memory.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python
+# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import string
+
+import cupy as cp
+import numpy as np
+import pytest
+
+from morpheus._lib.common import FileTypes
+from morpheus.io.deserializers import read_file_to_df
+from morpheus.messages.memory.inference_memory import InferenceMemory
+from morpheus.messages.memory.inference_memory import InferenceMemoryAE
+from morpheus.messages.memory.inference_memory import InferenceMemoryFIL
+from morpheus.messages.memory.inference_memory import InferenceMemoryNLP
+from morpheus.messages.memory.response_memory import ResponseMemory
+from morpheus.messages.memory.response_memory import ResponseMemoryAE
+from morpheus.messages.memory.response_memory import ResponseMemoryProbs
+from morpheus.messages.memory.tensor_memory import TensorMemory
+from utils import TEST_DIRS
+
+INPUT_FILE = os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.csv')
+
+
+def compare_tensors(t1, t2):
+    assert sorted(t1.keys()) == sorted(t2.keys())
+    for (k, v1) in t1.items():
+        assert (v1 == t2[k]).all()
+
+
+def check_tensor_memory(cls, count, tensors):
+    other_tensors = {'ones': cp.ones(count), 'zeros': cp.zeros(count)}
+
+    m = cls(count)
+    assert m.count == count
+    assert m.get_tensors() == {}
+
+    m.set_tensors(tensors)
+    compare_tensors(m.get_tensors(), tensors)
+
+    m.set_tensors(other_tensors)
+    compare_tensors(m.get_tensors(), other_tensors)
+
+    m = cls(count, tensors)
+    assert m.count == count
+    compare_tensors(m.get_tensors(), tensors)
+
+    m.set_tensors(other_tensors)
+    compare_tensors(m.get_tensors(), other_tensors)
+
+
+def test_tensor_memory(config):
+    test_data = cp.array(np.loadtxt(INPUT_FILE, delimiter=",", skiprows=1))
+    count = test_data.shape[0]
+
+    # TensorMemory expects a dictionary of { <tensor name> : <cupy array> }
+    # Convert each column into a 1d cupy array
+    tensors = {}
+    for col in range(test_data.shape[1]):
+        tensors[string.ascii_lowercase[col]] = cp.array(test_data[:, col])
+
+    for cls in (TensorMemory, InferenceMemory, ResponseMemory):
+        check_tensor_memory(cls, count, tensors)
+
+
+@pytest.mark.use_python
+def test_inference_memory_ae(config):
+    test_data = cp.array(np.loadtxt(INPUT_FILE, delimiter=",", skiprows=1))
+    count = test_data.shape[0]
+
+    input = cp.array(test_data[:, 0])
+    seq_ids = cp.array(test_data[:, 1])
+    m = InferenceMemoryAE(count, input=input, seq_ids=seq_ids)
+
+    assert m.count == count
+    compare_tensors(m.get_tensors(), {'input': input, 'seq_ids': seq_ids})
+    assert (m.input == input).all()
+    assert (m.seq_ids == seq_ids).all()
+
+
+def test_inference_memory_fil(config):
+    test_data = cp.array(np.loadtxt(INPUT_FILE, delimiter=",", skiprows=1))
+    count = test_data.shape[0]
+
+    input_0 = cp.array(test_data[:, 0])
+    seq_ids = cp.array(test_data[:, 1])
+    m = InferenceMemoryFIL(count, input__0=input_0, seq_ids=seq_ids)
+
+    assert m.count == count
+    compare_tensors(m.get_tensors(), {'input__0': input_0, 'seq_ids': seq_ids})
+    assert (m.input__0 == input_0).all()
+    assert (m.seq_ids == seq_ids).all()
+
+
+def test_inference_memory_nlp(config):
+    test_data = cp.array(np.loadtxt(INPUT_FILE, delimiter=",", skiprows=1))
+    count = test_data.shape[0]
+
+    input_ids = cp.array(test_data[:, 0])
+    input_mask = cp.array(test_data[:, 1])
+    seq_ids = cp.array(test_data[:, 2])
+    m = InferenceMemoryNLP(count, input_ids=input_ids, input_mask=input_mask, seq_ids=seq_ids)
+
+    assert m.count == count
+    compare_tensors(m.get_tensors(), {'input_ids': input_ids, 'input_mask': input_mask, 'seq_ids': seq_ids})
+    assert (m.input_ids == input_ids).all()
+    assert (m.input_mask == input_mask).all()
+    assert (m.seq_ids == seq_ids).all()
+
+
+def check_response_memory_probs_and_ae(cls):
+    test_data = cp.array(np.loadtxt(INPUT_FILE, delimiter=",", skiprows=1))
+    count = test_data.shape[0]
+
+    m = cls(count=count, probs=test_data)
+    assert m.count == count
+    compare_tensors(m.get_tensors(), {'probs': test_data})
+    assert (m.probs == test_data).all()
+    return m
+
+
+@pytest.mark.use_python
+def test_response_memory_ae(config):
+    m = check_response_memory_probs_and_ae(ResponseMemoryAE)
+
+    assert m.user_id == ""
+    assert m.explain_df is None
+
+    df = read_file_to_df(INPUT_FILE, file_type=FileTypes.Auto, df_type='pandas')
+    m.user_id = "testy"
+    m.explain_df = df
+
+    assert m.user_id == "testy"
+    assert (m.explain_df.values == df.values).all()
+
+
+def test_response_memory_probs(config):
+    check_response_memory_probs_and_ae(ResponseMemoryProbs)
+
+
+@pytest.mark.parametrize("tensor_cls", [TensorMemory, InferenceMemory, ResponseMemory])
+def test_constructor_length_error(config, tensor_cls):
+    count = 10
+    tensors = {"a": cp.zeros(count), "b": cp.ones(count)}
+    pytest.raises(ValueError, tensor_cls, count - 1, tensors)
+
+
+@pytest.mark.parametrize("tensor_cls", [TensorMemory, InferenceMemory, ResponseMemory])
+def test_set_tensor_length_error(config, tensor_cls):
+    count = 10
+    m = tensor_cls(count)
+    pytest.raises(ValueError, m.set_tensor, 'a', cp.zeros(count + 1))
+
+
+@pytest.mark.parametrize("tensor_cls", [TensorMemory, InferenceMemory, ResponseMemory])
+def test_set_tensors_length_error(config, tensor_cls):
+    count = 10
+    tensors = {"a": cp.zeros(count), "b": cp.ones(count)}
+    m = tensor_cls(count + 1)
+    pytest.raises(ValueError, m.set_tensors, tensors)
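
The `tests/conftest.py` hunk above initializes `popen` before the `try` block so the `finally` clause can safely skip cleanup when `subprocess.Popen` itself raises. A condensed sketch of that pattern (the command and names are illustrative, not the fixture's full logic):

```python
import subprocess

popen = None
try:
    # Popen can raise (e.g. FileNotFoundError) before `popen` is ever
    # assigned, so it is initialized to None up front.
    popen = subprocess.Popen(["camouflage", "--config", "config.yml"])
    # ... run tests against the mock server ...
finally:
    # Only attempt cleanup if the process actually started.
    if popen is not None:
        popen.kill()
```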
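The three `*_length_error` tests rely on the memory classes validating that each tensor's leading dimension matches `count`. A minimal sketch of the behavior those tests assert (the `count` value here is arbitrary):

```python
import cupy as cp

from morpheus.messages.memory.tensor_memory import TensorMemory

count = 10
m = TensorMemory(count)

# A tensor whose leading dimension matches `count` is accepted.
m.set_tensor('a', cp.zeros(count))

# Mismatched lengths raise ValueError, in the constructor as well as in
# set_tensor() and set_tensors().
try:
    m.set_tensor('b', cp.zeros(count + 1))
except ValueError:
    pass  # expected, per test_set_tensor_length_error

try:
    TensorMemory(count - 1, {'a': cp.zeros(count)})
except ValueError:
    pass  # expected, per test_constructor_length_error
```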
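For the typed subclasses, the named accessors (`input_ids`, `seq_ids`, `probs`, etc.) remain available alongside the generic `get_tensors()`, and per `test_inference_memory_nlp` both paths return equal values. A small usage sketch with made-up data:

```python
import cupy as cp

from morpheus.messages.memory.inference_memory import InferenceMemoryNLP

count = 3
input_ids = cp.zeros(count)
input_mask = cp.ones(count)
seq_ids = cp.arange(count)

m = InferenceMemoryNLP(count, input_ids=input_ids, input_mask=input_mask, seq_ids=seq_ids)

# The named property and the generic accessor agree on values.
assert (m.input_ids == m.get_tensors()['input_ids']).all()
assert (m.seq_ids == seq_ids).all()
```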