Skip to content
This repository was archived by the owner on Aug 11, 2023. It is now read-only.

Commit 03225d1

Browse files
author
Juan Ignacio Ubeira
committed
Adding the capability to remove a messageListener from a subscriber.
1 parent deaeafa commit 03225d1

File tree

5 files changed

+71
-4
lines changed

5 files changed

+71
-4
lines changed

rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.ros.concurrent;
1818

19+
import com.google.common.base.Preconditions;
1920
import com.google.common.collect.Lists;
2021

2122
import java.util.Collection;
@@ -107,11 +108,10 @@ public Collection<EventDispatcher<T>> addAll(Collection<T> listeners) {
107108
* @param listener the listener to remove
108109
* @return flag indicating successful removal
109110
*/
110-
public boolean remove(T listener)
111-
{
111+
public boolean remove(T listener) {
112+
Preconditions.checkNotNull(listener);
112113
for (EventDispatcher<T> eventDispatcher : eventDispatchers) {
113-
if(listener.equals(eventDispatcher.getListener()))
114-
{
114+
if (listener.equals(eventDispatcher.getListener())) {
115115
eventDispatcher.cancel();
116116
eventDispatchers.remove(eventDispatcher);
117117
return true;

rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,16 @@ public void addMessageListener(MessageListener<T> messageListener) {
139139
addMessageListener(messageListener, 1);
140140
}
141141

142+
@Override
143+
public boolean removeMessageListener(MessageListener<T> messageListener) {
144+
return incomingMessageQueue.removeListener(messageListener);
145+
}
146+
147+
@Override
148+
public void removeAllMessageListeners() {
149+
incomingMessageQueue.removeAllListeners();
150+
}
151+
142152
@VisibleForTesting
143153
public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) {
144154
synchronized (mutex) {

rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,20 @@ public void addListener(final MessageListener<T> messageListener, int queueCapac
7171
messageDispatcher.addListener(messageListener, queueCapacity);
7272
}
7373

74+
/**
75+
* @see MessageDispatcher#removeListener(MessageListener)
76+
*/
77+
public boolean removeListener(MessageListener<T> messageListener) {
78+
return messageDispatcher.removeListener(messageListener);
79+
}
80+
81+
/**
82+
* @see MessageDispatcher#removeAllListeners()
83+
*/
84+
public void removeAllListeners() {
85+
messageDispatcher.removeAllListeners();
86+
}
87+
7488
public void shutdown() {
7589
messageDispatcher.cancel();
7690
}

rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,37 @@ public void addListener(MessageListener<T> messageListener, int limit) {
7979
}
8080
}
8181

82+
/**
83+
* Removes the specified {@link MessageListener} from the internal
84+
* {@link ListenerGroup}.
85+
* @param messageListener {@link MessageListener} to remove.
86+
* @return True if the listener was removed, false if it wasn't registered before.
87+
*
88+
* @see ListenerGroup#remove(Object)
89+
*/
90+
public boolean removeListener(MessageListener<T> messageListener) {
91+
if (DEBUG) {
92+
log.info("Removing listener.");
93+
}
94+
synchronized (mutex) {
95+
return messageListeners.remove(messageListener);
96+
}
97+
}
98+
99+
/**
100+
* Removes all the registered {@link MessageListener}s.
101+
*
102+
* @see ListenerGroup#shutdown()
103+
*/
104+
public void removeAllListeners() {
105+
if (DEBUG) {
106+
log.info("Removing all listeners.");
107+
}
108+
synchronized (mutex) {
109+
messageListeners.shutdown();
110+
}
111+
}
112+
82113
/**
83114
* Returns a newly allocated {@link SignalRunnable} for the specified
84115
* {@link LazyMessage}.

rosjava/src/main/java/org/ros/node/topic/Subscriber.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,18 @@ public interface Subscriber<T> extends TopicParticipant {
6161
*/
6262
void addMessageListener(MessageListener<T> messageListener);
6363

64+
/**
65+
* Removes a previously added {@link MessageListener}.
66+
* @param messageListener {@link MessageListener} to remove.
67+
* @return True if the listener was removed, false if it wasn't registered before.
68+
*/
69+
boolean removeMessageListener(MessageListener<T> messageListener);
70+
71+
/**
72+
* Removes all registered {@link MessageListener}s.
73+
*/
74+
void removeAllMessageListeners();
75+
6476
/**
6577
* Shuts down and unregisters the {@link Subscriber}. using the default
6678
* timeout Shutdown is delayed by at most the specified timeout to allow

0 commit comments

Comments
 (0)