Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ new_features:
change: |
Added runtime guard ``envoy.reloadable_features.report_load_when_rq_active_is_non_zero``.
When enabled, LRS continues to send locality_stats reoprt to config server when there is no request_issued in the poll cycle.
- area: on_demand
change: |
Added runtime guard ``envoy.reloadable_features.on_demand_track_end_stream``.
When enabled, the on_demand filter tracks downstream end_stream state to support stream recreation with fully read request bodies.
Previously, the filter rejected all requests with bodies by checking for the presence of a decoding buffer,
even when the body was complete.
- area: router
change: |
Added :ref:`request_mirror_policies <envoy_v3_api_field_extensions.upstreams.http.v3.HttpProtocolOptions.request_mirror_policies>`
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ RUNTIME_GUARD(envoy_reloadable_features_no_extension_lookup_by_name);
RUNTIME_GUARD(envoy_reloadable_features_oauth2_cleanup_cookies);
RUNTIME_GUARD(envoy_reloadable_features_oauth2_encrypt_tokens);
RUNTIME_GUARD(envoy_reloadable_features_odcds_over_ads_fix);
RUNTIME_GUARD(envoy_reloadable_features_on_demand_track_end_stream);
RUNTIME_GUARD(envoy_reloadable_features_original_dst_rely_on_idle_timeout);
RUNTIME_GUARD(envoy_reloadable_features_prefix_map_matcher_resume_after_subtree_miss);
RUNTIME_GUARD(envoy_reloadable_features_quic_defer_logging_to_ack_listener);
Expand Down
32 changes: 26 additions & 6 deletions source/extensions/filters/http/on_demand/on_demand_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ OptRef<const Router::Route> OnDemandRouteUpdate::handleMissingRoute() {
return makeOptRefFromPtr(callbacks_->route().get());
}

Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::RequestHeaderMap&, bool) {
Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::RequestHeaderMap&,
bool end_stream) {
downstream_end_stream_ = end_stream;
auto config = getConfig();

config->decodeHeadersBehavior().decodeHeaders(*this);
Expand Down Expand Up @@ -207,13 +209,15 @@ const OnDemandFilterConfig* OnDemandRouteUpdate::getConfig() {
return config_.get();
}

Http::FilterDataStatus OnDemandRouteUpdate::decodeData(Buffer::Instance&, bool) {
Http::FilterDataStatus OnDemandRouteUpdate::decodeData(Buffer::Instance&, bool end_stream) {
downstream_end_stream_ = end_stream;
return filter_iteration_state_ == Http::FilterHeadersStatus::StopIteration
? Http::FilterDataStatus::StopIterationAndWatermark
: Http::FilterDataStatus::Continue;
}

Http::FilterTrailersStatus OnDemandRouteUpdate::decodeTrailers(Http::RequestTrailerMap&) {
downstream_end_stream_ = true;
return Http::FilterTrailersStatus::Continue;
}

Expand Down Expand Up @@ -241,9 +245,17 @@ void OnDemandRouteUpdate::onRouteConfigUpdateCompletion(bool route_exists) {
return;
}

if (route_exists && // route can be resolved after an on-demand
// VHDS update
!callbacks_->decodingBuffer() && // Redirects with body not yet supported.
bool can_recreate_stream = false;
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.on_demand_track_end_stream")) {
// New behavior: track end_stream state to support stream recreation with fully read bodies.
can_recreate_stream = downstream_end_stream_;
} else {
// Old behavior: reject all requests with bodies.
can_recreate_stream = !callbacks_->decodingBuffer();
}
if (route_exists && // route can be resolved after an on-demand
// VHDS update
can_recreate_stream && // Redirects require fully read body.
callbacks_->recreateStream(/*headers=*/nullptr)) {
return;
}
Expand All @@ -257,8 +269,16 @@ void OnDemandRouteUpdate::onClusterDiscoveryCompletion(
Upstream::ClusterDiscoveryStatus cluster_status) {
filter_iteration_state_ = Http::FilterHeadersStatus::Continue;
cluster_discovery_handle_.reset();
bool can_recreate_stream = false;
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.on_demand_track_end_stream")) {
// New behavior: track end_stream state to support stream recreation with fully read bodies.
can_recreate_stream = downstream_end_stream_;
} else {
// Old behavior: reject all requests with bodies.
can_recreate_stream = !callbacks_->decodingBuffer();
}
if (cluster_status == Upstream::ClusterDiscoveryStatus::Available &&
!callbacks_->decodingBuffer()) { // Redirects with body not yet supported.
can_recreate_stream) { // Redirects require fully read body.
const Http::ResponseHeaderMap* headers = nullptr;
if (callbacks_->recreateStream(headers)) {
callbacks_->downstreamCallbacks()->clearRouteCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class OnDemandRouteUpdate : public Http::StreamDecoderFilter {
Upstream::ClusterDiscoveryCallbackHandlePtr cluster_discovery_handle_;
Envoy::Http::FilterHeadersStatus filter_iteration_state_{Http::FilterHeadersStatus::Continue};
bool decode_headers_active_{false};
bool downstream_end_stream_{false};
};

} // namespace OnDemand
Expand Down
84 changes: 74 additions & 10 deletions test/extensions/filters/http/on_demand/on_demand_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,34 @@ TEST_F(OnDemandFilterTest, TestDecodeTrailers) {
EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(headers));
}

// tests onRouteConfigUpdateCompletion() when redirect contains a body with trailers (fully read)
TEST_F(OnDemandFilterTest, OnRouteConfigUpdateCompletionRestartsActiveStreamWithTrailers) {
Http::TestRequestHeaderMapImpl headers;
Http::TestRequestTrailerMapImpl trailers;
Buffer::OwnedImpl buffer;
// Simulate request with body and trailers (end_stream = true)
filter_->decodeHeaders(headers, false);
filter_->decodeData(buffer, false);
filter_->decodeTrailers(trailers);
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(true));
filter_->onRouteConfigUpdateCompletion(true);
}

// tests onClusterDiscoveryCompletion() when redirect contains a body with trailers (fully read)
TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFoundWithTrailers) {
Http::TestRequestHeaderMapImpl headers;
Http::TestRequestTrailerMapImpl trailers;
Buffer::OwnedImpl buffer;
// Simulate request with body and trailers (end_stream = true)
filter_->decodeHeaders(headers, false);
filter_->decodeData(buffer, false);
filter_->decodeTrailers(trailers);
EXPECT_CALL(decoder_callbacks_, continueDecoding()).Times(0);
EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache());
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(true));
filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available);
}

