Skip to content

Commit

Permalink
When IO sink is disposed of, close the corresponding executor.
Browse files Browse the repository at this point in the history
  • Loading branch information
conniey committed Jun 10, 2021
1 parent 73a2b5b commit c117dca
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -468,13 +468,7 @@ private synchronized void closeConnectionWork() {
final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));

final Mono<Void> closedExecutor;
if (executor != null) {
closedExecutor = executor.isClosed();
executor.close();
} else {
closedExecutor = Mono.empty();
}
final Mono<Void> closedExecutor = executor != null ? executor.closeAsync() : Mono.empty();

// Close all the children.
final Mono<Void> closeSessionsMono = Mono.when(closingSessions)
Expand Down Expand Up @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException {

final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();

reactorProvider.getReactorDispatcher().getShutdownSignal()
.subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal),
error -> reactorExceptionHandler.onConnectionError(error));

// Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe.
// Using Schedulers.single() will use the same thread for all connections in this process which
// limits the scalability of the no. of concurrent connections a single process can have.
Expand All @@ -539,6 +529,19 @@ private synchronized Connection getOrCreateConnection() throws IOException {
reactorExceptionHandler, pendingTasksDuration,
connectionOptions.getFullyQualifiedNamespace());

reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
reactorExceptionHandler.onConnectionShutdown(signal);
return executor.closeAsync();
})
.onErrorResume(error -> {
logger.info("Error received from reactor provider.", error);
reactorExceptionHandler.onConnectionError(error);
return executor.closeAsync();
})
.subscribe();

executor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
Expand All @@ -23,7 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ReactorExecutor implements Closeable {
class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";

private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
Expand Down Expand Up @@ -64,6 +65,18 @@ void start() {
scheduler.schedule(this::run);
}

/**
* Stops the reactor and schedules any pending tasks.
*/
void stop() {
if (isDisposed.getAndSet(true)) {
return;
}

logger.info("Stop was called. Disposing of ReactorExecutor.");

}

/**
* Worker loop that tries to process events in the reactor. If there are pending items to process, will reschedule
* the run() method again.
Expand Down Expand Up @@ -175,26 +188,27 @@ private void scheduleCompletePendingTasks() {
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void close() {
if (isDisposed.getAndSet(true)) {
return;
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}
}

private void close(String reason) {
logger.verbose("Completing close and disposing scheduler. {}", reason);

scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
scheduler.dispose();
}

@Override
public Mono<Void> closeAsync() {
if (isDisposed.getAndSet(true)) {
return isClosedMono.asMono();
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}

return isClosedMono.asMono();
}
}

0 comments on commit c117dca

Please sign in to comment.