Should AbstractConnectionFactory register itself as a ShutdownListener on the AMQConnection when a new connection is created? #2891

@we1sper

That way, we could receive the AMQConnection shutdown signal.
CachingConnectionFactory already does this for channels: it registers itself as a shutdown listener when it creates an AMQChannel, at line 712:

private Channel doCreateBareChannel(ChannelCachingConnectionProxy conn, boolean transactional) {
    Channel channel = conn.createBareChannel(transactional);
    if (!ConfirmType.NONE.equals(this.confirmType)) {
        try {
            channel.confirmSelect();
        }
        catch (IOException e) {
            logger.error("Could not configure the channel to receive publisher confirms", e);
        }
    }
    if ((ConfirmType.CORRELATED.equals(this.confirmType) || this.publisherReturns)
            && !(channel instanceof PublisherCallbackChannelImpl)) {
        channel = this.publisherChannelFactory.createChannel(channel, getChannelsExecutor());
    }
    channel.addShutdownListener(this);
    return channel; // NOSONAR - Simple connection throws exception
}

Could we do the same in the code below?
For example, add rabbitConnection.addShutdownListener(this); after line 584.

protected final Connection createBareConnection() {
    try {
        String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this);
        com.rabbitmq.client.Connection rabbitConnection = connect(connectionName);
        Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout,
                this.connectionCreatingBackOff == null ? null : this.connectionCreatingBackOff.start());
        if (rabbitConnection instanceof AutorecoveringConnection auto) {
            auto.addRecoveryListener(new RecoveryListener() {
                @Override
                public void handleRecoveryStarted(Recoverable recoverable) {
                    handleRecovery(recoverable);
                }
                @Override
                public void handleRecovery(Recoverable recoverable) {
                    try {
                        connection.close();
                    }
                    catch (Exception e) {
                        AbstractConnectionFactory.this.logger.error("Failed to close auto-recover connection", e);
                    }
                }
            });
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Created new connection: " + connectionName + "/" + connection);
        }
        if (this.recoveryListener != null && rabbitConnection instanceof AutorecoveringConnection auto) {
            auto.addRecoveryListener(this.recoveryListener);
        }
        if (this.applicationEventPublisher != null) {
            connection
                    .addBlockedListener(new ConnectionBlockedListener(connection, this.applicationEventPublisher));
        }
        return connection;
    }
    catch (IOException | TimeoutException ex) {
        RuntimeException converted = RabbitExceptionTranslator.convertRabbitAccessException(ex);
        this.connectionListener.onFailed(ex);
        throw converted;
    }
}
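
To illustrate the idea in isolation, here is a minimal sketch of a factory that registers itself as a shutdown listener on every connection it creates. It does not use the real Spring AMQP or RabbitMQ client classes: ShutdownListener, RawConnection and Factory below are hypothetical stand-ins written only for this example, and the cause is a plain String rather than a ShutdownSignalException.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownListenerSketch {

    // Hypothetical stand-in for com.rabbitmq.client.ShutdownListener.
    interface ShutdownListener {
        void shutdownCompleted(String cause);
    }

    // Hypothetical stand-in for the raw client connection.
    static class RawConnection {
        private final List<ShutdownListener> listeners = new ArrayList<>();

        void addShutdownListener(ShutdownListener listener) {
            this.listeners.add(listener);
        }

        // Simulates the connection being shut down and notifies every listener.
        void shutdown(String cause) {
            for (ShutdownListener listener : this.listeners) {
                listener.shutdownCompleted(cause);
            }
        }
    }

    // Hypothetical stand-in for the connection factory.
    static class Factory implements ShutdownListener {
        final List<String> received = new ArrayList<>();

        RawConnection createBareConnection() {
            RawConnection raw = new RawConnection();
            raw.addShutdownListener(this); // the registration proposed in this issue
            return raw;
        }

        @Override
        public void shutdownCompleted(String cause) {
            // Record the signal; a real factory might forward it to its listeners.
            this.received.add(cause);
        }
    }

    public static void main(String[] args) {
        Factory factory = new Factory();
        RawConnection connection = factory.createBareConnection();
        connection.shutdown("connection reset");
        System.out.println(factory.received);
    }
}
```

With the registration done inside the factory, callers would not need to attach a listener to each connection themselves.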

Currently, I do this in the ConnectionListener's onCreate method instead, which is not straightforward.

abstractConnectionFactory.addConnectionListener(new ConnectionListener() {
    @Override
    public void onCreate(Connection connection) {
        // Do something.
        connection.getDelegate().addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                // Do something when the AMQConnection shutdown.
            }
        });
    }

    @Override
    public void onClose(Connection connection) {
        // Do something.
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        // Do something.
    }

    @Override
    public void onFailed(Exception exception) {
        // Do something.
    }
});
