Skip to content

Commit

Permalink
Close HTTP connections that prematurely reset streams
Browse files Browse the repository at this point in the history
Signed-off-by: Yan Avlasov <yavlasov@google.com>

Signed-off-by: Ryan Northey <ryan@synca.io>
  • Loading branch information
yanavlasov authored and phlax committed Oct 10, 2023
1 parent 8399fb4 commit 230331f
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 0 deletions.
9 changes: 9 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ bug_fixes:
- area: redis
change: |
Fixed a bug where redis key with ``%`` in the key is failing with a validation error.
- area: http
change: |
Close HTTP/2 and HTTP/3 connections that prematurely reset streams. The runtime key
``overload.premature_reset_min_stream_lifetime_seconds`` determines the interval where received stream
reset is considered premature (with 1 second default). The runtime key ``overload.premature_reset_total_stream_count``,
with the default value of 500, determines the number of requests received from a connection before the check for premature
resets is applied. The connection is disconnected if more than 50% of resets are premature.
Setting the runtime key ``envoy.restart_features.send_goaway_for_premature_rst_streams`` to ``false`` completely disables
this check.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
1 change: 1 addition & 0 deletions source/common/http/conn_manager_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ namespace Http {
COUNTER(downstream_rq_rejected_via_ip_detection) \
COUNTER(downstream_rq_response_before_rq_complete) \
COUNTER(downstream_rq_rx_reset) \
COUNTER(downstream_rq_too_many_premature_resets) \
COUNTER(downstream_rq_timeout) \
COUNTER(downstream_rq_header_timeout) \
COUNTER(downstream_rq_too_large) \
Expand Down
65 changes: 65 additions & 0 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "source/common/http/conn_manager_impl.h"

#include <chrono>
#include <cstdint>
#include <functional>
#include <list>
Expand Down Expand Up @@ -55,6 +56,11 @@
namespace Envoy {
namespace Http {

const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey =
"overload.premature_reset_total_stream_count";
const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
"overload.premature_reset_min_stream_lifetime_seconds";

bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
if (!headers) {
return false;
Expand Down Expand Up @@ -273,6 +279,12 @@ void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_def
}

void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
if (!stream.state_.is_internally_destroyed_) {
++closed_non_internally_destroyed_requests_;
if (isPrematureRstStream(stream)) {
++number_premature_stream_resets_;
}
}
if (stream.max_stream_duration_timer_ != nullptr) {
stream.max_stream_duration_timer_->disableTimer();
stream.max_stream_duration_timer_ = nullptr;
Expand Down Expand Up @@ -349,6 +361,7 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
if (connection_idle_timer_ && streams_.empty()) {
connection_idle_timer_->enableTimer(config_.idleTimeout().value());
}
maybeDrainDueToPrematureResets();
}

RequestDecoderHandlePtr ConnectionManagerImpl::newStreamHandle(ResponseEncoder& response_encoder,
Expand Down Expand Up @@ -619,6 +632,58 @@ void ConnectionManagerImpl::doConnectionClose(
}
}

bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const {
// Check if the request was prematurely reset, by comparing its lifetime to the configured
// threshold.
ASSERT(!stream.state_.is_internally_destroyed_);
absl::optional<std::chrono::nanoseconds> duration =
stream.filter_manager_.streamInfo().currentDuration();

// Check if request lifetime is longer than the premature reset threshold.
if (duration) {
const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count();
const uint64_t min_lifetime = runtime_.snapshot().getInteger(
ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1);
if (lifetime > min_lifetime) {
return false;
}
}

// If request has completed before configured threshold, also check if the Envoy proxied the
// response from the upstream. Requests without the response status were reset.
// TODO(RyanTheOptimist): Possibly support half_closed_local instead.
return !stream.filter_manager_.streamInfo().responseCode();
}

// Sends a GOAWAY if too many streams have been reset prematurely on this
// connection.
void ConnectionManagerImpl::maybeDrainDueToPrematureResets() {
if (!Runtime::runtimeFeatureEnabled(
"envoy.restart_features.send_goaway_for_premature_rst_streams") ||
closed_non_internally_destroyed_requests_ == 0) {
return;
}

const uint64_t limit =
runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500);

if (closed_non_internally_destroyed_requests_ < limit) {
return;
}

if (static_cast<double>(number_premature_stream_resets_) /
closed_non_internally_destroyed_requests_ <
.5) {
return;
}

if (drain_state_ == DrainState::NotDraining) {
stats_.named_.downstream_rq_too_many_premature_resets_.inc();
doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt,
"too_many_premature_resets");
}
}

void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) {
// Currently we do nothing with remote go away frames. In the future we can decide to no longer
// push resources if applicable.
Expand Down
23 changes: 23 additions & 0 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,14 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void setClearHopByHopResponseHeaders(bool value) { clear_hop_by_hop_response_headers_ = value; }
bool clearHopByHopResponseHeaders() const { return clear_hop_by_hop_response_headers_; }

// This runtime key configures the number of streams which must be closed on a connection before
// envoy will potentially drain a connection due to excessive prematurely reset streams.
static const absl::string_view PrematureResetTotalStreamCountKey;

// The minimum lifetime of a stream, in seconds, in order not to be considered
// prematurely closed.
static const absl::string_view PrematureResetMinStreamLifetimeSecondsKey;

private:
struct ActiveStream;
class MobileConnectionManagerImpl;
Expand Down Expand Up @@ -570,6 +578,15 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
void doConnectionClose(absl::optional<Network::ConnectionCloseType> close_type,
absl::optional<StreamInfo::ResponseFlag> response_flag,
absl::string_view details);
// Returns true if a RST_STREAM for the given stream is premature. Premature
// means the RST_STREAM arrived before response headers were sent and than
// the stream was alive for short period of time. This period is specified
// by the optional runtime value PrematureResetMinStreamLifetimeSecondsKey,
// or one second if that is not present.
bool isPrematureRstStream(const ActiveStream& stream) const;
// Sends a GOAWAY if both sufficient streams have been closed on a connection
// and at least half have been prematurely reset?
void maybeDrainDueToPrematureResets();

enum class DrainState { NotDraining, Draining, Closing };

Expand Down Expand Up @@ -610,6 +627,12 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
bool clear_hop_by_hop_response_headers_{true};
// The number of requests accumulated on the current connection.
uint64_t accumulated_requests_{};
// The number of requests closed on the current connection which were
// not internally destroyed
uint64_t closed_non_internally_destroyed_requests_{};
// The number of requests that received a premature RST_STREAM, according to
// the definition given in `isPrematureRstStream()`.
uint64_t number_premature_stream_resets_{0};
const std::string proxy_name_; // for Proxy-Status.

const bool refresh_rtt_after_request_{};
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ RUNTIME_GUARD(envoy_reloadable_features_validate_connect);
RUNTIME_GUARD(envoy_reloadable_features_validate_detailed_override_host_statuses);
RUNTIME_GUARD(envoy_reloadable_features_validate_grpc_header_before_log_grpc_status);
RUNTIME_GUARD(envoy_reloadable_features_validate_upstream_headers);
RUNTIME_GUARD(envoy_restart_features_send_goaway_for_premature_rst_streams);
RUNTIME_GUARD(envoy_restart_features_udp_read_normalize_addresses);

// Begin false flags. These should come with a TODO to flip true.
Expand Down
72 changes: 72 additions & 0 deletions test/integration/multiplexed_integration_test.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <algorithm>
#include <chrono>
#include <memory>
#include <string>

Expand All @@ -25,6 +26,7 @@
#include "test/mocks/http/mocks.h"
#include "test/test_common/network_utility.h"
#include "test/test_common/printers.h"
#include "test/test_common/simulated_time_system.h"
#include "test/test_common/utility.h"

#include "gtest/gtest.h"
Expand Down Expand Up @@ -92,6 +94,15 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTest,
{Http::CodecType::HTTP1})),
HttpProtocolIntegrationTest::protocolTestParamsToString);

