Skip to content

Commit

Permalink
Closing reactor executor when IO pipe is closed. (#22192)
Browse files Browse the repository at this point in the history
* When IO sink is disposed of, close the corresponding executor.

* Update ReactorExecutor to use AsyncCloseable.

* Removing unused method.

* Add changelog entry.
  • Loading branch information
conniey authored Jun 10, 2021
1 parent 20668ee commit cfc40f8
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 32 deletions.
9 changes: 9 additions & 0 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@
## 2.1.0-beta.1 (Unreleased)

### New Features

- Exposing CbsAuthorizationType.
- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker.
- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable.
- Delivery outcomes and delivery states are added.

### Bug Fixes

- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.
- Fixed a bug where ReactorExecutor did not dispose of its scheduler when "IO Sink was interrupted".

### Dependency Updates

- Upgraded `azure-core` from `1.16.0` to `1.17.0`.
- Upgraded `proton-j` from `0.33.4` to `0.33.8`.
- Upgraded `qpid-proton-j-extensions` from `1.2.3` to `1.2.4`.

## 2.0.6 (2021-05-24)
### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() {
}

connection.close();
handler.close();

final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));

final Mono<Void> closedExecutor;
if (executor != null) {
closedExecutor = executor.isClosed();
executor.close();
} else {
closedExecutor = Mono.empty();
}
final Mono<Void> closedExecutor = executor != null ? executor.closeAsync() : Mono.empty();

// Close all the children.
final Mono<Void> closeSessionsMono = Mono.when(closingSessions)
Expand All @@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() {
return false;
});

handler.close();
subscriptions.dispose();
}));

Expand Down Expand Up @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException {

final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();

reactorProvider.getReactorDispatcher().getShutdownSignal()
.subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal),
error -> reactorExceptionHandler.onConnectionError(error));

// Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe.
// Using Schedulers.single() will use the same thread for all connections in this process which
// limits the scalability of the no. of concurrent connections a single process can have.
Expand All @@ -539,6 +529,22 @@ private synchronized Connection getOrCreateConnection() throws IOException {
reactorExceptionHandler, pendingTasksDuration,
connectionOptions.getFullyQualifiedNamespace());

// To avoid inconsistent synchronization of executor, we set this field with the closeAsync method.
// It will not be kicked off until subscribed to.
final Mono<Void> executorCloseMono = executor.closeAsync();
reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
reactorExceptionHandler.onConnectionShutdown(signal);
return executorCloseMono;
})
.onErrorResume(error -> {
logger.info("Error received from reactor provider.", error);
reactorExceptionHandler.onConnectionError(error);
return executorCloseMono;
})
.subscribe();

executor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
Expand All @@ -14,7 +15,6 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
Expand All @@ -23,7 +23,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ReactorExecutor implements Closeable {
/**
* Schedules the proton-j reactor to continuously run work.
*/
class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";

private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
Expand Down Expand Up @@ -51,7 +54,7 @@ class ReactorExecutor implements Closeable {

/**
* Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link
* #close()} is called.
* #closeAsync()} is called.
*/
void start() {
if (hasStarted.getAndSet(true)) {
Expand Down Expand Up @@ -142,10 +145,6 @@ private void run() {
}
}

Mono<Void> isClosed() {
return isClosedMono.asMono();
}

/**
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
Expand Down Expand Up @@ -175,26 +174,27 @@ private void scheduleCompletePendingTasks() {
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void close() {
if (isDisposed.getAndSet(true)) {
return;
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}
}

private void close(String reason) {
logger.verbose("Completing close and disposing scheduler. {}", reason);

scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
scheduler.dispose();
}

@Override
public Mono<Void> closeAsync() {
if (isDisposed.getAndSet(true)) {
return isClosedMono.asMono();
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}

return isClosedMono.asMono();
}
}

0 comments on commit cfc40f8

Please sign in to comment.