Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 2.1.1
- Implemented the changes to log the eventId and HTTPStatus while the level is INFO.
- Implemented the changes to print the user information while the log level is INFO.
- Made changes to resolve extra RabbitMQ connection issue.

## 2.1.0
- Implemented new routing key template for Sepia.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import org.slf4j.LoggerFactory;

import com.ericsson.eiffel.remrem.publish.config.PropertiesConfig;
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
import com.ericsson.eiffel.remrem.publish.exception.NackException;
import com.ericsson.eiffel.remrem.publish.exception.RemRemPublishException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BlockedListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
Expand Down Expand Up @@ -296,15 +298,36 @@ public void createRabbitMqConnection() throws RemRemPublishException {
}
factory.setConnectionTimeout(tcpTimeOut);
rabbitConnection = factory.newConnection();
rabbitConnection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
log.debug("Connection Shutdown completed " + cause.getMessage());
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the section is useless. The connection is always closed at that point of time...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested by removing this piece of code than I'm seeing more connections..,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, very interesting... In that case we should change throwable to IOException or AlreadyClosedException or something what is thrown. Next, e.printStackTrace() should be removed---in fact we don't need any code there. Instead add a comment explainig why we have this try/catch block and that should not be removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure Roman

rabbitConnection.close();
} catch (AlreadyClosedException | IOException e) {
// This is intentionally added, if we do not call the close function, connection is not closed properly
// and the connections count is getting increased..
}
}
});

rabbitConnection.addBlockedListener(new BlockedListener() {
public void handleBlocked(String reason) throws IOException {
// Connection is now blocked
log.debug("Connection is blocked " + reason);
}

public void handleUnblocked() throws IOException {
// Connection is now unblocked
}
});
log.info("Connected to RabbitMQ.");
rabbitChannels = new ArrayList<>();
if(channelsCount == null || channelsCount == 0 ) {
channelsCount = DEFAULT_CHANNEL_COUNT;
}
for (int i = 0; i < channelsCount; i++) {
Channel channel = rabbitConnection.createChannel();
channel.confirmSelect();
rabbitChannels.add(channel);
createNewChannel();
}
} catch (IOException | TimeoutException e) {
log.error(e.getMessage(), e);
Expand All @@ -313,6 +336,32 @@ public void createRabbitMqConnection() throws RemRemPublishException {
}
}

/**
* This method is used to create Rabbitmq channels
* @throws IOException
*/
private Channel createNewChannel() throws IOException {
Channel channel = rabbitConnection.createChannel();
channel.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
// Beware that proper synchronization is needed here
if (cause.isInitiatedByApplication()) {
log.debug("Shutdown is initiated by application. Ignoring it.");
} else {
log.error("Shutdown is NOT initiated by application.");
log.error(cause.getMessage());
boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE);
if (cliMode) {
System.exit(-3);
}
}
}
});
channel.confirmSelect();
rabbitChannels.add(channel);
return channel;
}

private void initCli() {
setValues();
}
Expand Down Expand Up @@ -517,21 +566,7 @@ public void send(String routingKey, String msg, String eventId)
throws IOException, NackException, TimeoutException, RemRemPublishException, IllegalArgumentException {
Channel channel = giveMeRandomChannel();
checkAndCreateExchangeIfNeeded();
channel.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause) {
// Beware that proper synchronization is needed here
if (cause.isInitiatedByApplication()) {
log.debug("Shutdown is initiated by application. Ignoring it.");
} else {
log.error("Shutdown is NOT initiated by application.");
log.error(cause.getMessage());
boolean cliMode = Boolean.getBoolean(PropertiesConfig.CLI_MODE);
if (cliMode) {
System.exit(-3);
}
}
}
});

BasicProperties msgProps = usePersitance ? PERSISTENT_BASIC_APPLICATION_JSON
: MessageProperties.BASIC;

Expand Down Expand Up @@ -577,9 +612,7 @@ private Channel giveMeRandomChannel() throws RemRemPublishException {
}
}
try {
Channel channel = rabbitConnection.createChannel();
channel.confirmSelect();
rabbitChannels.add(channel);
Channel channel = createNewChannel();
return channel;
} catch (IOException e) {
log.error(e.getMessage(), e);
Expand Down