Skip to content

Commit

Permalink
Update ControlMessage to hold arbitrary Python objects & update `Me…
Browse files Browse the repository at this point in the history
…ssageMeta` to copy & slice (#1637)

- For ControlMessage, use a specialization of `nlohmann::basic_json` to hold arbitrary Python objects for `m_metadata` & `m_tasks`

- Implement dataframe slicing method for `MessageMeta` which is equivalent to copy_meta_ranges() in MultiMessage. The method will do copy & slicing to the original dataframe, instead of sharing the ownership.

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Yuchen Zhang (https://github.com/yuchenz427)

Approvers:
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1637
  • Loading branch information
yczhang-nv authored May 9, 2024
1 parent 266612e commit 26c95e1
Show file tree
Hide file tree
Showing 16 changed files with 733 additions and 184 deletions.
1 change: 1 addition & 0 deletions morpheus/_lib/cmake/libmorpheus.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ add_library(morpheus
src/utilities/cudf_util.cpp
src/utilities/cupy_util.cpp
src/utilities/http_server.cpp
src/utilities/json_types.cpp
src/utilities/matx_util.cu
src/utilities/python_util.cpp
src/utilities/string_util.cpp
Expand Down
87 changes: 21 additions & 66 deletions morpheus/_lib/include/morpheus/messages/control.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

#pragma once

#include "morpheus/export.h" // for exporting symbols
#include "morpheus/messages/meta.hpp" // for MessageMeta
#include "morpheus/export.h" // for MORPHEUS_EXPORT
#include "morpheus/messages/meta.hpp" // for MessageMeta
#include "morpheus/utilities/json_types.hpp" // for json_t

#include <nlohmann/json.hpp> // for json, basic_json
#include <pybind11/pytypes.h> // for object, dict, list, none
#include <pybind11/pytypes.h> // for object, dict, list

#include <chrono> // for system_clock, time_point
#include <map> // for map
Expand Down Expand Up @@ -177,28 +177,28 @@ class MORPHEUS_EXPORT ControlMessage
{
public:
ControlMessage();
explicit ControlMessage(const nlohmann::json& config);
explicit ControlMessage(const morpheus::utilities::json_t& config);

ControlMessage(const ControlMessage& other); // Copies config and metadata, but not payload

/**
* @brief Set the configuration object for the control message.
* @param config A json object containing configuration information.
* @param config A morpheus::utilities::json_t object containing configuration information.
*/
void config(const nlohmann::json& config);
void config(const morpheus::utilities::json_t& config);

/**
* @brief Get the configuration object for the control message.
* @return A const reference to the json object containing configuration information.
* @return A const reference to the morpheus::utilities::json_t object containing configuration information.
*/
[[nodiscard]] const nlohmann::json& config() const;
[[nodiscard]] const morpheus::utilities::json_t& config() const;

/**
* @brief Add a task of the given type to the control message.
* @param task_type A string indicating the type of the task.
* @param task A json object describing the task.
* @param task A morpheus::utilities::json_t object describing the task.
*/
void add_task(const std::string& task_type, const nlohmann::json& task);
void add_task(const std::string& task_type, const morpheus::utilities::json_t& task);

/**
* @brief Check if a task of the given type exists in the control message.
Expand All @@ -210,21 +210,21 @@ class MORPHEUS_EXPORT ControlMessage
/**
* @brief Remove and return a task of the given type from the control message.
* @param task_type A string indicating the type of the task.
* @return A json object describing the task.
* @return A morpheus::utilities::json_t object describing the task.
*/
nlohmann::json remove_task(const std::string& task_type);
morpheus::utilities::json_t remove_task(const std::string& task_type);

/**
* @brief Get the tasks for the control message.
*/
[[nodiscard]] const nlohmann::json& get_tasks() const;
[[nodiscard]] const morpheus::utilities::json_t& get_tasks() const;

/**
* @brief Add a key-value pair to the metadata for the control message.
* @param key A string key for the metadata value.
* @param value A json object describing the metadata value.
* @param value A morpheus::utilities::json_t object describing the metadata value.
*/
void set_metadata(const std::string& key, const nlohmann::json& value);
void set_metadata(const std::string& key, const morpheus::utilities::json_t& value);

/**
* @brief Check if a metadata key exists in the control message.
Expand All @@ -236,7 +236,7 @@ class MORPHEUS_EXPORT ControlMessage
/**
* @brief Get the metadata for the control message.
*/
[[nodiscard]] nlohmann::json get_metadata() const;
[[nodiscard]] morpheus::utilities::json_t get_metadata() const;

/**
* @brief Get the metadata value for the given key from the control message.
Expand All @@ -245,9 +245,9 @@ class MORPHEUS_EXPORT ControlMessage
* @param key A string indicating the metadata key.
* @param fail_on_nonexist If true, throws an exception when the key does not exist.
* If false, returns std::nullopt for non-existing keys.
* @return An optional json object describing the metadata value if it exists.
* @return An optional morpheus::utilities::json_t object describing the metadata value if it exists.
*/
[[nodiscard]] nlohmann::json get_metadata(const std::string& key, bool fail_on_nonexist = false) const;
[[nodiscard]] morpheus::utilities::json_t get_metadata(const std::string& key, bool fail_on_nonexist = false) const;

/**
* @brief Lists all metadata keys currently stored in the control message.
Expand Down Expand Up @@ -372,8 +372,8 @@ class MORPHEUS_EXPORT ControlMessage
std::shared_ptr<MessageMeta> m_payload{nullptr};
std::shared_ptr<TensorMemory> m_tensors{nullptr};

nlohmann::json m_tasks{};
nlohmann::json m_config{};
morpheus::utilities::json_t m_tasks{};
morpheus::utilities::json_t m_config{};

std::map<std::string, time_point_t> m_timestamps{};
};
Expand Down Expand Up @@ -401,51 +401,6 @@ struct MORPHEUS_EXPORT ControlMessageProxy
*/
static std::shared_ptr<ControlMessage> copy(ControlMessage& self);

/**
* @brief Retrieves the configuration of the ControlMessage as a dictionary.
* @param self Reference to the underlying ControlMessage object.
* @return A pybind11::dict representing the ControlMessage's configuration.
*/
static pybind11::dict config(ControlMessage& self);

/**
* @brief Updates the configuration of the ControlMessage from a dictionary.
* @param self Reference to the underlying ControlMessage object.
* @param config A pybind11::dict representing the new configuration.
*/
static void config(ControlMessage& self, pybind11::dict& config);

/**
* @brief Adds a task to the ControlMessage.
* @param self Reference to the underlying ControlMessage object.
* @param type The type of the task to be added.
* @param task A pybind11::dict representing the task to be added.
*/
static void add_task(ControlMessage& self, const std::string& type, pybind11::dict& task);

/**
* @brief Removes and returns a task of the given type from the ControlMessage.
* @param self Reference to the underlying ControlMessage object.
* @param type The type of the task to be removed.
* @return A pybind11::dict representing the removed task.
*/
static pybind11::dict remove_task(ControlMessage& self, const std::string& type);

/**
* @brief Retrieves all tasks from the ControlMessage.
* @param self Reference to the underlying ControlMessage object.
* @return A pybind11::dict containing all tasks.
*/
static pybind11::dict get_tasks(ControlMessage& self);

/**
* @brief Sets a metadata key-value pair.
* @param self Reference to the underlying ControlMessage object.
* @param key The key for the metadata entry.
* @param value The value for the metadata entry, must be JSON serializable.
*/
static void set_metadata(ControlMessage& self, const std::string& key, pybind11::object& value);

/**
* @brief Retrieves a metadata value by key, with an optional default value.
*
Expand Down
34 changes: 34 additions & 0 deletions morpheus/_lib/include/morpheus/messages/meta.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,23 @@ class MORPHEUS_EXPORT MessageMeta
*/
virtual std::optional<std::string> ensure_sliceable_index();

/**
* @brief Creates a deep copy of DataFrame with the specified ranges.
*
* @param ranges the tensor index ranges to copy
* @return std::shared_ptr<MessageMeta> the deep copy of the specified ranges
*/
virtual std::shared_ptr<MessageMeta> copy_ranges(const std::vector<RangeType>& ranges) const;

/**
* @brief Get a slice of the underlying DataFrame by creating a deep copy
*
* @param start the tensor index of the start of the copy
* @param stop the tensor index of the end of the copy
* @return std::shared_ptr<MessageMeta> the deep copy of the speicifed slice
*/
virtual std::shared_ptr<MessageMeta> get_slice(TensorIndex start, TensorIndex stop) const;

/**
* @brief Create MessageMeta cpp object from a python object
*
Expand Down Expand Up @@ -297,6 +314,23 @@ struct MORPHEUS_EXPORT MessageMetaInterfaceProxy
* @return std::string The name of the column with the old index or nullopt if no changes were made.
*/
static std::optional<std::string> ensure_sliceable_index(MessageMeta& self);

/**
* @brief Creates a deep copy of DataFrame with the specified ranges.
*
* @param ranges the tensor index ranges to copy
* @return std::shared_ptr<MessageMeta> the deep copy of the specified ranges
*/
static std::shared_ptr<MessageMeta> copy_ranges(MessageMeta& self, const std::vector<RangeType>& ranges);

/**
* @brief Get a slice of the underlying DataFrame by creating a deep copy
*
* @param start the tensor index of the start of the copy
* @param stop the tensor index of the end of the copy
* @return std::shared_ptr<MessageMeta> the deep copy of the speicifed slice
*/
static std::shared_ptr<MessageMeta> get_slice(MessageMeta& self, TensorIndex start, TensorIndex stop);
};
/** @} */ // end of group
} // namespace morpheus
136 changes: 136 additions & 0 deletions morpheus/_lib/include/morpheus/pybind11/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,141 @@ struct type_caster<nlohmann::json_list>
}
};