class MultiplexedIntegrationTestWithSimulatedTime : public Event::TestUsingSimulatedTime,
public MultiplexedIntegrationTest {};

INSTANTIATE_TEST_SUITE_P(IpVersions, MultiplexedIntegrationTestWithSimulatedTime,
testing::ValuesIn(HttpProtocolIntegrationTest::getProtocolTestParams(
{Http::CodecType::HTTP2, Http::CodecType::HTTP3},
{Http::CodecType::HTTP1})),
HttpProtocolIntegrationTest::protocolTestParamsToString);

TEST_P(MultiplexedIntegrationTest, RouterRequestAndResponseWithBodyNoBuffer) {
testRouterRequestAndResponseWithBody(1024, 512, false, false);
}
Expand Down Expand Up @@ -1076,6 +1087,67 @@ TEST_P(MultiplexedIntegrationTest, GoAway) {
EXPECT_EQ("200", response->headers().getStatusValue());
}

// TODO(rch): Add a unit test which covers internal redirect handling.
TEST_P(MultiplexedIntegrationTestWithSimulatedTime, GoAwayAfterTooManyResets) {
EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream
// before opening new one.
config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams",
"true");
const int total_streams = 100;
config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count",
absl::StrCat(total_streams));
initialize();

Http::TestRequestHeaderMapImpl headers{
{":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}};
codec_client_ = makeHttpConnection(lookupPort("http"));
for (int i = 0; i < total_streams; ++i) {
auto encoder_decoder = codec_client_->startRequest(headers);
request_encoder_ = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);
codec_client_->sendReset(*request_encoder_);
ASSERT_TRUE(response->waitForReset());
}

// Envoy should disconnect client due to premature reset check
ASSERT_TRUE(codec_client_->waitForDisconnect());
test_server_->waitForCounterEq("http.config_test.downstream_rq_rx_reset", total_streams);
test_server_->waitForCounterEq("http.config_test.downstream_rq_too_many_premature_resets", 1);
}

