Skip to content

Commit

Permalink
Re-enable async dispatches in Observation Filter
Browse files Browse the repository at this point in the history
Prior to this commit, the fix for gh-32730 disabled the involvment of
the osbervation filter for async dispatches. Instead of relying on ASYNC
dispatches to close the observation for async requests, this is now
using an async listener instead: async dispatches are not guaranteed to
happen once the async request is handled.

This change caused another side-effect: because async dispatches are not
considered anymore by this filter, the observation scope is not
reinstated for async dispatches. For example, `ResponseBodyAdvice`
implementations do not have the observation scope opened during their
execution.

This commit re-enables async dispatches for this filter, but ensures
that observations are not closed during such dispatches as this will be
done by the async listener.

Fixes gh-33091
  • Loading branch information
bclozel committed Jul 1, 2024
1 parent 61adf2d commit ab236c7
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.micrometer.observation.ObservationRegistry;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.FilterChain;
import jakarta.servlet.RequestDispatcher;
import jakarta.servlet.ServletException;
Expand Down Expand Up @@ -97,6 +98,11 @@ public static Optional<ServerRequestObservationContext> findObservationContext(H
return Optional.ofNullable((ServerRequestObservationContext) request.getAttribute(CURRENT_OBSERVATION_CONTEXT_ATTRIBUTE));
}

@Override
protected boolean shouldNotFilterAsyncDispatch() {
return false;
}

@Override
@SuppressWarnings("try")
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
Expand All @@ -116,8 +122,9 @@ protected void doFilterInternal(HttpServletRequest request, HttpServletResponse
if (request.isAsyncStarted()) {
request.getAsyncContext().addListener(new ObservationAsyncListener(observation));
}
// Stop Observation right now if async processing has not been started.
else {
// scope is opened for ASYNC dispatches, but the observation will be closed
// by the async listener.
else if (request.getDispatcherType() != DispatcherType.ASYNC){
Throwable error = fetchException(request);
if (error != null) {
observation.error(error);
Expand Down Expand Up @@ -176,7 +183,6 @@ public void onComplete(AsyncEvent event) {
@Override
public void onError(AsyncEvent event) {
this.currentObservation.error(unwrapServletException(event.getThrowable()));
this.currentObservation.stop();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,24 @@

package org.springframework.web.filter;

import java.io.IOException;

import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.DispatcherType;
import jakarta.servlet.RequestDispatcher;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.junit.jupiter.api.Test;

import org.springframework.http.HttpMethod;
import org.springframework.http.server.observation.ServerRequestObservationContext;
import org.springframework.web.testfixture.servlet.MockAsyncContext;
import org.springframework.web.testfixture.servlet.MockFilterChain;
import org.springframework.web.testfixture.servlet.MockHttpServletRequest;
import org.springframework.web.testfixture.servlet.MockHttpServletResponse;
Expand All @@ -41,18 +50,18 @@ class ServerHttpObservationFilterTests {

private final TestObservationRegistry observationRegistry = TestObservationRegistry.create();

private final ServerHttpObservationFilter filter = new ServerHttpObservationFilter(this.observationRegistry);

private final MockFilterChain mockFilterChain = new MockFilterChain();

private final MockHttpServletRequest request = new MockHttpServletRequest(HttpMethod.GET.name(), "/resource/test");

private final MockHttpServletResponse response = new MockHttpServletResponse();

private MockFilterChain mockFilterChain = new MockFilterChain();

private ServerHttpObservationFilter filter = new ServerHttpObservationFilter(this.observationRegistry);


@Test
void filterShouldNotProcessAsyncDispatch() {
assertThat(this.filter.shouldNotFilterAsyncDispatch()).isTrue();
void filterShouldProcessAsyncDispatch() {
assertThat(this.filter.shouldNotFilterAsyncDispatch()).isFalse();
}

@Test
Expand All @@ -68,6 +77,12 @@ void filterShouldFillObservationContext() throws Exception {
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS").hasBeenStopped();
}

@Test
void filterShouldOpenScope() throws Exception {
this.mockFilterChain = new MockFilterChain(new ScopeCheckingServlet(this.observationRegistry));
filter.doFilter(this.request, this.response, this.mockFilterChain);
}

@Test
void filterShouldAcceptNoOpObservationContext() throws Exception {
ServerHttpObservationFilter filter = new ServerHttpObservationFilter(ObservationRegistry.NOOP);
Expand Down Expand Up @@ -124,9 +139,52 @@ void shouldCloseObservationAfterAsyncCompletion() throws Exception {
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS").hasBeenStopped();
}

@Test
void shouldCloseObservationAfterAsyncError() throws Exception {
this.request.setAsyncSupported(true);
this.request.startAsync();
this.filter.doFilter(this.request, this.response, this.mockFilterChain);
MockAsyncContext asyncContext = (MockAsyncContext) this.request.getAsyncContext();
for (AsyncListener listener : asyncContext.getListeners()) {
listener.onError(new AsyncEvent(this.request.getAsyncContext(), new IllegalStateException("test error")));
}
asyncContext.complete();
assertThatHttpObservation().hasLowCardinalityKeyValue("exception", "IllegalStateException").hasBeenStopped();
}

@Test
void shouldNotCloseObservationDuringAsyncDispatch() throws Exception {
this.mockFilterChain = new MockFilterChain(new ScopeCheckingServlet(this.observationRegistry));
this.request.setDispatcherType(DispatcherType.ASYNC);
this.filter.doFilter(this.request, this.response, this.mockFilterChain);
TestObservationRegistryAssert.assertThat(this.observationRegistry)
.hasObservationWithNameEqualTo("http.server.requests")
.that().isNotStopped();
}

private TestObservationRegistryAssert.TestObservationRegistryAssertReturningObservationContextAssert assertThatHttpObservation() {
TestObservationRegistryAssert.assertThat(this.observationRegistry)
.hasNumberOfObservationsWithNameEqualTo("http.server.requests", 1);

return TestObservationRegistryAssert.assertThat(this.observationRegistry)
.hasObservationWithNameEqualTo("http.server.requests").that();
.hasObservationWithNameEqualTo("http.server.requests")
.that()
.hasBeenStopped();
}

@SuppressWarnings("serial")
static class ScopeCheckingServlet extends HttpServlet {

private final ObservationRegistry observationRegistry;

public ScopeCheckingServlet(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
assertThat(this.observationRegistry.getCurrentObservation()).isNotNull();
}
}

}

0 comments on commit ab236c7

Please sign in to comment.