[FEA]: Create Dynamic Batcher node type #480

Open
@mdemoret-nv

Description

Is this a new feature, an improvement, or a change to existing functionality?

New Feature

How would you describe the priority of this feature request?

High

Please provide a clear description of the problem this feature solves

It is often more efficient to process multiple messages at once in the pipeline. A simple node could group individual messages into batches for downstream processing.

Describe your ideal solution

Create a new node type (similar to the Broadcast node) that buffers upstream messages of type T and emits them downstream as batches of type std::vector<T>. The downstream batch should be emitted when either:

  1. The maximum timeout duration has been exceeded
  2. The maximum buffer size has been reached

Until one of these conditions is met, the node should hold the buffered messages in memory.

For example, if the upstream emits 3 messages in a row, with the max size set to 2 and the timeout set to 100 ms, the output should be:

Input:
Emit 1
Emit 2
Emit 3

Output
Emit [1, 2] at 0 ms
Emit [3] at 100 ms
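The semantics above can be sketched with a minimal, MRC-independent model (SimpleBatcher and its names are illustrative only, not part of MRC): a full batch is emitted as soon as max_count is reached, and an explicit flush() stands in for the timeout firing on a partial batch.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Illustrative model of the proposed batching semantics (not MRC API):
// items accumulate until max_count is reached; the timeout path would
// call flush() on whatever is still buffered.
template <typename T>
class SimpleBatcher
{
  public:
    SimpleBatcher(std::size_t max_count, std::function<void(std::vector<T>)> emit) :
      m_max_count(max_count),
      m_emit(std::move(emit))
    {}

    // Buffer one item; emit a full batch as soon as max_count is reached
    void push(T item)
    {
        m_buffer.push_back(std::move(item));
        if (m_buffer.size() >= m_max_count)
        {
            flush();
        }
    }

    // Emit whatever is buffered (i.e. what the timeout would trigger)
    void flush()
    {
        if (!m_buffer.empty())
        {
            m_emit(std::move(m_buffer));
            m_buffer.clear();
        }
    }

  private:
    std::size_t m_max_count;
    std::function<void(std::vector<T>)> m_emit;
    std::vector<T> m_buffer;
};
```

With max_count = 2, pushing 1, 2, 3 emits [1, 2] immediately, and the later flush() (the 100 ms timeout in the example) emits [3].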

Describe any alternatives you have considered

No response

Additional context

Scaffolding class:

template <typename T>
class DynamicBatcher : public mrc::node::WritableProvider<T>,
                       public mrc::node::ReadableAcceptor<T>,
                       public mrc::node::SinkChannelOwner<T>,
                       public mrc::node::WritableAcceptor<std::vector<T>>,
                       public mrc::node::ReadableProvider<std::vector<T>>,
                       public mrc::node::SourceChannelOwner<std::vector<T>>,
                       public mrc::runnable::RunnableWithContext<>
{
    using state_t  = mrc::runnable::Runnable::State;
    using input_t  = T;
    using output_t = std::vector<T>;

  public:
    DynamicBatcher(size_t max_count) : m_max_count(max_count)
    {
        // Set the default channels for both the sink and source sides
        mrc::node::SinkChannelOwner<input_t>::set_channel(std::make_unique<mrc::channel::BufferedChannel<input_t>>());
        mrc::node::SourceChannelOwner<output_t>::set_channel(
            std::make_unique<mrc::channel::BufferedChannel<output_t>>());
    }
    ~DynamicBatcher() override = default;

  private:
    /**
     * @brief Runnable's entrypoint.
     */
    void run(mrc::runnable::Context& ctx) override
    {
        T input_data;
        auto status = this->get_readable_edge()->await_read(input_data);

        // TODO: accumulate messages into a std::vector<T> and write the batch
        // downstream once either m_max_count is reached or the timeout expires

        // Only drop the output edges if we are rank 0
        if (ctx.rank() == 0)
        {
            // Need to drop the output edges
            mrc::node::SourceProperties<output_t>::release_edge_connection();
            mrc::node::SinkProperties<T>::release_edge_connection();
        }
    }

    /**
     * @brief Runnable's state control, for stopping from MRC.
     */
    void on_state_update(const state_t& state) final;

    size_t m_max_count;
    std::stop_source m_stop_source;
};
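One piece the scaffold does not yet cover is a read-with-deadline primitive: the timeout condition requires giving up on a blocking read once the current batch's deadline passes. A minimal sketch of that pattern using std::condition_variable::wait_until follows (TimedQueue is a hypothetical helper for illustration, not part of MRC):

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Sketch of a "pop with deadline" primitive: returns std::nullopt if the
// deadline expires before an item arrives, signalling the caller to flush
// the partial batch.
template <typename T>
class TimedQueue
{
  public:
    void push(T value)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(value));
        }
        m_cv.notify_one();
    }

    std::optional<T> pop_until(std::chrono::steady_clock::time_point deadline)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (!m_cv.wait_until(lock, deadline, [this] { return !m_queue.empty(); }))
        {
            return std::nullopt;  // timed out: caller should emit the partial batch
        }
        T value = std::move(m_queue.front());
        m_queue.pop();
        return value;
    }

  private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<T> m_queue;
};
```

The run() loop would then compute one deadline per batch (start time plus the max timeout) and call pop_until in a loop, flushing either when the batch reaches m_max_count or when pop_until returns std::nullopt.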

Code of Conduct

  • I agree to follow MRC's Code of Conduct
  • I have searched the open feature requests and have found no duplicates for this feature request
