Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions src/inlet_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

using namespace lsl;
namespace ip = asio::ip;

inlet_connection::inlet_connection(const stream_info_impl &info, bool recover)
: type_info_(info), host_info_(info), tcp_protocol_(tcp::v4()), udp_protocol_(udp::v4()),
recovery_enabled_(recover), lost_(false), shutdown_(false), last_receive_time_(lsl_clock()),
Expand All @@ -28,19 +29,7 @@ inlet_connection::inlet_connection(const stream_info_impl &info, bool recover)
") uses a newer protocol version than this inlet. Please update.");

// select TCP/UDP protocol versions
if (api_config::get_instance()->allow_ipv6()) {
// if IPv6 is optionally allowed...
if (host_info_.v4address().empty() || !host_info_.v4data_port() ||
!host_info_.v4service_port()) {
// then use it but only iff there are problems with the IPv4 connection data
tcp_protocol_ = tcp::v6();
udp_protocol_ = udp::v6();
} else {
// (otherwise stick to IPv4)
tcp_protocol_ = tcp::v4();
udp_protocol_ = udp::v4();
}
} else {
if (!set_protocols(info, false)) {
// otherwise use the protocol type that is selected in the config
tcp_protocol_ = api_config::get_instance()->allow_ipv4() ? tcp::v4() : tcp::v6();
udp_protocol_ = api_config::get_instance()->allow_ipv4() ? udp::v4() : udp::v6();
Expand All @@ -54,7 +43,6 @@ inlet_connection::inlet_connection(const stream_info_impl &info, bool recover)
host_info_.name().c_str());
recovery_enabled_ = false;
}

} else {
// the actual endpoint is not known yet -- we need to discover it later on the fly
// check that all the necessary information for this has been fully specified
Expand Down Expand Up @@ -192,7 +180,7 @@ void inlet_connection::try_recover() {
resolver_.resolve_oneshot(query.str(), 1, FOREVER, attempt == 0 ? 1.0 : 5.0);
if (!infos.empty()) {
// got a result
unique_lock_t lock(host_info_mut_);
unique_lock_t lock_recover_host_info(host_info_mut_);
// check if any of the returned streams is the one that we're currently
// connected to
for (auto &info : infos)
Expand All @@ -202,12 +190,18 @@ void inlet_connection::try_recover() {
// ensure that the query result is unique (since someone might have used a
// non-unique stream ID)
if (infos.size() == 1) {
// update the protocols from the stream info,
// preferring IPv6 if previously used as well
if (!set_protocols(infos[0], tcp_protocol_ == tcp::v6()))
throw std::logic_error("No suitable protocol found in discovery");
// update the endpoint
host_info_ = infos[0];
// cancel all cancellable operations registered with this connection
cancel_all_registered();
// invoke any callbacks associated with a connection recovery
std::lock_guard<std::mutex> lock(onrecover_mut_);
std::lock_guard<std::mutex> lock_onrecover_mut(onrecover_mut_);
// unlock recover mutex because onrecover callbacks may acquire the lock themselves
lock_recover_host_info.unlock();
for (auto &pair : onrecover_) (pair.second)();
} else {
// there are multiple possible streams to connect to in a recovery attempt:
Expand All @@ -234,6 +228,23 @@ void inlet_connection::try_recover() {
}
}

bool inlet_connection::set_protocols(const stream_info_impl &info, bool prefer_v6) {
bool has_v4 = !info.v4address().empty() && info.v4data_port() && info.v4service_port();
bool has_v6 = !info.v6address().empty() && info.v6data_port() && info.v6service_port();
bool can_v4 = api_config::get_instance()->allow_ipv4() && has_v4;
bool can_v6 = api_config::get_instance()->allow_ipv6() && has_v6;
if ((prefer_v6 && can_v6) || !can_v4) {
tcp_protocol_ = tcp::v6();
udp_protocol_ = udp::v6();
return true;
} else if (can_v4) {
tcp_protocol_ = tcp::v4();
udp_protocol_ = udp::v4();
return true;
}
return false;
}

void inlet_connection::watchdog_thread() {
loguru::set_thread_name((std::string("W_") += type_info().name().substr(0, 12)).c_str());
while (!lost_ && !shutdown_) {
Expand Down
3 changes: 3 additions & 0 deletions src/inlet_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ class inlet_connection : public cancellable_registry {
/// A (potentially speculative) resolve-and-recover operation.
void try_recover();

/// Sets the endpoints from a stream info considering a previous connection
bool set_protocols(const stream_info_impl &info, bool prefer_v6);

// core connection properties
/// static/read-only information of the stream (type & format)
const stream_info_impl type_info_;
Expand Down
17 changes: 12 additions & 5 deletions src/time_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@ time_receiver::time_receiver(inlet_connection &conn)
: conn_(conn), was_reset_(false), timeoffset_(std::numeric_limits<double>::max()),
remote_time_(std::numeric_limits<double>::max()),
uncertainty_(std::numeric_limits<double>::max()), cfg_(api_config::get_instance()),
time_sock_(time_io_), next_estimate_(time_io_), aggregate_results_(time_io_),
next_packet_(time_io_) {
time_sock_(time_io_), outlet_addr_(conn_.get_udp_endpoint()), next_estimate_(time_io_),
aggregate_results_(time_io_), next_packet_(time_io_) {
conn_.register_onlost(this, &timeoffset_upd_);
conn_.register_onrecover(this, [this]() { reset_timeoffset_on_recovery(); });
time_sock_.open(conn_.udp_protocol());
conn_.register_onrecover(this, [this]() {
reset_timeoffset_on_recovery();
outlet_addr_ = conn_.get_udp_endpoint();
DLOG_F(INFO, "Set new time service address: %s", outlet_addr_.address().to_string().c_str());
// handle outlet switching between IPv4 and IPv6
time_sock_.close();
time_sock_.open(outlet_addr_.protocol());
});
time_sock_.open(outlet_addr_.protocol());
}

time_receiver::~time_receiver() {
Expand Down Expand Up @@ -128,7 +135,7 @@ void time_receiver::send_next_packet(int packet_num) {
request.precision(16);
request << "LSL:timedata\r\n" << current_wave_id_ << " " << lsl_clock() << "\r\n";
auto msg_buffer = std::make_shared<std::string>(request.str());
time_sock_.async_send_to(asio::buffer(*msg_buffer), conn_.get_udp_endpoint(),
time_sock_.async_send_to(asio::buffer(*msg_buffer), outlet_addr_,
[msg_buffer](err_t /*unused*/, std::size_t /*unused*/) {
/* Do nothing, but keep the msg_buffer alive until async_send is completed */
});
Expand Down
2 changes: 2 additions & 0 deletions src/time_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ class time_receiver {
char recv_buffer_[1024]{0};
/// the socket through which the time thread communicates
udp_socket time_sock_;
/// current outlet address
udp::endpoint outlet_addr_;
/// schedule the next time estimate
steady_timer next_estimate_;
/// schedules result aggregation
Expand Down
1 change: 1 addition & 0 deletions src/util/cast.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "cast.hpp"
#include <cstdint>
#include <iomanip>
#include <locale>
#include <sstream>
Expand Down