diff --git a/include/faabric/transport/tcp/Socket.h b/include/faabric/transport/tcp/Socket.h index f32cde763..746b74cd5 100644 --- a/include/faabric/transport/tcp/Socket.h +++ b/include/faabric/transport/tcp/Socket.h @@ -4,6 +4,9 @@ #include namespace faabric::transport::tcp { + +const int SocketTimeoutMs = 3000; + class Socket { public: diff --git a/include/faabric/transport/tcp/SocketOptions.h b/include/faabric/transport/tcp/SocketOptions.h index 6c3dc5a0c..e49a404fc 100644 --- a/include/faabric/transport/tcp/SocketOptions.h +++ b/include/faabric/transport/tcp/SocketOptions.h @@ -12,4 +12,7 @@ bool isNonBlocking(int connFd); // Enable busy polling for non-blocking sockets void setBusyPolling(int connFd); + +// Set timeout for blocking sockets +void setTimeoutMs(int connFd, int timeoutMs); } diff --git a/src/transport/tcp/RecvSocket.cpp b/src/transport/tcp/RecvSocket.cpp index c397e171c..d87d8c66f 100644 --- a/src/transport/tcp/RecvSocket.cpp +++ b/src/transport/tcp/RecvSocket.cpp @@ -105,9 +105,13 @@ void RecvSocket::setSocketOptions(int connFd) // TODO: not clear if this helps or not // setBusyPolling(connFd); #else + // Set the socket as blocking if (isNonBlocking(connFd)) { setBlocking(connFd); } + + // Set the timeout + setTimeoutMs(connFd, SocketTimeoutMs); #endif } diff --git a/src/transport/tcp/SocketOptions.cpp b/src/transport/tcp/SocketOptions.cpp index 39390e635..fb7043b87 100644 --- a/src/transport/tcp/SocketOptions.cpp +++ b/src/transport/tcp/SocketOptions.cpp @@ -103,4 +103,20 @@ void setBusyPolling(int connFd) throw std::runtime_error("Error setting kernel busy poll"); } } + +void setTimeoutMs(int connFd, int timeoutMs) +{ + struct timeval timeVal; + timeVal.tv_sec = timeoutMs / 1000; + timeVal.tv_usec = 0; + + int ret = ::setsockopt( + connFd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeVal, sizeof(timeVal)); + if (ret == -1) { + SPDLOG_ERROR("Error setting recv timeout for socket {}: {}", + connFd, + std::strerror(errno)); + throw std::runtime_error("Error setting recv timeout"); + } +} } diff --git a/tests/test/transport/test_tcp_sockets.cpp b/tests/test/transport/test_tcp_sockets.cpp index b4d01e8b1..e0ca61f5f 100644 --- a/tests/test/transport/test_tcp_sockets.cpp +++ b/tests/test/transport/test_tcp_sockets.cpp @@ -68,6 +68,7 @@ TEST_CASE("Test setting socket options", "[transport]") setBusyPolling(conn); setNonBlocking(conn); setBlocking(conn); + setTimeoutMs(conn, SocketTimeoutMs); REQUIRE(!isNonBlocking(conn)); @@ -87,6 +88,7 @@ TEST_CASE("Test setting socket options", "[transport]") REQUIRE_THROWS(setBusyPolling(conn)); REQUIRE_THROWS(setNonBlocking(conn)); REQUIRE_THROWS(setBlocking(conn)); + REQUIRE_THROWS(setTimeoutMs(conn, SocketTimeoutMs)); } TEST_CASE("Test send/recv one message using raw TCP sockets", "[transport]")