Skip to content

Commit

Permalink
Fixes #5633 - Allow to configure HttpClient request authority.
Browse files Browse the repository at this point in the history
Updated after review.
Added more tests.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Nov 12, 2020
1 parent 96e4b38 commit dd9bdc7
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.eclipse.jetty.http2.client;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.http.HttpFields;
Expand All @@ -30,7 +32,11 @@
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.SimpleFlowControlStrategy;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
Expand Down Expand Up @@ -173,7 +179,7 @@ public void onFailure(Stream stream, int error, String reason, Throwable failure
});

Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
// Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Expand Down Expand Up @@ -354,9 +360,125 @@ public void onHeaders(Stream stream, HeadersFrame frame)
// The server should have sent the GOAWAY after the last stream completed.

Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));

Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
}

@Test
public void testServerGoAwayWithStalledStreamServerConsumesDataOfInFlightStream() throws Exception
{
int flowControlWindow = 32 * 1024;

AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverCloseLatch = new CountDownLatch(1);
start(new ServerSessionListener.Adapter()
{
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
}

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
AtomicInteger dataFrames = new AtomicInteger();
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Do not consume the data for this stream (i.e. don't succeed the callback).
// Only send the response when receiving the first DATA frame.
if (dataFrames.incrementAndGet() == 1)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}
};
}

@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverGoAwayLatch.countDown();
}

@Override
public void onClose(Session session, GoAwayFrame frame)
{
serverCloseLatch.countDown();
}
}, h2 ->
{
// Use the simple, predictable, strategy for window updates.
h2.setFlowControlStrategyFactory(SimpleFlowControlStrategy::new);
h2.setInitialSessionRecvWindow(flowControlWindow);
h2.setInitialStreamRecvWindow(flowControlWindow);
});

CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientCloseLatch = new CountDownLatch(1);
Session clientSession = newClient(new Session.Listener.Adapter()
{
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
clientGoAwayLatch.countDown();
}

@Override
public void onClose(Session session, GoAwayFrame frame)
{
clientCloseLatch.countDown();
}
});
// This is necessary because the server session window is smaller than the
// default and the server cannot send a WINDOW_UPDATE with a negative value.
((ISession)clientSession).updateSendWindow(flowControlWindow - FlowControlStrategy.DEFAULT_WINDOW_SIZE);

MetaData.Request request1 = newRequest("GET", new HttpFields());
HeadersFrame headersFrame1 = new HeadersFrame(request1, null, false);
DataFrame dataFrame1 = new DataFrame(ByteBuffer.allocate(flowControlWindow / 2), false);
((ISession)clientSession).newStream(new IStream.FrameList(headersFrame1, dataFrame1, null), new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream clientStream1, HeadersFrame frame)
{
// Send the server GOAWAY frame.
serverSessionRef.get().close(ErrorCode.NO_ERROR.code, null, Callback.NOOP);

// Send a second, in-flight, stream with data, which
// will exhaust the client session flow control window.
// The server should consume the data even if it will drop
// this stream, so that the first stream can send more data.
MetaData.Request request2 = newRequest("POST", new HttpFields());
HeadersFrame headersFrame2 = new HeadersFrame(request2, null, false);
DataFrame dataFrame2 = new DataFrame(ByteBuffer.allocate(flowControlWindow / 2), true);
((ISession)clientStream1.getSession()).newStream(new IStream.FrameList(headersFrame2, dataFrame2, null), new Promise.Adapter<Stream>()
{
@Override
public void succeeded(Stream clientStream2)
{
// After the in-flight stream is sent, try to complete the first stream.
// The client should receive the window update from
// the server and be able to complete this stream.
clientStream1.data(new DataFrame(clientStream1.getId(), ByteBuffer.allocate(flowControlWindow / 2), true), Callback.NOOP);
}
}, new Adapter());
}
});

Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));

Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen());
Expand Down Expand Up @@ -643,6 +765,7 @@ public void onClose(Session session, GoAwayFrame frame)
});

Session clientSession = newClient(new Session.Listener.Adapter());
// TODO: get rid of sleep!
// Wait for the SETTINGS frames to be exchanged.
Thread.sleep(500);

Expand Down Expand Up @@ -691,6 +814,8 @@ public void onGoAway(Session session, GoAwayFrame frame)
Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen());
}

