Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing a context should interrupt pending virtual thread tasks of this context #5344

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions src/main/java/io/vertx/core/Future.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -668,13 +669,17 @@ static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Con
*/
static <T> T await(Future<T> future) {
io.vertx.core.impl.WorkerExecutor executor = io.vertx.core.impl.WorkerExecutor.unwrapWorkerExecutor();
io.vertx.core.impl.WorkerExecutor.TaskController cont = executor.current();
future.onComplete(ar -> cont.resume());
try {
cont.suspendAndAwaitResume();
} catch (InterruptedException e) {
Utils.throwAsUnchecked(e);
return null;
if (executor == null) {
throw new IllegalStateException();
}
CountDownLatch latch = executor.suspend(cont -> future.onComplete(ar -> cont.resume()));
if (latch != null) {
try {
latch.await();
} catch (InterruptedException e) {
Utils.throwAsUnchecked(e);
return null;
}
}
if (future.succeeded()) {
return future.result();
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/vertx/core/impl/CloseFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public CloseFuture(Logger log) {
*
* @param hook the hook to add
*/
public synchronized void add(Closeable hook) {
public synchronized boolean add(Closeable hook) {
if (closed) {
throw new IllegalStateException();
return false;
}
if (hook instanceof CloseFuture) {
// Close future might be closed independently, so we optimize and remove the hooks when
Expand All @@ -72,6 +72,7 @@ public synchronized void add(Closeable hook) {
}
hooks.put(hook, this);
}
return true;
}

/**
Expand Down
40 changes: 24 additions & 16 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static <T> void setResultHandler(ContextInternal ctx, Future<T> fut, Handler<Asy
final TaskQueue internalOrderedTasks;
final WorkerPool internalWorkerPool;
final WorkerPool workerPool;
final TaskQueue orderedTasks;
final WorkerTaskQueue orderedTasks;

public ContextImpl(VertxInternal vertx,
int localsLength,
Expand All @@ -65,7 +65,7 @@ public ContextImpl(VertxInternal vertx,
EventExecutor executor,
WorkerPool internalWorkerPool,
WorkerPool workerPool,
TaskQueue orderedTasks,
WorkerTaskQueue orderedTasks,
Deployment deployment,
CloseFuture closeFuture,
ClassLoader tccl) {
Expand All @@ -84,6 +84,14 @@ public ContextImpl(VertxInternal vertx,
this.internalOrderedTasks = new TaskQueue();
}

public Future<Void> close() {
if (closeFuture == owner.closeFuture()) {
return Future.future(p -> orderedTasks.shutdown(eventLoop, p));
} else {
return closeFuture.close().eventually(() -> Future.<Void>future(p -> orderedTasks.shutdown(eventLoop, p)));
}
}

public Deployment getDeployment() {
return deployment;
}
Expand Down Expand Up @@ -201,29 +209,29 @@ private static <T> Future<T> internalExecuteBlocking(ContextInternal context, Ha
Object queueMetric = metrics != null ? metrics.submitted() : null;
Promise<T> promise = context.promise();
Future<T> fut = promise.future();
try {
Runnable command = () -> {
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
WorkerTask task = new WorkerTask(metrics, queueMetric) {
@Override
protected void execute() {
context.dispatch(promise, blockingCodeHandler);
}
@Override
void reject() {
if (metrics != null) {
metrics.end(execMetric, fut.succeeded());
metrics.rejected(queueMetric);
}
};
promise.fail(new RejectedExecutionException());
}
};
try {
Executor exec = workerPool.executor();
if (queue != null) {
queue.execute(command, exec);
queue.execute(task, exec);
} else {
exec.execute(command);
exec.execute(task);
}
} catch (RejectedExecutionException e) {
// Pool is already shut down
if (metrics != null) {
metrics.rejected(queueMetric);
}
throw e;
task.reject();
}
return fut;
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ default <E> void dispatch(E event, Handler<E> handler) {
}
}

/**
* Close this context, cleanup close future hooks then dispose pending ordered task queue.
*
* @return a future signalling close completion
*/
Future<Void> close();

/**
* Begin the execution of a task on this context.
* <p>
Expand Down
8 changes: 3 additions & 5 deletions src/main/java/io/vertx/core/impl/DeploymentManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private Future<Deployment> doDeploy(String identifier,
context = vertx.createVirtualThreadContext(deployment, closeFuture, tccl);
break;
}
VerticleHolder holder = new VerticleHolder(verticle, context, closeFuture);
VerticleHolder holder = new VerticleHolder(verticle, context);
deployment.addVerticle(holder);
context.runOnContext(v -> {
try {
Expand Down Expand Up @@ -241,16 +241,14 @@ static class VerticleHolder {

final Verticle verticle;
final ContextImpl context;
final CloseFuture closeFuture;

VerticleHolder(Verticle verticle, ContextImpl context, CloseFuture closeFuture) {
VerticleHolder(Verticle verticle, ContextImpl context) {
this.verticle = verticle;
this.context = context;
this.closeFuture = closeFuture;
}

void close(Handler<AsyncResult<Void>> completionHandler) {
closeFuture.close().onComplete(completionHandler);
context.close().onComplete(completionHandler);
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,9 @@ public ContextInternal unwrap() {
public boolean isDuplicate() {
return true;
}

@Override
public Future<Void> close() {
return Future.succeededFuture();
}
}
Loading
Loading