Skip to content

Commit

Permalink
Fixes #12266 - InvocationType improvements and cleanups.
Browse files Browse the repository at this point in the history
* Removed usages of `AbstractConnection.getInvocationType()`.
* Changed HTTP server-side Connection implementations to use `AbstractConnection.fillInterested(Callback)` with a callback that specifies the `InvocationType`, derived from the `Server`, which derives it from the `Handler` chain.
* Changed client-side receivers to use `AbstractConnection.fillInterested(Callback)` with a callback that specifies the `InvocationType`, derived from the `HttpClientTransport`.
* Introduced `HttpClientTransport.getInvocationType(Connection)`, so that client applications can specify whether they block or not.
* Made sure client-side HTTP/2 and HTTP/3 return tasks with the proper `InvocationType` to be run by the `ExecutionStrategy` when calling application code.
* HTTP3StreamConnection now uses an `EITHER` fillable callback to possibly process streams in parallel.
* `QuicStreamEndPoint` now uses a task to invoke `FillInterest.fillable()`, rather than invoking it directly, therefore honoring the `InvocationType` of callback set by the `Connection` associated with the `QuicStreamEndPoint`.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Sep 13, 2024
1 parent daa4923 commit e911c23
Show file tree
Hide file tree
Showing 23 changed files with 468 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.util.thread.Invocable;

/**
* {@link HttpClientTransport} represents what transport implementations should provide
Expand Down Expand Up @@ -100,4 +101,9 @@ public default void connect(SocketAddress address, Map<String, Object> context)
* @param factory the factory for ConnectionPool instances
*/
public void setConnectionPoolFactory(ConnectionPool.Factory factory);

public default Invocable.InvocationType getInvocationType(Connection connection)
{
return Invocable.InvocationType.BLOCKING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
Expand All @@ -55,6 +56,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP.class);

private final Callback fillableCallback = new FillableCallback();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Promise<Connection> promise;
Expand Down Expand Up @@ -188,7 +190,7 @@ public void setInitialize(boolean initialize)
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
boolean initialize = isInitialize();
if (initialize)
{
Expand All @@ -210,6 +212,11 @@ public void onOpen()
}
}

void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public boolean isClosed()
{
Expand Down Expand Up @@ -432,4 +439,26 @@ public String toString()
return HttpConnectionOverHTTP.this.toString();
}
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
HttpClientTransport transport = getHttpDestination().getHttpClient().getTransport();
return transport.getInvocationType(delegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ protected void fillInterested()
{
if (LOG.isDebugEnabled())
LOG.debug("Registering as fill interested in {}", this);
getHttpConnection().fillInterested();
getHttpConnection().setFillInterest();
}

private void shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +57,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class);

private final Callback fillableCallback = new FillableCallback();
private final ByteBufferPool networkByteBufferPool;
private final AtomicInteger requests = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
Expand Down Expand Up @@ -128,10 +130,15 @@ public SendFailure send(HttpExchange exchange)
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
promise.succeeded(this);
}

void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public void onFillable()
{
Expand Down Expand Up @@ -492,4 +499,25 @@ private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return getHttpDestination().getHttpClient().getTransport().getInvocationType(delegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void receive()
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
fillInterested(httpConnection);
}
else
{
Expand Down Expand Up @@ -86,7 +86,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
if (chunk != null)
return chunk;
if (needFillInterest && fillInterestIfNeeded)
httpConnection.fillInterested();
fillInterested(httpConnection);
return null;
}

Expand Down Expand Up @@ -138,7 +138,12 @@ private void receiveNext()
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
fillInterested(httpConnection);
}

private void fillInterested(HttpConnectionOverFCGI httpConnection)
{
httpConnection.setFillInterest();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +43,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);

private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
Expand Down Expand Up @@ -160,7 +162,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
}

@Override
Expand Down Expand Up @@ -188,7 +190,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
setFillInterest();
return;
}
else
Expand Down Expand Up @@ -305,11 +307,16 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
fillInterested();
setFillInterest();
else
getFlusher().shutdown();
}

private void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public boolean onIdleExpired(TimeoutException timeoutException)
{
Expand Down Expand Up @@ -419,4 +426,25 @@ public void close()
}
super.close();
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return getConnector().getServer().getInvocationType();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Runnable onTimeout(TimeoutException timeout, Promise<Boolean> promise)
promise.succeeded(true);
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
return new Invocable.ReadyTask(getInvocationType(), () ->
{
boolean expire = connection.onIdleExpired(timeout);
if (expire)
Expand All @@ -78,7 +78,7 @@ public Runnable onTimeout(TimeoutException timeout, Promise<Boolean> promise)
@Override
public Runnable onFailure(Throwable failure, Callback callback)
{
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
return new Invocable.ReadyTask(getInvocationType(), () ->
{
processFailure(failure);
close(failure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void onNewStream(Stream stream)
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
receiver.onHeaders(stream, frame);
offerTask(receiver.onHeaders(stream, frame));
}

@Override
Expand All @@ -197,28 +197,33 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
public void onDataAvailable(Stream stream)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onDataAvailable(), false);
offerTask(channel.onDataAvailable());
}

@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onReset(frame, callback), false);
offerTask(channel.onReset(frame, callback));
}

@Override
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onTimeout(x, promise), false);
offerTask(channel.onTimeout(x, promise));
}

@Override
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onFailure(failure, callback), false);
offerTask(channel.onFailure(failure, callback));
}

private void offerTask(Runnable task)
{
connection.offerTask(task, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -293,6 +294,11 @@ void offerTask(Runnable task, boolean dispatch)
connection.offerTask(task, dispatch);
}

Invocable.InvocationType getInvocationType()
{
return getHttpClient().getTransport().getInvocationType(this);
}

@Override
public String toString()
{
Expand Down
Loading

0 comments on commit e911c23

Please sign in to comment.