Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -63,16 +64,17 @@ public class HttpStreamOverHTTP2 implements HttpStream, HTTP2Channel.Server
{
private static final Logger LOG = LoggerFactory.getLogger(HttpStreamOverHTTP2.class);

private final AutoLock lock = new AutoLock();
private final AutoLock _lock = new AutoLock();
private final HTTP2ServerConnection _connection;
private final HttpChannel _httpChannel;
private final AtomicBoolean _recycle = new AtomicBoolean(); // Set to true when _httpChannel has been recycled or cannot be recycled anymore.
private final HTTP2Stream _stream;
private MetaData.Request _requestMetaData;
private MetaData.Response _responseMetaData;
private TunnelSupport tunnelSupport;
private TunnelSupport _tunnelSupport;
private Content.Chunk _chunk;
private Content.Chunk _trailer;
private boolean committed;
private boolean _committed;
private boolean _demand;

public HttpStreamOverHTTP2(HTTP2ServerConnection connection, HttpChannel httpChannel, HTTP2Stream stream)
Expand Down Expand Up @@ -105,7 +107,7 @@ public Runnable onRequest(HeadersFrame frame)

if (frame.isEndStream())
{
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_chunk = Content.Chunk.EOF;
}
Expand All @@ -114,7 +116,7 @@ public Runnable onRequest(HeadersFrame frame)
HttpFields fields = _requestMetaData.getHttpFields();

if (_requestMetaData instanceof MetaData.ConnectRequest)
tunnelSupport = new TunnelSupportOverHTTP2(_requestMetaData.getProtocol());
_tunnelSupport = new TunnelSupportOverHTTP2(_requestMetaData.getProtocol());

if (LOG.isDebugEnabled())
{
Expand Down Expand Up @@ -161,20 +163,20 @@ private Runnable onBadMessage(HttpException x)
LOG.debug("badMessage {} {}", this, x);

Throwable failure = (Throwable)x;
return _httpChannel.onFailure(failure);
return onFailure(failure, Callback.NOOP);
}

@Override
public Content.Chunk read()
{
// Tunnel requests do not have HTTP content, avoid
// returning chunks meant for a different protocol.
if (tunnelSupport != null)
if (_tunnelSupport != null)
return null;

// Check if there already is a chunk, e.g. EOF.
Content.Chunk chunk;
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
chunk = _chunk;
_chunk = Content.Chunk.next(chunk);
Expand All @@ -190,7 +192,7 @@ public Content.Chunk read()
if (data.frame().isEndStream())
{
Content.Chunk trailer;
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
trailer = _trailer;
if (trailer != null)
Expand All @@ -207,7 +209,7 @@ public Content.Chunk read()
// the two actions cancel each other, no need to further retain or release.
chunk = createChunk(data);

try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_chunk = Content.Chunk.next(chunk);
}
Expand All @@ -219,7 +221,7 @@ public void demand()
{
boolean notify = false;
boolean demand = false;
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
if (_chunk != null || _trailer != null)
notify = true;
Expand All @@ -245,7 +247,7 @@ else if (demand)
@Override
public Runnable onDataAvailable()
{
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_demand = false;
}
Expand All @@ -264,7 +266,7 @@ public Runnable onDataAvailable()
public Runnable onTrailer(HeadersFrame frame)
{
HttpFields trailers = frame.getMetaData().getHttpFields().asImmutable();
try (AutoLock ignored = lock.lock())
try (AutoLock ignored = _lock.lock())
{
_trailer = new Trailers(trailers);
}
Expand Down Expand Up @@ -331,7 +333,7 @@ private void sendHeaders(MetaData.Request request, MetaData.Response response, B
}
else
{
committed = true;
_committed = true;
if (last)
{
long realContentLength = BufferUtil.length(content);
Expand Down Expand Up @@ -580,7 +582,7 @@ private void sendTrailersFrame(MetaData metaData, Callback callback)
@Override
public boolean isCommitted()
{
return committed;
return _committed;
}

@Override
Expand All @@ -593,13 +595,13 @@ public boolean isIdle()
@Override
public TunnelSupport getTunnelSupport()
{
return tunnelSupport;
return _tunnelSupport;
}

@Override
public Throwable consumeAvailable()
{
if (tunnelSupport != null)
if (_tunnelSupport != null)
return null;
Throwable result = HttpStream.consumeAvailable(this, _httpChannel.getConnectionMetaData().getHttpConfiguration());
if (result != null)
Expand All @@ -615,13 +617,22 @@ public Throwable consumeAvailable()
@Override
public void onTimeout(TimeoutException timeout, BiConsumer<Runnable, Boolean> consumer)
{
boolean wasRecycled = !_recycle.compareAndSet(false, true);
if (wasRecycled)
{
consumer.accept(null, true);
return;
}
HttpChannel.IdleTimeoutTask task = _httpChannel.onIdleTimeout(timeout);
consumer.accept(task.action(), !task.handlingRequest());
}

@Override
public Runnable onFailure(Throwable failure, Callback callback)
{
boolean wasRecycled = !_recycle.compareAndSet(false, true);
if (wasRecycled)
return new FailureTask(null, callback);
boolean remote = failure instanceof EOFException;
Runnable task = remote ? _httpChannel.onRemoteFailure(new EofException(failure)) : _httpChannel.onFailure(failure);
return new FailureTask(task, callback);
Expand All @@ -643,7 +654,7 @@ public void succeeded()
}
else
{
EndPoint endPoint = tunnelSupport.getEndPoint();
EndPoint endPoint = _tunnelSupport.getEndPoint();
_stream.setAttachment(endPoint);
endPoint.upgrade(connection);
}
Expand Down Expand Up @@ -673,8 +684,13 @@ public void succeeded()
}
}
}
_httpChannel.recycle();
_connection.offerHttpChannel(_httpChannel);

boolean canRecycle = _recycle.compareAndSet(false, true);
if (canRecycle)
{
_httpChannel.recycle();
_connection.offerHttpChannel(_httpChannel);
}
}

@Override
Expand All @@ -700,6 +716,7 @@ public void failed(Throwable x)
LOG.atDebug().setCause(x).log("HTTP2 response #{}/{}: failed {}", _stream.getId(), Integer.toHexString(_stream.getSession().hashCode()), errorCode);
_stream.reset(new ResetFrame(_stream.getId(), errorCode.code), Callback.NOOP);
}
_recycle.set(true);
}

private class SendTrailers extends Callback.Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ private Runnable onFailure(Throwable x, boolean remote)
try (AutoLock ignored = _lock.lock())
{
if (LOG.isDebugEnabled())
LOG.atDebug().setCause(x).log("onFailure {}", this);
LOG.atDebug().setCause(x).log("onFailure remote={} {}", remote, this);

// If the channel doesn't have a stream, then the error is ignored.
stream = _stream;
Expand Down