Skip to content

Commit

Permalink
Improvements to HttpSender. (#12111)
Browse files Browse the repository at this point in the history
* Changed ContentSender demand from iterate()+IDLE to succeeded()+SCHEDULED.
  This ensures that there is no re-iteration in case a 100 Continue response arrives.
  This, in turn, avoids that the demand is performed multiple times, causing ISE to be thrown.
* Changed the 100 Continue action of the proxy Servlet/Handler, that provides the request content, to be executed by the HttpSender, rather than by the HttpReceiver.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet authored Aug 2, 2024
1 parent 58cfe77 commit fa143fa
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public Response.Listener getResponseListener()
return new ContinueListener();
}

protected void onContinue(Request request)
protected Runnable onContinue(Request request)
{
return null;
}

protected class ContinueListener extends BufferingResponseListener
Expand All @@ -79,8 +80,10 @@ public void onSuccess(Response response)
{
// All good, continue.
exchange.resetResponse();
exchange.proceed(null);
onContinue(request);
Runnable proceedAction = onContinue(request);
// Pass the proceed action to be executed
// by the sender, not here by the receiver.
exchange.proceed(proceedAction, null);
}
else
{
Expand All @@ -90,7 +93,7 @@ public void onSuccess(Response response)
ResponseListeners listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getMediaType(), getEncoding());
listeners.emitSuccess(contentResponse);
exchange.proceed(new HttpRequestException("Expectation failed", request));
exchange.proceed(null, new HttpRequestException("Expectation failed", request));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ public void send()

public abstract void release();

public void proceed(HttpExchange exchange, Throwable failure)
public void proceed(HttpExchange exchange, Runnable proceedAction, Throwable failure)
{
getHttpSender().proceed(exchange, failure);
getHttpSender().proceed(exchange, proceedAction, failure);
}

public void abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure, Promise<Boolean> promise)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,11 @@ public void resetResponse()
}
}