template <>
struct type_caster<morpheus::utilities::json_t>
{
public:
/**
* This macro establishes a local variable 'value' of type morpheus::utilities::json_t
*/
PYBIND11_TYPE_CASTER(morpheus::utilities::json_t, _("object"));

/**
* Conversion part 1 (Python->C++): convert a PyObject into an morpheus::utilities::json_t
* instance or return false upon failure. The second argument
* indicates whether implicit conversions should be applied.
*/
bool load(handle src, bool convert)
{
if (!src)
{
return false;
}

if (src.is_none())
{
value = morpheus::utilities::json_t(nullptr);
}
else
{
value = morpheus::utilities::cast_from_pyobject(pybind11::reinterpret_borrow<pybind11::object>(src));
}

return true;
}

/**
* Conversion part 2 (C++ -> Python): convert an morpheus::utilities::json_t instance into
* a Python object. The second and third arguments are used to
* indicate the return value policy and parent object (for
* ``return_value_policy::reference_internal``) and are generally
* ignored by implicit casters.
*/
static handle cast(morpheus::utilities::json_t src, return_value_policy policy, handle parent)
{
return morpheus::utilities::cast_from_json(src).release();
}
};

template <>
struct type_caster<morpheus::utilities::json_dict_t>
{
public:
/**
* This macro establishes a local variable 'value' of type morpheus::utilities::json_t_dict
*/
PYBIND11_TYPE_CASTER(morpheus::utilities::json_dict_t, _("dict[str, typing.Any]"));

/**
* Conversion part 1 (Python->C++): convert a PyObject into an morpheus::utilities::json_t_dict
* instance or return false upon failure. The second argument
* indicates whether implicit conversions should be applied.
*/
bool load(handle src, bool convert)
{
if (!src || src.is_none())
{
return false;
}

if (!PyDict_Check(src.ptr()))
{
return false;
}

value = static_cast<const morpheus::utilities::json_dict_t>(
morpheus::utilities::cast_from_pyobject(pybind11::reinterpret_borrow<pybind11::object>(src)));

return true;
}

/**
* Conversion part 2 (C++ -> Python): convert an morpheus::utilities::json_t_dict instance into
* a Python object. The second and third arguments are used to
* indicate the return value policy and parent object (for
* ``return_value_policy::reference_internal``) and are generally
* ignored by implicit casters.
*/
static handle cast(morpheus::utilities::json_dict_t src, return_value_policy policy, handle parent)
{
return morpheus::utilities::cast_from_json(src).release();
}
};

