Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a regression where response exceptions are logged unintentionally #6035

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class HttpResponseWrapper implements StreamWriter<HttpObject> {
private final EventLoop eventLoop;
private final ClientRequestContext ctx;
private final long maxContentLength;
static final String UNEXPECTED_EXCEPTION_MSG = "Unexpected exception while closing a request";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
static final String UNEXPECTED_EXCEPTION_MSG = "Unexpected exception while closing a request";
@VisibleForTesting
static final String UNEXPECTED_EXCEPTION_MSG = "Unexpected exception while closing a request";


private boolean responseStarted;
private long contentLengthHeaderValue = -1;
Expand Down Expand Up @@ -232,14 +233,16 @@ void close(@Nullable Throwable cause, boolean cancel) {
requestAutoAbortDelayMillis, TimeUnit.MILLISECONDS);
}

private void closeAction(@Nullable Throwable cause) {
private boolean closeAction(@Nullable Throwable cause) {
final boolean closed;
if (cause != null) {
delegate.close(cause);
closed = delegate.tryClose(cause);
ctx.logBuilder().endResponse(cause);
} else {
delegate.close();
closed = delegate.tryClose();
ctx.logBuilder().endResponse();
}
return closed;
}

private void cancelAction(@Nullable Throwable cause) {
Expand All @@ -262,8 +265,10 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
cancelAction(cause);
return;
}
if (delegate.isOpen()) {
closeAction(cause);

// don't log if the cause will be exposed via the response/log
if (delegate.isOpen() && closeAction(cause)) {
return;
}

// the context has been cancelled either by timeout or by user invocation
Expand All @@ -275,7 +280,7 @@ private void cancelTimeoutAndLog(@Nullable Throwable cause, boolean cancel) {
return;
}

final StringBuilder logMsg = new StringBuilder("Unexpected exception while closing a request");
final StringBuilder logMsg = new StringBuilder(UNEXPECTED_EXCEPTION_MSG);
final HttpRequest request = ctx.request();
assert request != null;
final String authority = request.authority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,28 +441,41 @@ private void handleCloseEvent(SubscriptionImpl subscription, CloseEvent o) {

@Override
public void close() {
if (setState(State.OPEN, State.CLOSED)) {
addObjectOrEvent(SUCCESSFUL_CLOSE);
}
tryClose();
}

@Override
public final void close(Throwable cause) {
requireNonNull(cause, "cause");
if (cause instanceof CancelledSubscriptionException) {
throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())");
}

tryClose(cause);
}

/**
* Tries to close the stream.
*
* @return {@code true} if the stream has been closed by this method call.
* {@code false} if the stream has been closed already by another party.
*/
@UnstableApi
public final boolean tryClose() {
if (setState(State.OPEN, State.CLOSED)) {
addObjectOrEvent(SUCCESSFUL_CLOSE);
return true;
}
return false;
}

/**
* Tries to close the stream with the specified {@code cause}.
*
* @return {@code true} if the stream has been closed by this method call.
* {@code false} if the stream has been closed already by other party.
* {@code false} if the stream has been closed already by another party.
*/
public final boolean tryClose(Throwable cause) {
if (cause instanceof CancelledSubscriptionException) {
throw new IllegalArgumentException("cause: " + cause + " (must use Subscription.cancel())");
}
if (setState(State.OPEN, State.CLOSED)) {
addObjectOrEvent(new CloseEvent(cause));
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
*/
package com.linecorp.armeria.client;

import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.handleInitialExchange;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.newClientFactory;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.readFrame;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
Expand All @@ -31,14 +32,9 @@
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.google.common.io.ByteStreams;

import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.testing.junit5.common.EventLoopExtension;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameTypes;

@Timeout(10)
Expand All @@ -53,7 +49,7 @@ class Http2GoAwayTest {
@Test
void streamEndsBeforeGoAway() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory()) {
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

Expand Down Expand Up @@ -101,7 +97,7 @@ void streamEndsBeforeGoAway() throws Exception {
@Test
void streamEndsAfterGoAway() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory()) {
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

Expand Down Expand Up @@ -150,7 +146,7 @@ void streamEndsAfterGoAway() throws Exception {
@Test
void streamGreaterThanLastStreamId() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory()) {
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

Expand Down Expand Up @@ -208,55 +204,4 @@ void streamGreaterThanLastStreamId() throws Exception {
}
}
}

private static ClientFactory newClientFactory() {
return ClientFactory.builder()
.useHttp2Preface(true)
// Set the window size to the HTTP/2 default values to simplify the traffic.
.http2InitialConnectionWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.http2InitialStreamWindowSize(Http2CodecUtil.DEFAULT_WINDOW_SIZE)
.workerGroup(eventLoop.get(), false)
.build();
}

private static void handleInitialExchange(InputStream in, BufferedOutputStream out) throws IOException {
// Read the connection preface and discard it.
readBytes(in, connectionPrefaceBuf().readableBytes());

// Read a SETTINGS frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS);

// Send a SETTINGS frame and the ack for the received SETTINGS frame.
sendEmptySettingsAndAckFrame(out);

// Read a SETTINGS ack frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.SETTINGS);
}

private static byte[] readBytes(InputStream in, int length) throws IOException {
final byte[] buf = new byte[length];
ByteStreams.readFully(in, buf);
return buf;
}

private static void sendEmptySettingsAndAckFrame(BufferedOutputStream bos) throws IOException {
// Send an empty SETTINGS frame.
bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00 });
// Send a SETTINGS_ACK frame.
bos.write(new byte[] { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 });
bos.flush();
}

private static int payloadLength(byte[] buf) {
return (buf[0] & 0xff) << 16 | (buf[1] & 0xff) << 8 | (buf[2] & 0xff);
}

private static ByteBuf readFrame(InputStream in) throws IOException {
final byte[] frameBuf = readBytes(in, 9);
final int payloadLength = payloadLength(frameBuf);
final ByteBuf buffer = Unpooled.buffer(9 + payloadLength);
buffer.writeBytes(frameBuf);
buffer.writeBytes(in, payloadLength);
return buffer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client;

import static com.linecorp.armeria.internal.testing.Http2ByteUtil.handleInitialExchange;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.newClientFactory;
import static com.linecorp.armeria.internal.testing.Http2ByteUtil.readFrame;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.ClosedSessionException;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.testing.junit5.common.EventLoopExtension;

import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import io.netty.handler.codec.http2.Http2FrameTypes;

class HttpResponseWrapperLogTest {

@RegisterExtension
static final EventLoopExtension eventLoop = new EventLoopExtension();

private static final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
private static final Logger logger =
(Logger) LoggerFactory.getLogger(HttpResponseWrapper.class);
private static final ListAppender<ILoggingEvent> appender = new ListAppender<>();

@BeforeEach
void beforeEach() {
appender.setContext(context);
appender.start();
logger.addAppender(appender);
}

@AfterEach
void afterEach() {
appender.stop();
logger.detachAppender(appender);
}

@Test
void goAwayNotLogged() throws Exception {
try (ServerSocket ss = new ServerSocket(0);
ClientFactory clientFactory = newClientFactory(eventLoop.get())) {

final int port = ss.getLocalPort();

final WebClient client = WebClient.builder("h2c://127.0.0.1:" + port)
.factory(clientFactory)
.build();
final HttpRequest req = HttpRequest.streaming(HttpMethod.GET, "/");
final CompletableFuture<AggregatedHttpResponse> resFuture = client.execute(req).aggregate();
try (Socket s = ss.accept()) {

final InputStream in = s.getInputStream();
final BufferedOutputStream bos = new BufferedOutputStream(s.getOutputStream());
handleInitialExchange(in, bos);

// Read a HEADERS frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.HEADERS);

// Send a GOAWAY frame.
bos.write(new byte[] {
0x00, 0x00, 0x08, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x03, // lastStreamId = 3
0x00, 0x00, 0x00, 0x00 // errorCode = 0
});
bos.flush();

// The second request should fail with UnprocessedRequestException
// which has a cause of GoAwayReceivedException.
await().untilAsserted(resFuture::isCompletedExceptionally);
assertThatThrownBy(resFuture::join).isInstanceOf(CompletionException.class)
.hasCauseInstanceOf(ClosedSessionException.class);

// Read a GOAWAY frame.
assertThat(readFrame(in).getByte(3)).isEqualTo(Http2FrameTypes.GO_AWAY);

assertThat(in.read()).isEqualTo(-1);
}
}
assertThat(appender.list).allSatisfy(event -> {
assertThat(event.getMessage())
.doesNotContain(HttpResponseWrapper.UNEXPECTED_EXCEPTION_MSG);
});
}
}
3 changes: 2 additions & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,15 @@ includeWithFlags ':it:builders', 'java'
includeWithFlags ':it:context-storage', 'java'
includeWithFlags ':it:dgs', 'java17'
includeWithFlags ':it:flags-cyclic-dep', 'java'
includeWithFlags ':it:flags-provider', 'java', 'relocate'
includeWithFlags ':it:graphql-multipart', 'java17'
includeWithFlags ':it:grpcweb', 'java', 'akka-grpc_2.13'
includeWithFlags ':it:grpc:java', 'java'
includeWithFlags ':it:grpc:kotlin', 'java', 'relocate', 'kotlin-grpc', 'kotlin'
includeWithFlags ':it:grpc:kotlin-coroutine-context-provider', 'java', 'relocate', 'kotlin-grpc', 'kotlin'
includeWithFlags ':it:grpc:scala', 'java', 'relocate', 'scala-grpc_2.13', 'scala_2.13'
includeWithFlags ':it:grpc:reactor', 'java', 'relocate', 'reactor-grpc'
includeWithFlags ':it:flags-provider', 'java', 'relocate'
includeWithFlags ':it:internal-logging', 'java', 'relocate'
includeWithFlags ':it:jackson-provider', 'java', 'relocate'
includeWithFlags ':it:kotlin', 'java', 'relocate', 'kotlin'
includeWithFlags ':it:kubernetes-chaos-tests', 'java', 'relocate'
Expand Down
Loading
Loading