Skip to content

WIP: AIRO-1243 Make Unity ROS Service Async #189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,14 @@ public void Subscribe<T>(string topic, Action<T> callback) where T : Message
string rosMessageName = MessageRegistry.GetRosMessageName<T>();
AddSubscriberInternal(topic, rosMessageName, (Message msg) =>
{
if (msg.RosMessageName != rosMessageName)
Debug.LogError($"Subscriber to '{topic}' expected '{rosMessageName}' but received '{msg.RosMessageName}'!?");
else
if (msg.RosMessageName == rosMessageName)
{
callback((T)msg);
}
else
{
Debug.LogError($"Subscriber to '{topic}' expected '{rosMessageName}' but received '{msg.RosMessageName}'!?");
}
});
}

Expand Down Expand Up @@ -224,7 +228,7 @@ void AddSubscriberInternal(string topic, string rosMessageName, Action<Message>
}

// Implement a service in Unity
public void ImplementService<TRequest>(string topic, Func<TRequest, Message> callback)
public void ImplementService<TRequest>(string topic, Func<TRequest, Message> callback, int? queueSize = null)
where TRequest : Message
{
string rosMessageName = rosMessageName = MessageRegistry.GetRosMessageName<TRequest>();
Expand All @@ -235,7 +239,8 @@ public void ImplementService<TRequest>(string topic, Func<TRequest, Message> cal
info = AddTopic(topic, rosMessageName);
}

info.ImplementService((Message msg) => callback((TRequest)msg));
int resolvedQueueSize = queueSize.GetValueOrDefault(k_DefaultPublisherQueueSize);
info.ImplementService((Message msg) => callback((TRequest)msg), resolvedQueueSize);
}

// Send a request to a ros service
Expand Down Expand Up @@ -267,8 +272,8 @@ public void ImplementService<TRequest>(string topic, Func<TRequest, Message> cal
m_ServicesWaiting.Add(srvID, pauser);
}

SendSysCommand(SysCommand.k_SysCommand_ServiceRequest, new SysCommand_Service { srv_id = srvID });
Publish(rosServiceName, serviceRequest);
RosTopicState topicState = GetOrCreateTopic(rosServiceName, serviceRequest.RosMessageName);
topicState.SendServiceRequest(serviceRequest, srvID);

byte[] rawResponse = (byte[])await pauser.PauseUntilResumed();

Expand All @@ -293,39 +298,33 @@ public void RegisterSubscriber(string topic, string rosMessageName)
{
}

