Skip to content

Conversation

@unbearables
Copy link
Contributor

It would be nice to have StreamListenerContainer.getStreamName() method to fulfill need to dynamically stop/start listeners in reaction of broker events.

Let's say I need to react to RabbitMQ broker events of queue.created / queue.deleted (see ~ name of queue is sent in event) and I need to start and stop listeners, which are assigned to these streams. Currently I need to create an extra map of listenerId to streamName OR include streamName in listenerId (e.g. listener:{streamName}) to be able to carry our operations on listener.

A simple getter for streamName on StreamListenerContainer would allow me to react to these changes dynamically without extra code effort.

An concise example with BrokerEventListener:

private final RabbitListenerEndpointRegistry listenerRegistry;

@Bean
public BrokerEventListener eventListener(ConnectionFactory connectionFactory) {
    return new BrokerEventListener(connectionFactory, "queue.created", "queue.deleted");
}

@EventListener(condition = "event.eventType == 'queue.created'")
public void onQueueCreation(BrokerEvent event) {
    if (!event.getEventProperties().get("type").equals("rabbit_stream_queue")) {
        return; // handle only streams
    }
    final String createdStream = (String) event.getEventProperties().get("name");
    listenerRegistry.getListenerContainers()
        .stream()
        .filter(StreamListenerContainer.class::isInstance)
        .map(StreamListenerContainer.class::cast)
        .filter(listener -> listener.getStreamName().equals(createdStream))
        // .filter(listener -> listener.getListenerId().equals("listener:" + streamName)) // depends on listenerId without 'getStreamName'
        .forEach(listener -> listener.start());
}

Signed-off-by: David Horak <horak.david@protonmail.com>
* Get a name of stream this listener listens to.
*/
public String getStreamName() {
return streamName;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must be with this..
Also add @since 3.2.3 to this method JavaDocs and your name to the @author list.
Thanks.

Signed-off-by: David Horak <horak.david@protonmail.com>
Signed-off-by: Artem Bilan <artem.bilan@broadcom.com>
@artembilan artembilan merged commit 0729a57 into spring-projects:main Feb 10, 2025
3 checks passed
@artembilan
Copy link
Member

@unbearables ,
Thank you for contribution; looking forward for more!

artembilan pushed a commit that referenced this pull request Feb 10, 2025
It would be nice to have `StreamListenerContainer.getStreamName()` method to fulfill need to dynamically stop/start listeners in reaction of broker events.

Let's say I need to react to RabbitMQ broker events of queue.created / queue.deleted and I need to start and stop listeners, which are assigned to these streams.
Currently I need to create an extra map of `listenerId` to `streamName` OR include `streamName` in `listenerId` (e.g. `listener:{streamName}`) to be able to carry our operations on listener.

A simple getter for `streamName` on the `StreamListenerContainer` would allow me to react to these changes dynamically without extra code effort.

Signed-off-by: David Horak <horak.david@protonmail.com>

[artem.bilan@broadcom.com: Fix JavaDoc for a new method. Fix Commit message]

Fixes: #2961

Signed-off-by: Artem Bilan <artem.bilan@broadcom.com>
# Conflicts:
#	spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants