Make http client restart requests #1883

Closed
wants to merge 9 commits into from
49 changes: 46 additions & 3 deletions include/seastar/http/client.hh
@@ -126,7 +126,7 @@ public:
future<> close();

private:
future<reply_ptr> do_make_request(request rq);
future<reply_ptr> do_make_request(request& rq);
void setup_request(request& rq);
future<> send_request_head(const request& rq);
future<reply_ptr> maybe_wait_for_continue(const request& req);
@@ -166,6 +166,11 @@ public:
*/

class client {
public:
using reply_handler = noncopyable_function<future<>(const reply&, input_stream<char>&& body)>;
using retry_requests = bool_class<struct retry_requests_tag>;

private:
friend class http::internal::client_ref;
using connections_list_t = bi::list<connection, bi::member_hook<connection, typename connection::hook_t, &connection::_hook>, bi::constant_time_size<false>>;
static constexpr unsigned default_max_connections = 100;
@@ -174,20 +179,27 @@ class client {
unsigned _nr_connections = 0;
unsigned _max_connections;
unsigned long _total_new_connections = 0;
const retry_requests _retry;
condition_variable _wait_con;
connections_list_t _pool;

using connection_ptr = seastar::shared_ptr<connection>;

future<connection_ptr> get_connection();
future<connection_ptr> make_connection();
future<> put_connection(connection_ptr con);
future<> shrink_connections();

template <std::invocable<connection&> Fn>
auto with_connection(Fn&& fn);

template <typename Fn>
requires std::invocable<Fn, connection&>
auto with_new_connection(Fn&& fn);

future<> do_make_request(connection& con, request& req, reply_handler& handle, std::optional<reply::status_type> expected);

public:
using reply_handler = noncopyable_function<future<>(const reply&, input_stream<char>&& body)>;
/**
* \brief Construct a simple client
*
@@ -220,9 +232,37 @@ public:
* may re-use the sockets on its own
*
* \param f -- the factory pointer
* \param max_connections -- maximum number of connections a client is allowed to maintain
Contributor:

I don't understand why we wanted such a feature. What happens if the user starts another request? Does it wait until one of the older requests finishes? Why?

Anyway, you're just documenting a feature we already have, so OK. I can understand a maximum number of connections cached in the pool, that's useful. But why do we want to artificially limit the total number of connections?

Contributor Author:

> What happens if the user starts another request? Does it wait until one of the older requests finishes? Why?

If max_connections connections are already serving other requests (and the pool is empty), then yes, because this knob limits the number of sockets opened simultaneously. Limiting only the number of connections in the pool is also useful, but I think concurrency control is required for active connections as well (even though they are supposedly short-lived).

Contributor Author:

And yes, it's the pre-existing feature from #1652 (you approved it back then 😉)

* (both active and cached in pool)
* \param retry -- whether or not to retry requests on connection IO errors
*
* The client uses connections provided by the factory to send requests over and receive
* responses back. Once a request-response cycle is over, the connection used for it is kept
* by the client in a "pool". A subsequent http request may then pick up an existing
* connection from the pool, avoiding the extra latency of establishing a new one. The pool
* may thus accumulate more than one connection if the user sends several requests in parallel.
*
* HTTP servers may sometimes want to terminate the connections they keep. This can happen
* in one of several ways.
*
* The "gentle" way is when the server adds the "connection: close" header to its response.
* In that case the client handles the response and then just closes the connection without
* putting it into the pool.
*
* A less gentle way for a server to terminate a connection is to close it outright, so the
* underlying TCP stack communicates regular TCP FINs. If the connection happens to be in the
* pool when this happens, the client just removes it from the pool in the background.
*
* The least gentle closing occurs when the server closes the connection on the fly and TCP
* starts communicating FINs in parallel with the client using the connection. In that case
* the user receives an exception from the \ref make_request() call and has to do something
* about it. The client provides a transparent way of handling this, called "retry".
*
* When enabled, it makes the client catch the transport error, close the broken connection,
* open a new one, and retry the very same request one more time over the new connection. If
* the second attempt fails too, the error is reported back to the user.
*/
explicit client(std::unique_ptr<connection_factory> f, unsigned max_connections = default_max_connections);
explicit client(std::unique_ptr<connection_factory> f, unsigned max_connections = default_max_connections, retry_requests retry = retry_requests::no);
xemul marked this conversation as resolved.

/**
* \brief Send the request and handle the response
@@ -236,6 +276,9 @@ public:
* \param handle -- the response handler
* \param expected -- the optional expected reply status code, default is std::nullopt
*
* Note that the handle callback should be prepared to be called more than once, because the
* client may restart the whole request processing if the server closes the connection in
* the middle of the operation.
*/
future<> make_request(request req, reply_handler handle, std::optional<reply::status_type> expected = std::nullopt);

108 changes: 76 additions & 32 deletions src/http/client.cc
Original file line number Diff line number Diff line change
@@ -137,7 +137,12 @@ future<connection::reply_ptr> connection::recv_reply() {
parser.init();
return _read_buf.consume(parser).then([this, &parser] {
if (parser.eof()) {
throw std::runtime_error("Invalid response");
http_log.trace("Parsing response EOFed");
throw std::system_error(ECONNABORTED, std::system_category());
}
if (parser.failed()) {
http_log.trace("Parsing response failed");
throw std::runtime_error("Invalid http server response");
xemul marked this conversation as resolved.
}

auto resp = parser.get_parsed_response();
@@ -151,28 +156,28 @@ future<connection::reply_ptr> connection::recv_reply() {
});
}

future<connection::reply_ptr> connection::do_make_request(request req) {
return do_with(std::move(req), [this] (auto& req) {
setup_request(req);
return send_request_head(req).then([this, &req] {
return maybe_wait_for_continue(req).then([this, &req] (reply_ptr cont) {
if (cont) {
return make_ready_future<reply_ptr>(std::move(cont));
}
future<connection::reply_ptr> connection::do_make_request(request& req) {
setup_request(req);
return send_request_head(req).then([this, &req] {
return maybe_wait_for_continue(req).then([this, &req] (reply_ptr cont) {
if (cont) {
return make_ready_future<reply_ptr>(std::move(cont));
}

return write_body(req).then([this] {
return _write_buf.flush().then([this] {
return recv_reply();
});
return write_body(req).then([this] {
return _write_buf.flush().then([this] {
return recv_reply();
});
});
});
});
}

future<reply> connection::make_request(request req) {
return do_make_request(std::move(req)).then([] (reply_ptr rep) {
return make_ready_future<reply>(std::move(*rep));
return do_with(std::move(req), [this] (auto& req) {
return do_make_request(req).then([] (reply_ptr rep) {
return make_ready_future<reply>(std::move(*rep));
});
});
}

@@ -231,9 +236,10 @@ client::client(socket_address addr, shared_ptr<tls::certificate_credentials> cre
{
}

client::client(std::unique_ptr<connection_factory> f, unsigned max_connections)
client::client(std::unique_ptr<connection_factory> f, unsigned max_connections, retry_requests retry)
: _new_connections(std::move(f))
, _max_connections(max_connections)
, _retry(retry)
{
}

@@ -251,6 +257,10 @@ future<client::connection_ptr> client::get_connection() {
});
}

return make_connection();
}

future<client::connection_ptr> client::make_connection() {
_total_new_connections++;
return _new_connections->make().then([cr = internal::client_ref(this)] (connected_socket cs) mutable {
http_log.trace("created new http connection {}", cs.local_address());
@@ -309,28 +319,62 @@ auto client::with_connection(Fn&& fn) {
});
}

template <typename Fn>
requires std::invocable<Fn, connection&>
auto client::with_new_connection(Fn&& fn) {
return make_connection().then([this, fn = std::move(fn)] (connection_ptr con) mutable {
return fn(*con).finally([this, con = std::move(con)] () mutable {
return put_connection(std::move(con));
});
});
}

future<> client::make_request(request req, reply_handler handle, std::optional<reply::status_type> expected) {
return with_connection([req = std::move(req), handle = std::move(handle), expected] (connection& con) mutable {
return con.do_make_request(std::move(req)).then([&con, expected, handle = std::move(handle)] (connection::reply_ptr reply) mutable {
auto& rep = *reply;
if (expected.has_value() && rep._status != expected.value()) {
if (!http_log.is_enabled(log_level::debug)) {
return make_exception_future<>(httpd::unexpected_status_error(rep._status));
return do_with(std::move(req), std::move(handle), [this, expected] (request& req, reply_handler& handle) mutable {
auto f = with_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});

if (_retry) {
f = f.handle_exception_type([this, &req, &handle, expected] (const std::system_error& ex) {
auto code = ex.code().value();
if ((code != EPIPE) && (code != ECONNABORTED)) {
return make_exception_future<>(ex);
}

return do_with(con.in(rep), [reply = std::move(reply)] (auto& in) mutable {
return util::read_entire_stream_contiguous(in).then([reply = std::move(reply)] (auto message) {
http_log.debug("request finished with {}: {}", reply->_status, message);
return make_exception_future<>(httpd::unexpected_status_error(reply->_status));
});
// The 'con' connection may not yet be freed, so the total connection
// count still accounts for it and with_new_connection() may temporarily
// break the limit. That's OK, the 'con' will be closed really soon
return with_new_connection([this, &req, &handle, expected] (connection& con) {
return do_make_request(con, req, handle, expected);
});
});
}

return f;
});
}

future<> client::do_make_request(connection& con, request& req, reply_handler& handle, std::optional<reply::status_type> expected) {
return con.do_make_request(req).then([&con, &handle, expected] (connection::reply_ptr reply) mutable {
auto& rep = *reply;
if (expected.has_value() && rep._status != expected.value()) {
if (!http_log.is_enabled(log_level::debug)) {
return make_exception_future<>(httpd::unexpected_status_error(rep._status));
}

return handle(rep, con.in(rep)).finally([reply = std::move(reply)] {});
}).handle_exception([&con] (auto ex) mutable {
con._persistent = false;
return make_exception_future<>(std::move(ex));
});
return do_with(con.in(rep), [reply = std::move(reply)] (auto& in) mutable {
return util::read_entire_stream_contiguous(in).then([reply = std::move(reply)] (auto message) {
http_log.debug("request finished with {}: {}", reply->_status, message);
return make_exception_future<>(httpd::unexpected_status_error(reply->_status));
});
});
}

return handle(rep, con.in(rep)).finally([reply = std::move(reply)] {});
}).handle_exception([&con] (auto ex) mutable {
con._persistent = false;
return make_exception_future<>(std::move(ex));
});
}
