Skip to content

Commit

Permalink
Remove expand agnostic stream lifetime runtime guard.
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Baichoo <kbaichoo@google.com>
  • Loading branch information
KBaichoo committed Oct 24, 2023
1 parent 6750243 commit c390443
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 160 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ bug_fixes:
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`

- area: http
change: |
Removed ``envoy.reloadable_features.expand_agnostic_stream_lifetime`` and legacy code paths.
- area: http
change: |
removed ``envoy.reloadable_features.correctly_validate_alpn`` and legacy code paths.
Expand Down
28 changes: 11 additions & 17 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,17 @@ void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
stream.access_log_flush_timer_ = nullptr;
}

if (stream.expand_agnostic_stream_lifetime_) {
// Only destroy the active stream if the underlying codec has notified us of
// completion or we've internal redirect the stream.
if (!stream.canDestroyStream()) {
// Track that this stream is not expecting any additional calls apart from
// codec notification.
stream.state_.is_zombie_stream_ = true;
return;
}
// Only destroy the active stream if the underlying codec has notified us of
// completion or we've internal redirect the stream.
if (!stream.canDestroyStream()) {
// Track that this stream is not expecting any additional calls apart from
// codec notification.
stream.state_.is_zombie_stream_ = true;
return;
}

if (stream.response_encoder_ != nullptr) {
stream.response_encoder_->getStream().registerCodecEventCallbacks(nullptr);
}
if (stream.response_encoder_ != nullptr) {
stream.response_encoder_->getStream().registerCodecEventCallbacks(nullptr);
}

stream.completeRequest();
Expand Down Expand Up @@ -421,9 +419,7 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod
new_stream->state_.is_internally_created_ = is_internally_created;
new_stream->response_encoder_ = &response_encoder;
new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
if (new_stream->expand_agnostic_stream_lifetime_) {
new_stream->response_encoder_->getStream().registerCodecEventCallbacks(new_stream.get());
}
new_stream->response_encoder_->getStream().registerCodecEventCallbacks(new_stream.get());
new_stream->response_encoder_->getStream().setFlushTimeout(new_stream->idle_timeout_ms_);
new_stream->streamInfo().setDownstreamBytesMeter(response_encoder.getStream().bytesMeter());
// If the network connection is backed up, the stream should be made aware of it on creation.
Expand Down Expand Up @@ -845,8 +841,6 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect
StreamInfo::FilterState::LifeSpan::Connection),
request_response_timespan_(new Stats::HistogramCompletableTimespanImpl(
connection_manager_.stats_.named_.downstream_rq_time_, connection_manager_.timeSource())),
expand_agnostic_stream_lifetime_(
Runtime::runtimeFeatureEnabled(Runtime::expand_agnostic_stream_lifetime)),
header_validator_(
connection_manager.config_.makeHeaderValidator(connection_manager.codec_->protocol())) {
ASSERT(!connection_manager.config_.isRoutable() ||
Expand Down
2 changes: 0 additions & 2 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,6 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
std::chrono::milliseconds idle_timeout_ms_{};
State state_;

const bool expand_agnostic_stream_lifetime_;

// Snapshot of the route configuration at the time of request is started. This is used to ensure
// that the same route configuration is used throughout the lifetime of the request. This
// snapshot will be cleared when the cached route is blocked. Because after that we will not
Expand Down
1 change: 0 additions & 1 deletion source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ RUNTIME_GUARD(envoy_reloadable_features_enable_compression_bomb_protection);
RUNTIME_GUARD(envoy_reloadable_features_enable_connect_udp_support);
RUNTIME_GUARD(envoy_reloadable_features_enable_intermediate_ca);
RUNTIME_GUARD(envoy_reloadable_features_enable_zone_routing_different_zone_counts);
RUNTIME_GUARD(envoy_reloadable_features_expand_agnostic_stream_lifetime);
RUNTIME_GUARD(envoy_reloadable_features_ext_authz_http_send_original_xff);
RUNTIME_GUARD(envoy_reloadable_features_format_ports_as_numbers);
RUNTIME_GUARD(envoy_reloadable_features_handle_uppercase_scheme);
Expand Down
2 changes: 0 additions & 2 deletions source/common/runtime/runtime_features.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ void maybeSetRuntimeGuard(absl::string_view name, bool value);
void maybeSetDeprecatedInts(absl::string_view name, uint32_t value);
constexpr absl::string_view defer_processing_backedup_streams =
"envoy.reloadable_features.defer_processing_backedup_streams";
constexpr absl::string_view expand_agnostic_stream_lifetime =
"envoy.reloadable_features.expand_agnostic_stream_lifetime";

} // namespace Runtime
} // namespace Envoy
99 changes: 0 additions & 99 deletions test/integration/buffer_accounting_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -878,105 +878,6 @@ TEST_P(Http2OverloadManagerIntegrationTest,
EXPECT_EQ(smallest_response->headers().getStatusValue(), "200");
}

TEST_P(Http2OverloadManagerIntegrationTest, CanResetStreamIfEnvoyLevelStreamEnded) {
// This test is not applicable if expand_agnostic_stream_lifetime is enabled
// as the gap between lifetimes of the codec level and envoy level stream
// shrinks.
if (Runtime::runtimeFeatureEnabled(Runtime::expand_agnostic_stream_lifetime)) {
return;
}

useAccessLog("%RESPONSE_CODE%");
initializeOverloadManagerInBootstrap(
TestUtility::parseYaml<envoy::config::overload::v3::OverloadAction>(R"EOF(
name: "envoy.overload_actions.reset_high_memory_stream"
triggers:
- name: "envoy.resource_monitors.testonly.fake_resource_monitor"
scaled:
scaling_threshold: 0.90
saturation_threshold: 0.98
)EOF"));
initialize();

// Set 10MiB receive window for the client.
const int downstream_window_size = 10 * 1024 * 1024;
envoy::config::core::v3::Http2ProtocolOptions http2_options =
::Envoy::Http2::Utility::initializeAndValidateOptions(
envoy::config::core::v3::Http2ProtocolOptions());
http2_options.mutable_initial_stream_window_size()->set_value(downstream_window_size);
http2_options.mutable_initial_connection_window_size()->set_value(downstream_window_size);
codec_client_ = makeRawHttpConnection(makeClientConnection(lookupPort("http")), http2_options);

// Makes us have Envoy's writes to downstream return EAGAIN
write_matcher_->setSourcePort(lookupPort("http"));
write_matcher_->setWriteReturnsEgain();

// Send a request
auto encoder_decoder = codec_client_->startRequest(Http::TestRequestHeaderMapImpl{
{":method", "POST"},
{":path", "/"},
{":scheme", "http"},
{":authority", "host"},
{"content-length", "10"},
});
auto& encoder = encoder_decoder.first;
const std::string data(10, 'a');
codec_client_->sendData(encoder, data, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();
FakeStreamPtr upstream_request_for_response = std::move(upstream_request_);

// Send the responses back. It is larger than the downstream's receive window
// size. Thus, the codec will not end the stream, but the Envoy level stream
// should.
upstream_request_for_response->encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}},
false);
const int response_size = downstream_window_size + 1024; // Slightly over the window size.
upstream_request_for_response->encodeData(response_size, true);

if (streamBufferAccounting()) {
if (deferProcessingBackedUpStreams()) {
// Wait for an accumulation of data, as we cannot rely on the access log
// output since we're deferring the processing of the stream data.
EXPECT_TRUE(buffer_factory_->waitUntilTotalBufferedExceeds(10 * 10 * 1024));

} else {
// Wait for access log to know the Envoy level stream has been deleted.
EXPECT_THAT(waitForAccessLog(access_log_name_), HasSubstr("200"));
}
}

// Set the pressure so the overload action kills the response if doing stream
// accounting
updateResource(0.95);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 62);

if (streamBufferAccounting()) {
test_server_->waitForCounterGe("envoy.overload_actions.reset_high_memory_stream.count", 1);
}

// Reduce resource pressure
updateResource(0.80);
test_server_->waitForGaugeEq(
"overload.envoy.overload_actions.reset_high_memory_stream.scale_percent", 0);

// Resume writes to downstream.
write_matcher_->setResumeWrites();

if (streamBufferAccounting()) {
EXPECT_TRUE(response->waitForReset());
EXPECT_TRUE(response->reset());
} else {
// If we're not doing the accounting, we didn't end up resetting the
// streams.
ASSERT_TRUE(response->waitForEndStream());
ASSERT_TRUE(response->complete());
EXPECT_EQ(response->headers().getStatusValue(), "200");
}
}

class Http2DeferredProcessingIntegrationTest : public Http2BufferWatermarksTest {
public:
Http2DeferredProcessingIntegrationTest() : registered_tee_factory_(tee_filter_factory_) {
Expand Down
12 changes: 3 additions & 9 deletions test/integration/filter_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,9 @@ TEST_P(FilterIntegrationTest, MissingHeadersLocalReplyDownstreamBytesCount) {
EXPECT_EQ("200", response->headers().getStatusValue());

if (testing_downstream_filter_) {
if (Runtime::runtimeFeatureEnabled(Runtime::expand_agnostic_stream_lifetime)) {
expectDownstreamBytesSentAndReceived(BytesCountExpectation(90, 88, 71, 54),
BytesCountExpectation(40, 58, 40, 58),
BytesCountExpectation(7, 10, 7, 8));
} else {
expectDownstreamBytesSentAndReceived(BytesCountExpectation(90, 88, 71, 54),
BytesCountExpectation(0, 58, 0, 58),
BytesCountExpectation(7, 10, 7, 8));
}
expectDownstreamBytesSentAndReceived(BytesCountExpectation(90, 88, 71, 54),
BytesCountExpectation(40, 58, 40, 58),
BytesCountExpectation(7, 10, 7, 8));
}
}

Expand Down
17 changes: 5 additions & 12 deletions test/integration/multiplexed_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2159,11 +2159,6 @@ TEST_P(Http2FrameIntegrationTest, AccessLogOfWireBytesIfResponseSizeGreaterThanW
// Check access log if the agnostic stream lifetime is not extended.
// It should have access logged since it has received the entire response.
int hcm_logged_wire_bytes_sent, hcm_logged_wire_header_bytes_sent;
if (!Runtime::runtimeFeatureEnabled(Runtime::expand_agnostic_stream_lifetime)) {
auto access_log_values = stoiAccessLogString(waitForAccessLog(access_log_name_));
hcm_logged_wire_bytes_sent = access_log_values[0];
hcm_logged_wire_header_bytes_sent = access_log_values[1];
}

// Grant the sender (Envoy) additional window so it can finish sending the
// stream.
Expand All @@ -2181,13 +2176,11 @@ TEST_P(Http2FrameIntegrationTest, AccessLogOfWireBytesIfResponseSizeGreaterThanW
EXPECT_EQ(accumulator.bodyWireBytesReceivedDiscountingHeaders(),
accumulator.bodyWireBytesReceivedGivenPayloadAndFrames());

if (Runtime::runtimeFeatureEnabled(Runtime::expand_agnostic_stream_lifetime)) {
// Access logs are only available now due to the expanded agnostic stream
// lifetime.
auto access_log_values = stoiAccessLogString(waitForAccessLog(access_log_name_));
hcm_logged_wire_bytes_sent = access_log_values[0];
hcm_logged_wire_header_bytes_sent = access_log_values[1];
}
// Access logs are only available now due to the expanded agnostic stream
// lifetime.
auto access_log_values = stoiAccessLogString(waitForAccessLog(access_log_name_));
hcm_logged_wire_bytes_sent = access_log_values[0];
hcm_logged_wire_header_bytes_sent = access_log_values[1];
EXPECT_EQ(accumulator.stream_wire_header_bytes_recieved_, hcm_logged_wire_header_bytes_sent);
EXPECT_EQ(accumulator.stream_wire_bytes_recieved_, hcm_logged_wire_bytes_sent)
<< "Received " << accumulator.stream_wire_bytes_recieved_
Expand Down
24 changes: 6 additions & 18 deletions test/integration/protocol_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,9 @@ TEST_P(DownstreamProtocolIntegrationTest, RouterRedirectHttpRequest) {
EXPECT_EQ("301", response->headers().getStatusValue());
EXPECT_EQ("https://www.redirect.com/foo",
response->headers().get(Http::Headers::get().Location)[0]->value().getStringView());
if (Runtime::runtimeFeatureEnabled(Runtime::expand_agnostic_stream_lifetime)) {
expectDownstreamBytesSentAndReceived(BytesCountExpectation(145, 45, 111, 23),
BytesCountExpectation(69, 30, 69, 30),
BytesCountExpectation(0, 30, 0, 30));
} else {
expectDownstreamBytesSentAndReceived(BytesCountExpectation(145, 45, 111, 23),
BytesCountExpectation(0, 30, 0, 30),
BytesCountExpectation(0, 30, 0, 30));
}
expectDownstreamBytesSentAndReceived(BytesCountExpectation(145, 45, 111, 23),
BytesCountExpectation(69, 30, 69, 30),
BytesCountExpectation(0, 30, 0, 30));
} else {
// All QUIC requests use https, and should not be redirected. (Even those sent with http scheme
// will be overridden to https by HCM.)
Expand Down Expand Up @@ -717,15 +711,9 @@ TEST_P(DownstreamProtocolIntegrationTest, MissingHeadersLocalReplyDownstreamByte
ASSERT_TRUE(response->waitForEndStream());
EXPECT_TRUE(response->complete());
EXPECT_EQ("200", response->headers().getStatusValue());
if (Runtime::runtimeFeatureEnabled(Runtime::expand_agnostic_stream_lifetime)) {
expectDownstreamBytesSentAndReceived(BytesCountExpectation(90, 88, 71, 54),
BytesCountExpectation(40, 58, 40, 58),
BytesCountExpectation(7, 10, 7, 8));
} else {
expectDownstreamBytesSentAndReceived(BytesCountExpectation(90, 88, 71, 54),
BytesCountExpectation(0, 58, 0, 58),
BytesCountExpectation(7, 10, 7, 8));
}
expectDownstreamBytesSentAndReceived(BytesCountExpectation(90, 88, 71, 54),
BytesCountExpectation(40, 58, 40, 58),
BytesCountExpectation(7, 10, 7, 8));
}

TEST_P(DownstreamProtocolIntegrationTest, MissingHeadersLocalReplyUpstreamBytesCount) {
Expand Down

0 comments on commit c390443

Please sign in to comment.