template <>
struct type_caster<morpheus::utilities::json_list_t>
{
public:
/**
* This macro establishes a local variable 'value' of type morpheus::utilities::json_t_list
*/
PYBIND11_TYPE_CASTER(morpheus::utilities::json_list_t, _("list[typing.Any]"));

/**
* Conversion part 1 (Python->C++): convert a PyObject into an morpheus::utilities::json_t_list
* instance or return false upon failure. The second argument
* indicates whether implicit conversions should be applied.
*/
bool load(handle src, bool convert)
{
if (!src || src.is_none())
{
return false;
}

if (!PyList_Check(src.ptr()))
{
return false;
}

value = static_cast<const morpheus::utilities::json_list_t>(
morpheus::utilities::cast_from_pyobject(pybind11::reinterpret_borrow<pybind11::object>(src)));

return true;
}

/**
* Conversion part 2 (C++ -> Python): convert an morpheus::utilities::json_t_list instance into
* a Python object. The second and third arguments are used to
* indicate the return value policy and parent object (for
* ``return_value_policy::reference_internal``) and are generally
* ignored by implicit casters.
*/
static handle cast(morpheus::utilities::json_list_t src, return_value_policy policy, handle parent)
{
return morpheus::utilities::cast_from_json(src).release();
}
};

} // namespace detail
} // namespace PYBIND11_NAMESPACE
Loading

0 comments on commit 26c95e1

Please sign in to comment.