Skip to content

Commit

Permalink
Restore thread starvation tests (#12395)
Browse files Browse the repository at this point in the history
For #12214 restore ee9 and ee10 thread starvation tests

* Deprecated ContentSourceCompletableFuture and added protection to method to match invocation type
* more demand invocables with invocation type
* Deprecated the CF APIs and replaced with explicit getXxx onXxx methods
* replace FutureCallback and FuturePromise usages with Blocker.* instead
* Converted new API on FormFields to use Promise
* Modified SerializedInvoker to take into account the task InvocationType.
* Added ThreadStarvationTest for all transports, to check that also HTTP/2 and HTTP/3 do not starve in case of non-blocking reads.
* Improved Core ThreadStarvationTest
* Fixed AsyncServletLongPollTest in ee9 and ee10 to match the current behavior in case of close.
* Removed Invocable.InvocableCompletableFuture
* Fixed SerializedInvoker.
* Improved ThreadStarvationTest.
* refined more DemandTask implementations
* Added InvocableType.runWithoutBlocking (name is WIP)
* replace Thread.sleep() with awaitility

---------

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: gregw <gregw@webtide.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
3 people authored Oct 31, 2024
1 parent 17e5820 commit 94c3f9d
Show file tree
Hide file tree
Showing 51 changed files with 2,705 additions and 846 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.eclipse.jetty.http.HttpCookie;
Expand All @@ -36,8 +35,8 @@
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CompletableTask;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Promise;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -211,18 +210,19 @@ public boolean handle(Request request, Response response, Callback callback) thr
{
// Non-blocking read the request content as a String.
// Use with caution as the request content may be large.
CompletableFuture<String> completable = Content.Source.asStringAsync(request, UTF_8);

completable.whenComplete((requestContent, failure) ->
Content.Source.asString(request, UTF_8, new Promise<>()
{
if (failure == null)
@Override
public void succeeded(String result)
{
// Process the request content here.

// Implicitly respond with status code 200 and no content.
callback.succeeded();
}
else

@Override
public void failed(Throwable failure)
{
// Implicitly respond with status code 500.
callback.failed(failure);
Expand All @@ -243,18 +243,19 @@ public boolean handle(Request request, Response response, Callback callback) thr
{
// Non-blocking read the request content as a ByteBuffer.
// Use with caution as the request content may be large.
CompletableFuture<ByteBuffer> completable = Content.Source.asByteBufferAsync(request);

completable.whenComplete((requestContent, failure) ->
Content.Source.asByteBuffer(request, new Promise<>()
{
if (failure == null)
@Override
public void succeeded(ByteBuffer result)
{
// Process the request content here.

// Implicitly respond with status code 200 and no content.
callback.succeeded();
}
else

@Override
public void failed(Throwable failure)
{
// Implicitly respond with status code 500.
callback.failed(failure);
Expand Down Expand Up @@ -303,7 +304,8 @@ public class RequestContentAPIsSource extends Handler.Abstract
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
CompletableTask<Void> reader = new CompletableTask<>()
// When the read is complete, complete the Handler callback.
Promise.Task<Void> reader = new Promise.Task<>(callback::succeeded, callback::failed)
{
@Override
public void run()
Expand All @@ -326,7 +328,7 @@ public void run()
if (Content.Chunk.isFailure(chunk))
{
Throwable failure = chunk.getFailure();
completeExceptionally(failure);
failed(failure);
return;
}

Expand All @@ -343,7 +345,7 @@ public void run()
// If the last chunk is read, complete normally.
if (chunk.isLast())
{
complete(null);
succeeded(null);
return;
}

Expand All @@ -353,10 +355,7 @@ public void run()
};

// Initiate the read of the request content.
reader.start();

// When the read is complete, complete the Handler callback.
callback.completeWith(reader);
reader.run();

return true;
}
Expand Down Expand Up @@ -440,7 +439,7 @@ public boolean handle(Request request, Response response, Callback callback) thr
// Replaces:
// - servletResponse.encodeRedirectURL(location)
// - servletResponse.sendRedirect(location)
String location = Request.toRedirectURI(request, "/redirect");
String location = Response.toRedirectURI(request, "/redirect");
Response.sendRedirect(request, response, callback, location);

// Sends an error response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.thread.Invocable;

/**
* <p>{@link Response} represents an HTTP response and offers methods to retrieve status code, HTTP version
Expand Down Expand Up @@ -187,8 +188,8 @@ interface AsyncContentListener extends ContentSourceListener
@Override
default void onContentSource(Response response, Content.Source contentSource)
{
Runnable demandCallback = Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> onContentSource(response, contentSource));
Content.Chunk chunk = contentSource.read();
Runnable demandCallback = () -> onContentSource(response, contentSource);
if (chunk == null)
{
contentSource.demand(demandCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.net.URI;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.ContentDecoder;
Expand Down Expand Up @@ -67,15 +68,17 @@ public abstract class HttpReceiver
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);

private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class);
private final HttpChannel channel;
private final SerializedInvoker invoker;
private ResponseState responseState = ResponseState.IDLE;
private NotifiableContentSource contentSource;
private Throwable failure;

protected HttpReceiver(HttpChannel channel)
{
this.channel = channel;
Executor executor = channel.getHttpDestination().getHttpClient().getExecutor();
invoker = new SerializedInvoker(HttpReceiver.class.getSimpleName(), executor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,11 @@ public Request accept(String... accepts)
StringBuilder result = new StringBuilder();
for (String accept : accepts)
{
if (result.length() > 0)
if (!result.isEmpty())
result.append(", ");
result.append(accept);
}
if (result.length() > 0)
if (!result.isEmpty())
headers.put(HttpHeader.ACCEPT, result.toString());
return this;
}
Expand Down Expand Up @@ -859,7 +859,7 @@ private void extractParams(String query)
if (parts.length > 0)
{
String name = urlDecode(parts[0]);
if (name.trim().length() == 0)
if (name.trim().isEmpty())
continue;
param(name, parts.length < 2 ? "" : urlDecode(parts[1]), true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.client.transport;

import java.nio.ByteBuffer;
import java.nio.channels.ReadPendingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -28,6 +29,7 @@
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -231,7 +233,7 @@ private static void consume(Content.Source contentSource)
if (chunk != null)
chunk.release();
if (chunk == null || !chunk.isLast())
contentSource.demand(() -> consume(contentSource));
contentSource.demand(Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> consume(contentSource)));
}

private static void notifyContentSource(Response.ContentSourceListener listener, Response response, Content.Source contentSource)
Expand Down Expand Up @@ -490,7 +492,7 @@ private void onDemandCallback()
{
// Retry the demand on spurious wakeup to avoid passing
// a null chunk to the demultiplexer's ContentSources.
originalContentSource.demand(this::onDemandCallback);
originalContentSource.demand(Invocable.from(getInvocationType(), this::onDemandCallback));
return;
}
// Demultiplexer content sources are invoked sequentially to be consistent with other listeners,
Expand All @@ -502,6 +504,16 @@ private void onDemandCallback()
chunk.release();
}

private Invocable.InvocationType getInvocationType()
{
Invocable.InvocationType invocationType = Invocable.InvocationType.NON_BLOCKING;
for (ContentSource contentSource : contentSources)
{
invocationType = Invocable.combine(invocationType, contentSource.getInvocationType());
}
return invocationType;
}

private void registerFailure(ContentSource contentSource, Throwable failure)
{
boolean processFail = false;
Expand All @@ -524,7 +536,7 @@ else if (counters.total() == listeners.size())
if (processFail)
originalContentSource.fail(failure);
else if (processDemand)
originalContentSource.demand(this::onDemandCallback);
originalContentSource.demand(Invocable.from(getInvocationType(), this::onDemandCallback));

if (LOG.isDebugEnabled())
LOG.debug("Registered failure on {}; {}", contentSource, counters);
Expand All @@ -547,7 +559,7 @@ private void registerDemand(ContentSource contentSource)
}
}
if (processDemand)
originalContentSource.demand(this::onDemandCallback);
originalContentSource.demand(Invocable.from(getInvocationType(), this::onDemandCallback));

if (LOG.isDebugEnabled())
LOG.debug("Registered demand on {}; {}", contentSource, counters);
Expand Down Expand Up @@ -641,6 +653,11 @@ private void onDemandCallback()
}
}

private Invocable.InvocationType getInvocationType()
{
return Invocable.getInvocationType(demandCallbackRef.get());
}

@Override
public Content.Chunk read()
{
Expand All @@ -663,7 +680,7 @@ public Content.Chunk read()
public void demand(Runnable demandCallback)
{
if (!demandCallbackRef.compareAndSet(null, Objects.requireNonNull(demandCallback)))
throw new IllegalStateException();
throw new ReadPendingException();
Content.Chunk currentChunk = this.chunk;
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} demand while current chunk is {}", index, currentChunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,20 @@ public void close()
{
Runnable task = stream.getHttpChannel().onClose();
if (task != null)
task.run();
{
ThreadPool.executeImmediately(getExecutor(), () ->
{
try
{
task.run();
}
finally
{
super.close();
}
});
return;
}
}
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -823,14 +824,8 @@ public void demand(Runnable demandCallback)
}
if (part != null)
{
part.getContentSource().demand(() ->
{
try (AutoLock ignoredAgain = lock.lock())
{
this.demand = null;
}
demandCallback.run();
});
// Inner class used instead of lambda for clarity in stack traces.
part.getContentSource().demand(new DemandTask(demandCallback));
}
else if (invoke)
{
Expand Down Expand Up @@ -887,6 +882,27 @@ private enum State
{
FIRST, MIDDLE, HEADERS, CONTENT, COMPLETE
}

private class DemandTask extends Invocable.Task.Abstract
{
private final Runnable demandCallback;

private DemandTask(Runnable demandCallback)
{
super(Invocable.getInvocationType(demandCallback));
this.demandCallback = demandCallback;
}

@Override
public void run()
{
try (AutoLock ignoredAgain = lock.lock())
{
AbstractContentSource.this.demand = null;
}
demandCallback.run();
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Promise;

import static org.eclipse.jetty.http.ComplianceViolation.Listener.NOOP;

/**
* The Configuration needed to parse multipart/form-data.
* @see MultiPartFormData#from(Content.Source, Attributes, String, MultiPartConfig)
* @see MultiPartFormData#getParts(Content.Source, Attributes, String, MultiPartConfig)
* @see MultiPartFormData#onParts(Content.Source, Attributes, String, MultiPartConfig, Promise.Invocable)
*/
public class MultiPartConfig
{
Expand Down
Loading

0 comments on commit 94c3f9d

Please sign in to comment.