Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext_proc filter crash when sending trailers when request has no body #27430

Merged
merged 3 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ FilterTrailersStatus Filter::onTrailers(ProcessorState& state, Http::HeaderMap&
}

if (!body_delivered && state.bodyMode() == ProcessingMode::BUFFERED) {
// If no gRPC stream yet, opens it before sending data.
switch (openStream()) {
case StreamOpenState::Error:
return FilterTrailersStatus::StopIteration;
case StreamOpenState::IgnoreError:
return FilterTrailersStatus::Continue;
case StreamOpenState::Ok:
// Fall through
break;
}
// We would like to process the body in a buffered way, but until now the complete
// body has not arrived. With the arrival of trailers, we now know that the body
// has arrived.
Expand Down Expand Up @@ -516,6 +526,17 @@ void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
stats_.stream_msgs_sent_.inc();
}

void Filter::sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream) {
if (state.hasBufferedData()) {
sendBodyChunk(state, *state.bufferedData(), new_state, end_stream);
} else {
// If there is no buffered data, sends an empty body.
Buffer::OwnedImpl data("");
sendBodyChunk(state, data, new_state, end_stream);
}
}

void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) {
ProcessingRequest req;
auto* trailers_req = state.mutableTrailers(req);
Expand Down
5 changes: 2 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,8 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void onNewTimeout(const ProtobufWkt::Duration& override_message_timeout);

void sendBufferedData(ProcessorState& state, ProcessorState::CallbackState new_state,
bool end_stream) {
sendBodyChunk(state, *state.bufferedData(), new_state, end_stream);
}
bool end_stream);

void sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
ProcessorState::CallbackState new_state, bool end_stream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,27 @@ class ExtProcIntegrationTest : public HttpIntegrationTest,
}
}

void processRequestTrailersMessage(
FakeUpstream& grpc_upstream, bool first_message,
absl::optional<std::function<bool(const HttpTrailers&, TrailersResponse&)>> cb) {
ProcessingRequest request;
if (first_message) {
ASSERT_TRUE(grpc_upstream.waitForHttpConnection(*dispatcher_, processor_connection_));
ASSERT_TRUE(processor_connection_->waitForNewStream(*dispatcher_, processor_stream_));
}
ASSERT_TRUE(processor_stream_->waitForGrpcMessage(*dispatcher_, request));
ASSERT_TRUE(request.has_request_trailers());
if (first_message) {
processor_stream_->startGrpcStream();
}
ProcessingResponse response;
auto* body = response.mutable_request_trailers();
const bool sendReply = !cb || (*cb)(request.request_trailers(), *body);
if (sendReply) {
processor_stream_->sendGrpcMessage(response);
}
}

void processResponseHeadersMessage(
FakeUpstream& grpc_upstream, bool first_message,
absl::optional<std::function<bool(const HttpHeaders&, HeadersResponse&)>> cb) {
Expand Down Expand Up @@ -1969,4 +1990,28 @@ TEST_P(ExtProcIntegrationTest, RequestMessageNewTimeoutOutOfBounds) {
newTimeoutWrongConfigTest(override_message_timeout);
}

// Set the ext_proc filter in SKIP header, BUFFERED body mode.
// Send a request with headers and trailers.
TEST_P(ExtProcIntegrationTest, SendHeaderAndTrailerInBufferedMode) {
proto_config_.mutable_processing_mode()->set_request_body_mode(ProcessingMode::BUFFERED);
proto_config_.mutable_processing_mode()->set_request_header_mode(ProcessingMode::SKIP);
proto_config_.mutable_processing_mode()->set_request_trailer_mode(ProcessingMode::SEND);
initializeConfig();
HttpIntegrationTest::initialize();

codec_client_ = makeHttpConnection(lookupPort("http"));
Http::TestRequestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
auto encoder_decoder = codec_client_->startRequest(headers);
request_encoder_ = &encoder_decoder.first;
IntegrationStreamDecoderPtr response = std::move(encoder_decoder.second);
Http::TestRequestTrailerMapImpl request_trailers{{"request", "trailer"}};
codec_client_->sendTrailers(*request_encoder_, request_trailers);
processRequestBodyMessage(*grpc_upstreams_[0], true, absl::nullopt);
processRequestTrailersMessage(*grpc_upstreams_[0], false, absl::nullopt);
handleUpstreamRequest();
processResponseHeadersMessage(*grpc_upstreams_[0], false, absl::nullopt);
verifyDownstreamResponse(*response, 200);
}

} // namespace Envoy

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.