Skip to content

Commit

Permalink
http/client: Retry request over fresh connection in case old one failed
Browse files Browse the repository at this point in the history
Currently if client::make_request() fails, the error is propagated back
to the caller whatever the error is. Sometimes the request fails because
of connection glitch, which can happen and in that case it's common to
try the same request again after a while in a hope that it was indeed
some teporary glitch.

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>
  • Loading branch information
xemul committed Jun 4, 2024
1 parent 86d6816 commit 26d6516
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 3 deletions.
28 changes: 27 additions & 1 deletion include/seastar/http/client.hh
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public:
class client {
public:
using reply_handler = noncopyable_function<future<>(const reply&, input_stream<char>&& body)>;
using retry_requests = bool_class<struct retry_requests_tag>;

private:
friend class http::internal::client_ref;
Expand All @@ -178,6 +179,7 @@ private:
unsigned _nr_connections = 0;
unsigned _max_connections;
unsigned long _total_new_connections = 0;
const retry_requests _retry;
condition_variable _wait_con;
connections_list_t _pool;

Expand Down Expand Up @@ -232,14 +234,35 @@ public:
* \param f -- the factory pointer
* \param max_connections -- maximum number of connection a client is allowed to maintain
* (both active and cached in pool)
* \param retry -- whether or not to retry requests on connection IO errors
*
* The client uses connections provided by factory to send requests over and receive responses
* back. Once request-response cycle is over the connection used for that is kept by a client
* in a "pool". Making another http request may then pick up the existing connection from the
* pool thus avoiding the extra latency of establishing new connection. Pool may thus accumulate
* more than one connection if user sends several requests in parallel.
*
* HTTP servers may sometimes want to terminate the connections it keeps. This can happen in
* one of several ways.
*
* The "gentle" way is when server adds the "connection: close" header to its response. In that
* case client would handle the response and will just close the connection without putting it
* to pool.
*
* Less gentle way a server may terminate a connection is by closing it, so the underlying TCP
* stack would communicate regular TCP FIN-s. If the connection happens to be in pool when it
* happens the client would just clean the connection from pool in the background.
*
* Sometimes the least gentle closing occurs when server closes the connection on the fly and
* TCP starts communicating FIN-s in parallel with client using it. In that case, user would
* receive exception from the \ref make_request() call and will have to do something about it.
* Client provides a transparent way of handling it called "retry".
*
* When enabled, it makes client catch the transport error, close the broken connection, open
* another one and retry the very same request one more time over this new connection. If the
* second attempt fails, this error is reported back to user.
*/
explicit client(std::unique_ptr<connection_factory> f, unsigned max_connections = default_max_connections);
explicit client(std::unique_ptr<connection_factory> f, unsigned max_connections = default_max_connections, retry_requests retry = retry_requests::no);

/**
* \brief Send the request and handle the response
Expand All @@ -253,6 +276,9 @@ public:
* \param handle -- the response handler
* \param expected -- the optional expected reply status code, default is std::nullopt
*
* Note that the handle callback should be prepared to be called more than once, because
* client may restart the whole request processing in case server closes the connection
* in the middle of operation
*/
future<> make_request(request req, reply_handler handle, std::optional<reply::status_type> expected = std::nullopt);

Expand Down
23 changes: 21 additions & 2 deletions src/http/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ client::client(socket_address addr, shared_ptr<tls::certificate_credentials> cre
{
}

client::client(std::unique_ptr<connection_factory> f, unsigned max_connections)
client::client(std::unique_ptr<connection_factory> f, unsigned max_connections, retry_requests retry)
: _new_connections(std::move(f))
, _max_connections(max_connections)
, _retry(retry)
{
}

Expand Down Expand Up @@ -330,9 +331,27 @@ auto client::with_new_connection(Fn&& fn) {

future<> client::make_request(request req, reply_handler handle, std::optional<reply::status_type> expected) {
return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable {
return with_connection([this, &req, &handle, expected] (connection& con) {
auto f = with_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});

if (_retry) {
f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) {
auto code = ex.code().value();
if ((code != EPIPE) && (code != ECONNABORTED)) {
return make_exception_future<>(ex);
}

// The 'con' connection may not yet be freed, so the total connection
// count still account for it and with_new_connection() may temporarily
// break the limit. That's OK, the 'con' will be closed really soon
return with_new_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});
});
}

return f;
});
}

Expand Down
43 changes: 43 additions & 0 deletions tests/unit/httpd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,49 @@ SEASTAR_TEST_CASE(test_client_response_parse_error) {
});
}

SEASTAR_TEST_CASE(test_client_retry_request) {
return seastar::async([] {
loopback_connection_factory lcf(1);
auto ss = lcf.get_server_socket();
future<> server = ss.accept().then([] (accept_result ar) {
return seastar::async([sk = std::move(ar.connection)] () mutable {
input_stream<char> in = sk.input();
read_simple_http_request(in);
output_stream<char> out = sk.output();
out.write("HTT").get(); // write incomplete response
out.flush().get();
out.close().get();
});
}).then([&ss] {
return ss.accept().then([] (accept_result ar) {
return seastar::async([sk = std::move(ar.connection)] () mutable {
input_stream<char> in = sk.input();
read_simple_http_request(in);
output_stream<char> out = sk.output();
sstring r200("HTTP/1.1 200 OK\r\nHost: localhost\r\n\r\n");
out.write(r200).get(); // now write complete response
out.flush().get();
out.close().get();
});
});
});

future<> client = seastar::async([&lcf] {
auto cln = http::experimental::client(std::make_unique<loopback_http_factory>(lcf), 2, http::experimental::client::retry_requests::yes);
auto req = http::request::make("GET", "test", "/test");
bool got_response = false;
cln.make_request(std::move(req), [&] (const http::reply& rep, input_stream<char>&& in) {
got_response = true;
return make_ready_future<>();
}, http::reply::status_type::ok).get();
cln.close().get();
BOOST_REQUIRE(got_response);
});

when_all(std::move(client), std::move(server)).discard_result().get();
});
}

SEASTAR_TEST_CASE(test_100_continue) {
return seastar::async([] {
loopback_connection_factory lcf(1);
Expand Down

0 comments on commit 26d6516

Please sign in to comment.