Skip to content

Commit

Permalink
Added support for a publisher list (i.e. for mDNS) (#29)
Browse files Browse the repository at this point in the history
- Added API to initialize a Subscriber session with a list of possible addresses
- Added API to get that list again
- Added API to retrieve the connected publisher and the list of possible publishers
  • Loading branch information
FlorianReimold authored Feb 26, 2025
1 parent 9ccbd4a commit 4e1dd32
Show file tree
Hide file tree
Showing 20 changed files with 288 additions and 114 deletions.
4 changes: 1 addition & 3 deletions tcp_pubsub/include/tcp_pubsub/callback_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@

#pragma once

#include <vector>
#include <chrono>
#include <memory>
#include <cstdint>
#include <vector>

#include <tcp_pubsub/tcp_pubsub_version.h> // IWYU pragma: keep

Expand Down
3 changes: 0 additions & 3 deletions tcp_pubsub/include/tcp_pubsub/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

#include <cstddef>
#include <memory>
#include <string>

#include <cstdint>

#include "tcp_pubsub_logger.h"

Expand Down
2 changes: 0 additions & 2 deletions tcp_pubsub/include/tcp_pubsub/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

#include <cstddef>
#include <cstdint>

#include <chrono>
#include <memory>
#include <string>
#include <utility>
Expand Down
41 changes: 38 additions & 3 deletions tcp_pubsub/include/tcp_pubsub/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@
#pragma once

#include <cstdint>

#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "executor.h"
#include "subscriber_session.h"
#include "callback_data.h"

#include <tcp_pubsub/tcp_pubsub_export.h>
#include <tcp_pubsub/tcp_pubsub_version.h> // IWYU pragma: keep
#include <vector>

namespace tcp_pubsub
{
Expand Down Expand Up @@ -111,6 +110,42 @@ namespace tcp_pubsub
*/
TCP_PUBSUB_EXPORT std::shared_ptr<SubscriberSession> addSession(const std::string& address, uint16_t port, int max_reconnection_attempts = -1);

/**
* @brief Add a new connection to a publisher
*
* Adds a new connection to a publisher. In cases where it is not clear how
* the publisher can be reached (e.g. multiple IP addresses), you can
* provide a list of pairs of address and port. The Subscriber will try to
* connect to all of them. The first one that works will be used. This can
* e.g. be used to connect both to "HOSTNAME" and "HOSTNAME.local" (i.e. the
* mDNS variant) or to an IPv4 and an IPv6 address. The server list must
* have at least one entry.
*
* By default, a Session will try to reconnect, when anything failes. How
* often this shall happen until the Session will delete itself from the
* Subscriber can be controlled by the max_reconnection_attempts parameter.
* A negative value will cause the Session to retry indefinitively, so you
* will probably have to write your own algorithm code that cancels Sessions
* that will not recover any more.
* Between reconnection attemps the Session will wait 1 second.
*
* Even though it may usually not make sense, you can add multiple
* sessions to a single publisher.
*
* This function is thread-safe.
*
* @param[in] publisher_list
* A list of [address, port] pairs. The first pair that works
* will be used. The list must have at least one entry.
*
* @param[in] max_reconnection_attempts
* How often the Session will try to reconnect in case of an
* issue. A negative value means infinite reconnection attemps.
*
* @return A shared pointer to the session. You don't need to store it.
*/
TCP_PUBSUB_EXPORT std::shared_ptr<SubscriberSession> addSession(const std::vector<std::pair<std::string, uint16_t>>& publisher_list, int max_reconnection_attempts = -1);

/**
* @brief Get a list of all Sessions.
*
Expand Down
21 changes: 14 additions & 7 deletions tcp_pubsub/include/tcp_pubsub/subscriber_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <cstdint>
#include <utility>
#include <vector>

#include <tcp_pubsub/tcp_pubsub_version.h> // IWYU pragma: keep
#include <tcp_pubsub/tcp_pubsub_export.h>
Expand Down Expand Up @@ -48,18 +50,23 @@ namespace tcp_pubsub

public:
/**
* @brief Get the address used when creating the Session
* @brief Get the list of publishers used for creating the session.
*
* The session will try to connect to the publishers in the order they are
* present in that list. If a connection fails, it will try the next one.
*
* For checking which publisher is currently connected, use getConnectedPublisher().
*
* @return The address / hostname of this Session
* @return The list of publishers used for creating the session
*/
TCP_PUBSUB_EXPORT std::string getAddress() const;
TCP_PUBSUB_EXPORT std::vector<std::pair<std::string, uint16_t>> getPublisherList() const;

/**
* @brief Get the port used when creating the Session.
* @brief Get the currently connected publisher address
*
* @return The port this Session is connecting to
* @return The address and port of the currently connected publisher
*/
TCP_PUBSUB_EXPORT uint16_t getPort() const;
TCP_PUBSUB_EXPORT std::pair<std::string, uint16_t> getConnectedPublisher() const;

/**
* @brief Cancels the Session
Expand Down
4 changes: 1 addition & 3 deletions tcp_pubsub/include/tcp_pubsub/tcp_pubsub_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@

#pragma once

#include <string>
#include <functional>
#include <iostream>

#include <cstdint>
#include <string>

#include <tcp_pubsub/tcp_pubsub_version.h> // IWYU pragma: keep

Expand Down
3 changes: 2 additions & 1 deletion tcp_pubsub/src/executor.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright (c) Continental. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

#include <tcp_pubsub/executor.h>

#include <cstddef>
#include <memory>
#include <tcp_pubsub/executor.h>

#include "executor_impl.h"
#include "tcp_pubsub/tcp_pubsub_logger.h"
Expand Down
8 changes: 6 additions & 2 deletions tcp_pubsub/src/executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
// Licensed under the MIT license. See LICENSE file in the project root for details.

#include "executor_impl.h"
#include "tcp_pubsub/tcp_pubsub_logger.h"
#include "tcp_pubsub_logger_abstraction.h"

#include <cstddef>
#include <memory>
#include <sstream>
#include <string>
#include <thread>

#include <asio.hpp>

#include "tcp_pubsub/tcp_pubsub_logger.h"
#include "tcp_pubsub_logger_abstraction.h"

namespace tcp_pubsub
{
Executor_Impl::Executor_Impl(const logger::logger_t& log_function)
Expand Down
4 changes: 2 additions & 2 deletions tcp_pubsub/src/publisher.cpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// Copyright (c) Continental. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

#include <tcp_pubsub/publisher.h>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <tcp_pubsub/publisher.h>
#include <tcp_pubsub/executor.h>

#include "executor_impl.h"
#include "publisher_impl.h"

namespace tcp_pubsub
Expand Down
23 changes: 13 additions & 10 deletions tcp_pubsub/src/publisher_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@

#include "publisher_impl.h"

#include "tcp_header.h"
#include "publisher_session.h"
#include "portable_endian.h"

#include "executor_impl.h"
#include "tcp_pubsub/tcp_pubsub_logger.h"
#include "tcp_pubsub_logger_abstraction.h"
#include <algorithm>
#include <asio.hpp>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <ios>
#include <tcp_pubsub/executor.h>

#include <cstddef>
#include <memory>
#include <mutex>
#include <sstream>
Expand All @@ -24,6 +18,15 @@
#include <utility>
#include <vector>

#include "executor_impl.h" // IWYU pragma: keep
#include "portable_endian.h"
#include "publisher_session.h"
#include "tcp_header.h"
#include "tcp_pubsub/tcp_pubsub_logger.h"
#include "tcp_pubsub_logger_abstraction.h"
#include <tcp_pubsub/executor.h>


namespace tcp_pubsub
{

Expand Down
5 changes: 3 additions & 2 deletions tcp_pubsub/src/publisher_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
#include <thread>
#include <vector>

#include "tcp_header.h"
#include "portable_endian.h"
#include <asio.hpp>

#include "portable_endian.h"
#include "protocol_handshake_message.h"
#include "tcp_header.h"
#include "tcp_pubsub/tcp_pubsub_logger.h"
#include "tcp_pubsub_logger_abstraction.h"

Expand Down
3 changes: 1 addition & 2 deletions tcp_pubsub/src/publisher_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@

#pragma once

#include <asio.hpp>
#include <atomic>
#include <cstdint>
#include <functional>

#include <asio.hpp>
#include <memory>
#include <mutex>
#include <string>
Expand Down
13 changes: 9 additions & 4 deletions tcp_pubsub/src/subscriber.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
// Copyright (c) Continental. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

#include <tcp_pubsub/subscriber.h>

#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <tcp_pubsub/subscriber.h>

#include <tcp_pubsub/executor.h>
#include <utility>
#include <vector>

#include "subscriber_impl.h"
#include "tcp_pubsub/callback_data.h"
#include "tcp_pubsub/subscriber_session.h"
#include <tcp_pubsub/executor.h>

namespace tcp_pubsub
{
Expand All @@ -26,7 +27,11 @@ namespace tcp_pubsub
}

std::shared_ptr<SubscriberSession> Subscriber::addSession(const std::string& address, uint16_t port, int max_reconnection_attempts)
{ return subscriber_impl_->addSession(address, port, max_reconnection_attempts); }
{ return subscriber_impl_->addSession(std::vector<std::pair<std::string, uint16_t>>{{address, port}}, max_reconnection_attempts); }

std::shared_ptr<SubscriberSession> Subscriber::addSession(const std::vector<std::pair<std::string, uint16_t>>& publisher_list, int max_reconnection_attempts)
{ return subscriber_impl_->addSession(publisher_list, max_reconnection_attempts); }


std::vector<std::shared_ptr<SubscriberSession>> Subscriber::getSessions() const
{ return subscriber_impl_->getSessions(); }
Expand Down
34 changes: 21 additions & 13 deletions tcp_pubsub/src/subscriber_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,6 @@

#include "subscriber_impl.h"

#include "executor_impl.h"
#include "portable_endian.h"
#include "subscriber_session_impl.h"
#include "tcp_header.h"
#include "tcp_pubsub/callback_data.h"
#include "tcp_pubsub/executor.h"
#include "tcp_pubsub/subscriber_session.h"
#include "tcp_pubsub/tcp_pubsub_logger.h"
#include "tcp_pubsub_logger_abstraction.h"
#include <algorithm>
#include <cstdint>
#include <functional>
Expand All @@ -24,6 +15,15 @@
#include <utility>
#include <vector>

#include "executor_impl.h" // IWYU pragma: keep
#include "subscriber_session_impl.h"
#include "tcp_header.h"
#include "tcp_pubsub/callback_data.h"
#include "tcp_pubsub/executor.h"
#include "tcp_pubsub/subscriber_session.h"
#include "tcp_pubsub/tcp_pubsub_logger.h"
#include "tcp_pubsub_logger_abstraction.h"

namespace tcp_pubsub
{
////////////////////////////////////////////////
Expand Down Expand Up @@ -55,10 +55,19 @@ namespace tcp_pubsub
////////////////////////////////////////////////
// Session Management
////////////////////////////////////////////////
std::shared_ptr<SubscriberSession> Subscriber_Impl::addSession(const std::string& address, uint16_t port, int max_reconnection_attempts)
std::shared_ptr<SubscriberSession> Subscriber_Impl::addSession(const std::vector<std::pair<std::string, uint16_t>>& publisher_list, int max_reconnection_attempts)
{
#if (TCP_PUBSUB_LOG_DEBUG_VERBOSE_ENABLED)
log_(logger::LogLevel::DebugVerbose, "Subscriber " + subscriberIdString() + ": Adding session for endpoint " + address + ":" + std::to_string(port) + ".");
// Create a list of all publishers as string
std::string publisher_list_string;
for (const auto& publisher : publisher_list)
{
publisher_list_string += publisher.first + ":" + std::to_string(publisher.second);
if (&publisher != &publisher_list.back())
publisher_list_string += ", ";
}

log_(logger::LogLevel::DebugVerbose, "Subscriber " + subscriberIdString() + ": Adding session for endpoints {" + publisher_list_string + "}.");
#endif

// Function for getting a free buffer
Expand Down Expand Up @@ -101,8 +110,7 @@ namespace tcp_pubsub
// cannot access it. Thus, we crate the object manually with new.
std::shared_ptr<SubscriberSession> subscriber_session(
new SubscriberSession(std::make_shared<SubscriberSession_Impl>(executor_->executor_impl_->ioService()
, address
, port
, publisher_list
, max_reconnection_attempts
, get_free_buffer_handler
, subscriber_session_closed_handler
Expand Down
Loading

0 comments on commit 4e1dd32

Please sign in to comment.