Skip to content
This repository was archived by the owner on Aug 11, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions rosjava/src/main/java/org/ros/internal/node/DefaultNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.ros.node.topic.DefaultSubscriberListener;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
import org.ros.node.topic.TransportHints;
import org.ros.time.ClockTopicTimeProvider;
import org.ros.time.TimeProvider;

Expand Down Expand Up @@ -283,7 +284,7 @@ public <T> Publisher<T> newPublisher(GraphName topicName, String messageType) {
TopicDescription topicDescription =
nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
TopicDeclaration topicDeclaration =
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, null);
org.ros.message.MessageSerializer<T> serializer = newMessageSerializer(messageType);
return publisherFactory.newOrExisting(topicDeclaration, serializer);
}
Expand All @@ -295,19 +296,29 @@ public <T> Publisher<T> newPublisher(String topicName, String messageType) {

@Override
public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType) {
return newSubscriber(topicName, messageType, null);
}

@Override
public <T> Subscriber<T> newSubscriber(GraphName topicName, String messageType, TransportHints transportHints) {
GraphName resolvedTopicName = resolveName(topicName);
TopicDescription topicDescription =
nodeConfiguration.getTopicDescriptionFactory().newFromType(messageType);
TopicDeclaration topicDeclaration =
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription);
TopicDeclaration.newFromTopicName(resolvedTopicName, topicDescription, transportHints);
MessageDeserializer<T> deserializer = newMessageDeserializer(messageType);
Subscriber<T> subscriber = subscriberFactory.newOrExisting(topicDeclaration, deserializer);
return subscriber;
}

@Override
public <T> Subscriber<T> newSubscriber(String topicName, String messageType) {
return newSubscriber(GraphName.of(topicName), messageType);
return newSubscriber(GraphName.of(topicName), messageType, null);
}

@Override
public <T> Subscriber<T> newSubscriber(String topicName, String messageType, TransportHints transportHints) {
return newSubscriber(GraphName.of(topicName), messageType, transportHints);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public List<TopicDeclaration> newFromValue(Object value) {
String name = (String) ((Object[]) topic)[0];
String type = (String) ((Object[]) topic)[1];
descriptions.add(TopicDeclaration.newFromTopicName(GraphName.of(name), new TopicDescription(type, null,
null)));
null), null));
}
return descriptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.ros.internal.transport.ConnectionHeader;
import org.ros.internal.transport.ConnectionHeaderFields;
import org.ros.namespace.GraphName;
import org.ros.node.topic.TransportHints;