// tests decodeData() when filter state is Http::FilterHeadersStatus::Continue
TEST_F(OnDemandFilterTest, TestDecodeDataReturnsContinue) {
Buffer::OwnedImpl buffer;
Expand All @@ -130,25 +158,42 @@ TEST_F(OnDemandFilterTest,
filter_->onRouteConfigUpdateCompletion(false);
}

// tests onRouteConfigUpdateCompletion() when redirect contains a body
TEST_F(OnDemandFilterTest, TestOnRouteConfigUpdateCompletionContinuesDecodingWithRedirectWithBody) {
// tests onRouteConfigUpdateCompletion() when redirect contains a body but not fully read
TEST_F(OnDemandFilterTest,
TestOnRouteConfigUpdateCompletionContinuesDecodingWithRedirectWithIncompleteBody) {
Http::TestRequestHeaderMapImpl headers;
Buffer::OwnedImpl buffer;
// Simulate request with body that hasn't ended yet
filter_->decodeHeaders(headers, false);
filter_->decodeData(buffer, false);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(&buffer));
filter_->onRouteConfigUpdateCompletion(true);
}

// tests onRouteConfigUpdateCompletion() when redirect contains a fully read body
TEST_F(OnDemandFilterTest, OnRouteConfigUpdateCompletionRestartsActiveStreamWithFullyReadBody) {
Http::TestRequestHeaderMapImpl headers;
Buffer::OwnedImpl buffer;
// Simulate request with body that has been fully read (end_stream = true)
filter_->decodeHeaders(headers, false);
filter_->decodeData(buffer, true);
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(true));
filter_->onRouteConfigUpdateCompletion(true);
}

// tests onRouteConfigUpdateCompletion() when ActiveStream recreation fails
TEST_F(OnDemandFilterTest, OnRouteConfigUpdateCompletionContinuesDecodingIfRedirectFails) {
Http::TestRequestHeaderMapImpl headers;
filter_->decodeHeaders(headers, true);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr));
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(false));
filter_->onRouteConfigUpdateCompletion(true);
}

// tests onRouteConfigUpdateCompletion() when route was resolved
TEST_F(OnDemandFilterTest, OnRouteConfigUpdateCompletionRestartsActiveStream) {
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr));
Http::TestRequestHeaderMapImpl headers;
filter_->decodeHeaders(headers, true);
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(true));
filter_->onRouteConfigUpdateCompletion(true);
}
Expand All @@ -169,28 +214,47 @@ TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterTimedOut) {

// tests onClusterDiscoveryCompletion when a cluster is available
TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFound) {
Http::TestRequestHeaderMapImpl headers;
filter_->decodeHeaders(headers, true);
EXPECT_CALL(decoder_callbacks_, continueDecoding()).Times(0);
EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache());
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(true));
filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available);
}

// tests onClusterDiscoveryCompletion when a cluster is available with a fully read body
TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFoundWithFullyReadBody) {
Http::TestRequestHeaderMapImpl headers;
Buffer::OwnedImpl buffer;
// Simulate request with body that has been fully read (end_stream = true)
filter_->decodeHeaders(headers, false);
filter_->decodeData(buffer, true);
EXPECT_CALL(decoder_callbacks_, continueDecoding()).Times(0);
EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache());
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr));
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(true));
filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available);
}

// tests onClusterDiscoveryCompletion when a cluster is available, but recreating a stream failed
TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFoundRecreateStreamFailed) {
Http::TestRequestHeaderMapImpl headers;
filter_->decodeHeaders(headers, true);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache()).Times(0);
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr));
EXPECT_CALL(decoder_callbacks_, recreateStream(_)).WillOnce(Return(false));
filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available);
}

// tests onClusterDiscoveryCompletion when a cluster is available, but redirect contains a body
TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFoundRedirectWithBody) {
// tests onClusterDiscoveryCompletion when a cluster is available, but redirect contains an
// incomplete body
TEST_F(OnDemandFilterTest, OnClusterDiscoveryCompletionClusterFoundRedirectWithIncompleteBody) {
Http::TestRequestHeaderMapImpl headers;
Buffer::OwnedImpl buffer;
// Simulate request with body that hasn't ended yet
filter_->decodeHeaders(headers, false);
filter_->decodeData(buffer, false);
EXPECT_CALL(decoder_callbacks_, continueDecoding());
EXPECT_CALL(decoder_callbacks_.downstream_callbacks_, clearRouteCache()).Times(0);
EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(&buffer));
filter_->onClusterDiscoveryCompletion(Upstream::ClusterDiscoveryStatus::Available);
}

Expand Down
Loading