-
Notifications
You must be signed in to change notification settings - Fork 642
Closed
Labels
Milestone
Description
Thus we can receive an AMQConnection shutdown signal.
CachingConnectionFactory already does this when it creates an AMQChannel, at line 712:
Lines 698 to 714 in ccca9d9
// Creates a bare channel from the given connection proxy, optionally puts it into
// publisher-confirm mode, optionally wraps it through publisherChannelFactory, and
// registers `this` as the channel's shutdown listener before returning it.
private Channel doCreateBareChannel(ChannelCachingConnectionProxy conn, boolean transactional) { | |
Channel channel = conn.createBareChannel(transactional); | |
// Any confirm type other than NONE enables confirms on the channel.
if (!ConfirmType.NONE.equals(this.confirmType)) { | |
try { | |
channel.confirmSelect(); | |
} | |
catch (IOException e) { | |
// The failure is only logged; execution continues with the same channel.
logger.error("Could not configure the channel to receive publisher confirms", e); | |
} | |
} | |
// Wrap the channel when correlated confirms or publisher returns are enabled,
// unless it is already a PublisherCallbackChannelImpl.
if ((ConfirmType.CORRELATED.equals(this.confirmType) || this.publisherReturns) | |
&& !(channel instanceof PublisherCallbackChannelImpl)) { | |
channel = this.publisherChannelFactory.createChannel(channel, getChannelsExecutor()); | |
} | |
// This is the registration the issue cites (line 712): shutdown listener on the channel.
channel.addShutdownListener(this); | |
return channel; // NOSONAR - Simple connection throws exception | |
} |
Could we do the same for the connection in the code below?
For example, add rabbitConnection.addShutdownListener(this);
below line 584.
Lines 580 to 626 in ccca9d9
// Opens a new client connection under a name obtained from connectionNameStrategy, wraps it
// in a SimpleConnection, attaches recovery and blocked listeners, and returns the wrapper.
// IOException / TimeoutException are reported to connectionListener.onFailed and rethrown
// as the RuntimeException produced by RabbitExceptionTranslator.
// NOTE(review): nothing in this method adds a shutdown listener to rabbitConnection itself;
// that is the gap the issue describes.
protected final Connection createBareConnection() { | |
try { | |
String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this); | |
com.rabbitmq.client.Connection rabbitConnection = connect(connectionName); | |
// The back-off argument is null when no connectionCreatingBackOff is configured.
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout, | |
this.connectionCreatingBackOff == null ? null : this.connectionCreatingBackOff.start()); | |
// For auto-recovering client connections, close the wrapper on both recovery callbacks:
// handleRecoveryStarted delegates to handleRecovery, which closes `connection`.
if (rabbitConnection instanceof AutorecoveringConnection auto) { | |
auto.addRecoveryListener(new RecoveryListener() { | |
@Override | |
public void handleRecoveryStarted(Recoverable recoverable) { | |
handleRecovery(recoverable); | |
} | |
@Override | |
public void handleRecovery(Recoverable recoverable) { | |
try { | |
connection.close(); | |
} | |
catch (Exception e) { | |
// A failed close is logged and not propagated to the client library.
AbstractConnectionFactory.this.logger.error("Failed to close auto-recover connection", e); | |
} | |
} | |
}); | |
} | |
if (this.logger.isInfoEnabled()) { | |
this.logger.info("Created new connection: " + connectionName + "/" + connection); | |
} | |
// Additionally attach the factory's own recoveryListener field, when it is set.
if (this.recoveryListener != null && rabbitConnection instanceof AutorecoveringConnection auto) { | |
auto.addRecoveryListener(this.recoveryListener); | |
} | |
// A blocked listener is added only when an applicationEventPublisher is available.
if (this.applicationEventPublisher != null) { | |
connection | |
.addBlockedListener(new ConnectionBlockedListener(connection, this.applicationEventPublisher)); | |
} | |
return connection; | |
} | |
catch (IOException | TimeoutException ex) { | |
// The exception is converted first; onFailed receives the original exception,
// and the converted one is what gets thrown.
RuntimeException converted = RabbitExceptionTranslator.convertRabbitAccessException(ex); | |
this.connectionListener.onFailed(ex); | |
throw converted; | |
} | |
} |
Currently, I do this in ConnectionListener's onCreate method instead, which is not straightforward.
// Workaround from the issue: register a ConnectionListener whose onCreate callback adds a
// ShutdownListener to the object returned by connection.getDelegate().
// NOTE(review): getDelegate() presumably returns the underlying RabbitMQ client
// connection; confirm against the Connection interface.
abstractConnectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
// Do something.
// The shutdown listener is attached here, once per created connection.
connection.getDelegate().addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
// Do something when the AMQConnection shutdown.
}
});
}
@Override
public void onClose(Connection connection) {
// Do something.
}
@Override
public void onShutDown(ShutdownSignalException signal) {
// Do something.
}
@Override
public void onFailed(Exception exception) {
// Do something.
}
});