TEST_P(MultiplexedIntegrationTestWithSimulatedTime, DontGoAwayAfterTooManyResetsForLongStreams) {
EXCLUDE_DOWNSTREAM_HTTP3; // Need to wait for the server to reset the stream
// before opening new one.
config_helper_.addRuntimeOverride("envoy.restart_features.send_goaway_for_premature_rst_streams",
"true");
const int total_streams = 100;
const int stream_lifetime_seconds = 2;
config_helper_.addRuntimeOverride("overload.premature_reset_total_stream_count",
absl::StrCat(total_streams));

config_helper_.addRuntimeOverride("overload.premature_reset_min_stream_lifetime_seconds",
absl::StrCat(stream_lifetime_seconds));

initialize();

Http::TestRequestHeaderMapImpl headers{
{":method", "GET"}, {":path", "/healthcheck"}, {":scheme", "http"}, {":authority", "host"}};
codec_client_ = makeHttpConnection(lookupPort("http"));

std::string request_counter = "http.config_test.downstream_rq_total";
std::string reset_counter = "http.config_test.downstream_rq_rx_reset";
for (int i = 0; i < total_streams * 2; ++i) {
auto encoder_decoder = codec_client_->startRequest(headers);
request_encoder_ = &encoder_decoder.first;
auto response = std::move(encoder_decoder.second);
test_server_->waitForCounterEq(request_counter, i + 1);
timeSystem().advanceTimeWait(std::chrono::seconds(2 * stream_lifetime_seconds));
codec_client_->sendReset(*request_encoder_);
ASSERT_TRUE(response->waitForReset());
test_server_->waitForCounterEq(reset_counter, i + 1);
}
}

TEST_P(MultiplexedIntegrationTest, Trailers) { testTrailers(1024, 2048, false, false); }

TEST_P(MultiplexedIntegrationTest, TrailersGiantBody) {
Expand Down

0 comments on commit 230331f

Please sign in to comment.