public void proceed(Throwable failure)
public void proceed(Runnable proceedAction, Throwable failure)
{
HttpChannel channel = getHttpChannel();
if (channel != null)
channel.proceed(this, failure);
channel.proceed(this, proceedAction, failure);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,15 @@ protected void dispose()
{
}

public void proceed(HttpExchange exchange, Throwable failure)
public void proceed(HttpExchange exchange, Runnable proceedAction, Throwable failure)
{
// Received a 100 Continue, although Expect header was not sent.
// Received a 100 Continue, although the Expect header was not sent.
if (!contentSender.expect100)
return;

// Write the fields in this order, since the reader of
// these fields will read them in the opposite order.
contentSender.proceedAction = proceedAction;
contentSender.expect100 = false;
if (failure == null)
{
Expand Down Expand Up @@ -462,32 +465,39 @@ private enum RequestState

private class ContentSender extends IteratingCallback
{
private HttpExchange exchange;
// Fields that are set externally.
private volatile HttpExchange exchange;
private volatile Runnable proceedAction;
private volatile boolean expect100;
// Fields only used internally.
private Content.Chunk chunk;
private ByteBuffer contentBuffer;
private boolean expect100;
private boolean committed;
private boolean success;
private boolean complete;
private Promise<Boolean> abort;
private boolean demanded;

@Override
public boolean reset()
{
exchange = null;
proceedAction = null;
expect100 = false;
chunk = null;
contentBuffer = null;
expect100 = false;
committed = false;
success = false;
complete = false;
abort = null;
demanded = false;
return super.reset();
}

@Override
protected Action process() throws Throwable
{
HttpExchange exchange = this.exchange;
if (complete)
{
if (success)
Expand All @@ -498,15 +508,26 @@ protected Action process() throws Throwable
HttpRequest request = exchange.getRequest();
Content.Source content = request.getBody();

boolean expect100 = this.expect100;
if (expect100)
{
// If the request was sent already, wait for
// the 100 response before sending the content.
if (committed)
return Action.IDLE;
else
chunk = null;
// Do not send any content yet.
chunk = null;
}
else
{
// Run the proceed action first, which likely will provide
// content after having received the 100 Continue response.
Runnable action = proceedAction;
proceedAction = null;
if (action != null)
action.run();

// Read the request content.
chunk = content.read();
}
if (LOG.isDebugEnabled())
Expand All @@ -516,11 +537,14 @@ protected Action process() throws Throwable
{
if (committed)
{
content.demand(this::iterate);
return Action.IDLE;
// No content after the headers, demand.
demanded = true;
content.demand(this::succeeded);
return Action.SCHEDULED;
}
else
{
// Normalize to avoid null checks.
chunk = Content.Chunk.EMPTY;
}
}
Expand All @@ -545,49 +569,50 @@ protected Action process() throws Throwable
@Override
protected void onSuccess()
{
boolean proceed = true;
if (committed)
if (demanded)
{
if (contentBuffer.hasRemaining())
proceed = someToContent(exchange, contentBuffer);
// Content is now available, reset
// the demand and iterate again.
demanded = false;
}
else
{
committed = true;
if (headersToCommit(exchange))
boolean proceed = true;
if (committed)
{
// Was any content sent while committing?
if (contentBuffer.hasRemaining())
proceed = someToContent(exchange, contentBuffer);
}
else
{
proceed = false;
committed = true;
proceed = headersToCommit(exchange);
if (proceed)
{
// Was any content sent while committing?
if (contentBuffer.hasRemaining())
proceed = someToContent(exchange, contentBuffer);
}
}
}

boolean last = chunk.isLast();
chunk.release();
chunk = null;
boolean last = chunk.isLast();
chunk.release();
chunk = null;

if (proceed)
{
if (last)
if (proceed)
{
success = true;
complete = true;
if (last)
{
success = true;
complete = true;
}
}
else if (expect100)
else
{
if (LOG.isDebugEnabled())
LOG.debug("Expecting 100 Continue for {}", exchange.getRequest());
// There was some concurrent error, terminate.
complete = true;
}
}
else
{
// There was some concurrent error, terminate.
complete = true;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,11 @@ protected void onServerToProxyResponseFailure(Request clientToProxyRequest, org.
Response.writeError(clientToProxyRequest, proxyToClientResponse, callback, status);
}

protected void onServerToProxyResponse100Continue(Request clientToProxyRequest, org.eclipse.jetty.client.Request proxyToServerRequest)
protected Runnable onServerToProxyResponse100Continue(Request clientToProxyRequest, org.eclipse.jetty.client.Request proxyToServerRequest)
{
if (LOG.isDebugEnabled())
LOG.debug("{} P2C 100 continue response", requestId(clientToProxyRequest));
Runnable action = (Runnable)proxyToServerRequest.getAttributes().get(PROXY_TO_SERVER_CONTINUE_ATTRIBUTE);
action.run();
return (Runnable)proxyToServerRequest.getAttributes().get(PROXY_TO_SERVER_CONTINUE_ATTRIBUTE);
}

protected void onServerToProxyResponse102Processing(Request clientToProxyRequest, org.eclipse.jetty.client.Request proxyToServerRequest, HttpFields serverToProxyResponseHeaders, Response proxyToClientResponse)
Expand Down Expand Up @@ -776,13 +775,12 @@ public InvocationType getInvocationType()
private class ProxyContinueProtocolHandler extends ContinueProtocolHandler
{
@Override
protected void onContinue(org.eclipse.jetty.client.Request proxyToServerRequest)
protected Runnable onContinue(org.eclipse.jetty.client.Request proxyToServerRequest)
{
super.onContinue(proxyToServerRequest);
var clientToProxyRequest = (Request)proxyToServerRequest.getAttributes().get(CLIENT_TO_PROXY_REQUEST_ATTRIBUTE);
if (LOG.isDebugEnabled())
LOG.debug("{} S2P received 100 Continue", requestId(clientToProxyRequest));
onServerToProxyResponse100Continue(clientToProxyRequest, proxyToServerRequest);
return onServerToProxyResponse100Continue(clientToProxyRequest, proxyToServerRequest);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,10 +763,9 @@ protected void sendProxyResponseError(HttpServletRequest clientRequest, HttpServ
}
}

protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return null;
}

/**
Expand Down Expand Up @@ -851,10 +850,10 @@ protected String rewriteTarget(HttpServletRequest request)
class ProxyContinueProtocolHandler extends ContinueProtocolHandler
{
@Override
protected void onContinue(Request request)
protected Runnable onContinue(Request request)
{
HttpServletRequest clientRequest = (HttpServletRequest)request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE);
AbstractProxyServlet.this.onContinue(clientRequest, request);
return AbstractProxyServlet.this.onContinue(clientRequest, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequ
}

@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
action.run();
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
}

private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import jakarta.servlet.AsyncContext;
Expand Down Expand Up @@ -144,12 +143,11 @@ protected void onResponseContent(HttpServletRequest request, HttpServletResponse
}

@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
Executor executor = getHttpClient().getExecutor();
executor.execute(action);
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,10 +768,17 @@ protected void sendProxyResponseError(HttpServletRequest clientRequest, HttpServ
}
}

protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
/**
* <p>Returns the action to perform when the proxy receives
* a 100 Continue response from the server.</p>
*
* @param clientRequest the client request
* @param proxyRequest the request being proxied
* @return the 100 Continue action to run
*/
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return null;
}

/**
Expand Down Expand Up @@ -856,10 +863,10 @@ protected String rewriteTarget(HttpServletRequest request)
class ProxyContinueProtocolHandler extends ContinueProtocolHandler
{
@Override
protected void onContinue(Request request)
protected Runnable onContinue(Request request)
{
HttpServletRequest clientRequest = (HttpServletRequest)request.getAttributes().get(CLIENT_REQUEST_ATTRIBUTE);
AbstractProxyServlet.this.onContinue(clientRequest, request);
return AbstractProxyServlet.this.onContinue(clientRequest, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,11 @@ protected ContentTransformer newServerResponseContentTransformer(HttpServletRequ
}

@Override
protected void onContinue(HttpServletRequest clientRequest, Request proxyRequest)
protected Runnable onContinue(HttpServletRequest clientRequest, Request proxyRequest)
{
super.onContinue(clientRequest, proxyRequest);
Runnable action = (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
action.run();
if (_log.isDebugEnabled())
_log.debug("{} handling 100 Continue", getRequestId(clientRequest));
return (Runnable)proxyRequest.getAttributes().get(CONTINUE_ACTION_ATTRIBUTE);
}

private void transform(ContentTransformer transformer, ByteBuffer input, boolean finished, List<ByteBuffer> output) throws IOException
Expand Down
Loading

0 comments on commit fa143fa

Please sign in to comment.