import java.util.List;
import java.util.Map;
Expand All @@ -36,6 +37,7 @@ public class TopicDeclaration {

private final TopicIdentifier topicIdentifier;
private final TopicDescription topicDescription;
private final TransportHints transportHints;

/**
* @param header
Expand All @@ -49,19 +51,26 @@ public static TopicDeclaration newFromHeader(Map<String, String> header) {
String definition = header.get(ConnectionHeaderFields.MESSAGE_DEFINITION);
String md5Checksum = header.get(ConnectionHeaderFields.MD5_CHECKSUM);
TopicDescription topicDescription = new TopicDescription(type, definition, md5Checksum);
return new TopicDeclaration(new TopicIdentifier(name), topicDescription);
boolean tcpNoDelay = "1".equals(header.get(ConnectionHeaderFields.TCP_NODELAY));
return new TopicDeclaration(new TopicIdentifier(name), topicDescription, new TransportHints(tcpNoDelay));
}

public static TopicDeclaration newFromTopicName(GraphName topicName,
TopicDescription topicDescription) {
return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription);
TopicDescription topicDescription, TransportHints transportHints) {
return new TopicDeclaration(new TopicIdentifier(topicName), topicDescription, transportHints);
}

public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription) {
public TopicDeclaration(TopicIdentifier topicIdentifier, TopicDescription topicDescription, TransportHints transportHints) {
Preconditions.checkNotNull(topicIdentifier);
Preconditions.checkNotNull(topicDescription);
this.topicIdentifier = topicIdentifier;
this.topicDescription = topicDescription;

if (transportHints != null) {
this.transportHints = transportHints;
} else {
this.transportHints = new TransportHints();
}
}

public TopicIdentifier getIdentifier() {
Expand All @@ -84,6 +93,7 @@ public ConnectionHeader toConnectionHeader() {
topicDescription.getDefinition());
connectionHeader.addField(ConnectionHeaderFields.MD5_CHECKSUM,
topicDescription.getMd5Checksum());
connectionHeader.addField(ConnectionHeaderFields.TCP_NODELAY, transportHints.getTcpNoDelay() ? "1" : "0");
return connectionHeader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ private void handleSubscriberHandshake(ChannelHandlerContext ctx, MessageEvent e
DefaultPublisher<?> publisher = topicParticipantManager.getPublisher(topicName);
ChannelBuffer outgoingBuffer = publisher.finishHandshake(incomingConnectionHeader);
Channel channel = ctx.getChannel();
if (incomingConnectionHeader.hasField(ConnectionHeaderFields.TCP_NODELAY)) {
boolean tcpNoDelay = "1".equals(incomingConnectionHeader.getField(ConnectionHeaderFields.TCP_NODELAY));
channel.getConfig().setOption("tcpNoDelay", tcpNoDelay);
}
ChannelFuture future = channel.write(outgoingBuffer).await();
if (!future.isSuccess()) {
throw new RosRuntimeException(future.getCause());
Expand Down
19 changes: 19 additions & 0 deletions rosjava/src/main/java/org/ros/node/ConnectedNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.ros.node.service.ServiceServer;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
import org.ros.node.topic.TransportHints;

import java.net.URI;

Expand Down Expand Up @@ -82,11 +83,29 @@ public interface ConnectedNode extends Node {
*/
<T> Subscriber<T> newSubscriber(GraphName topicName, String messageType);

/**
* @param <T>
* the message type to create the {@link Subscriber} for
* @param topicName
* the topic name to be subscribed to, this will be auto resolved
* @param messageType
* the message data type (e.g. "std_msgs/String")
* @param transportHints
* the transport hints
* @return a {@link Subscriber} for the specified topic
*/
<T> Subscriber<T> newSubscriber(GraphName topicName, String messageType, TransportHints transportHints);

/**
* @see #newSubscriber(GraphName, String)
*/
<T> Subscriber<T> newSubscriber(String topicName, String messageType);

/**
* @see #newSubscriber(GraphName, String, TransportHints)
*/
<T> Subscriber<T> newSubscriber(String topicName, String messageType, TransportHints transportHints);

/**
* Create a new {@link ServiceServer}.
*
Expand Down
43 changes: 43 additions & 0 deletions rosjava/src/main/java/org/ros/node/topic/TransportHints.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.ros.node.topic;

import java.util.Map;

import org.ros.internal.transport.ConnectionHeaderFields;
import org.ros.node.ConnectedNode;

import com.google.common.collect.Maps;


/**
* Provides a way of specifying network transport hints to
* {@link ConnectedNode#newSubscriber(String, String)} and
* {@link ConnectedNode#newSubscriber(org.ros.namespace.GraphName, String)}.
*
* @author stefan.glaser@hs-offenburg.de (Stefan Glaser)
*/
public class TransportHints {

Map<String, String> options;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add private access specifier.


public TransportHints() {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: please indent functions and code with two spaces (same applies for next line).

this.options = Maps.newConcurrentMap();
}

public TransportHints(boolean tcpNoDelay) {
tcpNoDelay(tcpNoDelay);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor yields a NullPointerException when called, as the default constructor is not called and therefore the map is not created.

Please consider either:

  • Calling the default constructor first.
  • Moving the initialization of the map to an instance initializer.
  • Initialize options inline (i.e. Map<String, String> options = Maps.newConcurrentMap<String, String>();).

I know in Java 7 it's not necessary to specify <String, String> twice, but please do so so as to make it easier to compile rosjava for older Java versions.

}

public TransportHints tcpNoDelay(boolean nodelay) {
options.put(ConnectionHeaderFields.TCP_NODELAY, nodelay ? "1" : "0");

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: remove newline.

return this;
}

public boolean getTcpNoDelay() {
return "1".equals(options.get(ConnectionHeaderFields.TCP_NODELAY));
}

public Map<String, String> getOptions() {
return options;
}
}