// TODO: add a shutdown test with pending stream.

@Test
public void testServerIdleTimeout() throws Exception
{
Expand Down Expand Up @@ -729,7 +854,8 @@ public void onClose(Session session, GoAwayFrame frame)
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
clientGoAwayLatch.countDown();
if (!frame.isGraceful())
clientGoAwayLatch.countDown();
}

@Override
Expand Down Expand Up @@ -802,13 +928,22 @@ public void onClose(Session session, GoAwayFrame frame)
clientCloseLatch.countDown();
}
});
CountDownLatch clientResetLatch = new CountDownLatch(1);
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
// Send request headers but not data.
clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter());
clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
{
clientResetLatch.countDown();
}
});

Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
// Server idle timeout sends a non-graceful GOAWAY.
Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assertions.assertTrue(clientResetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));

Expand Down Expand Up @@ -871,23 +1006,22 @@ public void onClose(Session session, GoAwayFrame frame)
}
});
MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields());
CountDownLatch streamFailureLatch = new CountDownLatch(1);
CountDownLatch streamResetLatch = new CountDownLatch(1);
clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter()
{
@Override
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
public void onReset(Stream stream, ResetFrame frame)
{
streamFailureLatch.countDown();
callback.succeeded();
streamResetLatch.countDown();
}
});

// Client sends a graceful GOAWAY.
((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP);

Assertions.assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(streamResetLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS));
Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,16 +757,19 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)

protected IStream createRemoteStream(int streamId)
{
// This stream has been seen the server.
// Even if the stream cannot be created because this peer is closing,
// updating the lastRemoteStreamId ensures that in-flight HEADERS and
// DATA frames can be read (and discarded) without causing an error.
updateLastRemoteStreamId(streamId);

if (!streamsState.newRemoteStream(streamId))
{
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this);
return null;
}

// This stream has been seen the server.
updateLastRemoteStreamId(streamId);

// SPEC: exceeding max concurrent streams is treated as stream error.
while (true)
{
Expand Down Expand Up @@ -1423,8 +1426,8 @@ private class StreamsState
private long idleTime = System.nanoTime();
private CloseState closed = CloseState.NOT_CLOSED;
private Runnable closingAction;
private GoAwayFrame goAwayRecv;
private GoAwayFrame goAwaySent;
private volatile GoAwayFrame goAwayRecv;
private volatile GoAwayFrame goAwaySent;
private volatile Throwable failure;
private Thread flushing;

Expand Down Expand Up @@ -1739,7 +1742,7 @@ private boolean onIdleTimeout()
{
String reason = "idle_timeout";
boolean notify = false;
Throwable cause = null;
boolean sendGoAway = false;
try (Locker.Lock l = lock.lock())
{
switch (closed)
Expand All @@ -1752,22 +1755,24 @@ private boolean onIdleTimeout()
notify = true;
break;
}
// Timed out while waiting for closing events, abort all the streams.
// Timed out while waiting for closing events, fail all the streams.
case LOCALLY_CLOSED:
{
boolean shouldSend = goAwaySent.isGraceful();
goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason);
if (goAwaySent.isGraceful())
{
goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason);
sendGoAway = true;
}
closed = CloseState.CLOSING;
closingAction = shouldSend ? () -> sendGoAwayAndTerminate(goAwaySent) : () -> terminate(goAwaySent);
failure = cause = new TimeoutException("Session idle timeout expired");
failure = new TimeoutException("Session idle timeout expired");
break;
}
case REMOTELY_CLOSED:
{
goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason);
closed = CloseState.CLOSING;
closingAction = () -> sendGoAwayAndTerminate(goAwaySent);
failure = cause = new TimeoutException("Session idle timeout expired");
failure = new TimeoutException("Session idle timeout expired");
sendGoAway = true;
break;
}
default:
Expand All @@ -1789,7 +1794,11 @@ private boolean onIdleTimeout()
return false;
}

abort(reason, cause, Callback.from(this::tryRunClosingAction));
failStreams(stream -> true, reason, true);
if (sendGoAway)
sendGoAway(goAwaySent, Callback.NOOP);
notifyFailure(HTTP2Session.this, failure, Callback.NOOP);
terminate(goAwaySent);
return false;
}

Expand Down

0 comments on commit dd9bdc7

Please sign in to comment.