public RosPublisher RegisterPublisher<T>(string rosTopicName,
public RosTopicState RegisterPublisher<T>(string rosTopicName,
int? queue_size = null, bool? latch = null) where T : Message
{
return RegisterPublisher(rosTopicName, MessageRegistry.GetRosMessageName<T>(), queue_size, latch);
}

public RosPublisher RegisterPublisher(string rosTopicName, string rosMessageName,
int? queue_size = null, bool? latch = null)
public RosTopicState RegisterPublisher(string rosTopicName, string rosMessageName,
int? queueSize = null, bool? latch = null)
{
RosTopicState topicState = GetOrCreateTopic(rosTopicName, rosMessageName);
if (topicState.Publisher != null)
{
Debug.LogWarning($"Publisher for topic {rosTopicName} registered twice!");
return topicState.Publisher;
}
else
{
//Create a new publisher.
int resolvedQueueSize = queue_size.GetValueOrDefault(k_DefaultPublisherQueueSize);
bool resolvedLatch = latch.GetValueOrDefault(k_DefaultPublisherLatch);
return topicState.CreatePublisher(resolvedQueueSize, resolvedLatch);
}
//Create a new publisher.
int resolvedQueueSize = queueSize.GetValueOrDefault(k_DefaultPublisherQueueSize);
bool resolvedLatch = latch.GetValueOrDefault(k_DefaultPublisherLatch);
topicState.RegisterPublisher(resolvedQueueSize, resolvedLatch);
return topicState;
}

public void RegisterRosService<TRequest, TResponse>(string topic) where TRequest : Message where TResponse : Message
{
RegisterRosService(topic, MessageRegistry.GetRosMessageName<TRequest>(), MessageRegistry.GetRosMessageName<TResponse>());
}

public void RegisterRosService(string topic, string requestMessageName, string responseMessageName)
public void RegisterRosService(string topic, string requestMessageName, string responseMessageName, int? queueSize = null)
{
RosTopicState info = GetOrCreateTopic(topic, requestMessageName);
info.RegisterRosService(responseMessageName);
int resolvedQueueSize = queueSize.GetValueOrDefault(k_DefaultPublisherQueueSize);
info.RegisterRosService(responseMessageName, resolvedQueueSize);
}

[Obsolete("Calling ImplementUnityService now implicitly registers it")]
Expand Down Expand Up @@ -375,6 +374,28 @@ public void SendUnityServiceUnregistration(string topic, NetworkStream stream =
{
m_Self.SendSysCommand(SysCommand.k_SysCommand_RemoveUnityService, new SysCommand_Topic { topic = topic }, stream);
}

public void SendUnityServiceResponse(int serviceId, NetworkStream stream = null)
{
m_Self.SendSysCommand(SysCommand.k_SysCommand_ServiceResponse, new SysCommand_Service { srv_id = serviceId }, stream);
}

public void SendPublisherRegistration(string topic, string message_name, int queueSize, bool latch, NetworkStream stream = null)
{
m_Self.SendSysCommand(SysCommand.k_SysCommand_Publish,
new SysCommand_PublisherRegistration { topic = topic, message_name = message_name, queue_size = queueSize, latch = latch }
);
}

public void SendServiceRequest(int serviceId)
{
m_Self.SendSysCommand(SysCommand.k_SysCommand_ServiceRequest, new SysCommand_Service { srv_id = serviceId });
}

public void AddSenderToQueue(OutgoingMessageSender sender)
{
m_Self.m_OutgoingMessageQueue.Enqueue(sender);
}
}

static ROSConnection _instance;
Expand Down Expand Up @@ -588,16 +609,12 @@ void ReceiveSysCommand(string topic, string json)
m_SpecialIncomingMessageHandler = null;

RosTopicState topicState = GetTopic(serviceTopic);
if (topicState == null || !topicState.IsUnityService)
if (topicState == null)
{
Debug.LogError($"Unity service {serviceTopic} has not been implemented!");
return;
}
Message responseMessage = topicState.HandleUnityServiceRequest(requestBytes);

// send the response message back
SendSysCommand(SysCommand.k_SysCommand_ServiceResponse, new SysCommand_Service { srv_id = serviceCommand.srv_id });
Publish(serviceTopic, responseMessage);
topicState.HandleUnityServiceRequest(requestBytes, serviceCommand.srv_id);
};
}
break;
Expand Down Expand Up @@ -704,7 +721,6 @@ static async Task ConnectionThread(
float waitingSinceRealTime = s_RealTimeSinceStartup;
while (true)
{

bool messageReadyEventWasSet = outgoingQueue.NewMessageReadyToSendEvent.WaitOne(sleepMilliseconds);
token.ThrowIfCancellationRequested();

Expand Down Expand Up @@ -900,19 +916,16 @@ public void Publish(string rosTopicName, Message message)
throw new Exception($"No registered publisher on topic {rosTopicName} for type {message.RosMessageName}!");
}

m_LastMessageSentRealtime = Time.realtimeSinceStartup;
rosTopic.OnMessageSent(message);
rosTopic.Publisher.PublishInternal(message);
m_OutgoingMessageQueue.Enqueue(rosTopic.Publisher);
rosTopic.Publish(message);
}
}

public T GetFromPool<T>(string rosTopicName) where T : Message
{
RosTopicState topicState = GetTopic(rosTopicName);
if (topicState != null && topicState.Publisher != null)
if (topicState != null)
{
return (T)topicState.Publisher.GetMessageFromPool();
return (T)topicState.GetMessageFromPool();
}
throw new Exception($"No publisher on topic {rosTopicName} of type {MessageRegistry.GetRosMessageName<T>()} to get pooled messages from!");
}
Expand Down
Loading