Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e5ed33b
Limit SubcriptionHandler threading
m-linner-ericsson Jun 11, 2020
b1d43fe
Various fixes
m-linner-ericsson Jun 11, 2020
887db2e
Revert name change on existing properties and use
e-pettersson-ericsson Jun 11, 2020
1ca3cbf
2.x Use waitlist queue name as routing key (#461)
Christoffer-Cortes Jun 12, 2020
6d1d5dd
Remove duplicate logging (#466)
e-pettersson-ericsson Jun 12, 2020
ead4bfb
Added check to see if the mongo client is null
Jun 12, 2020
01a661f
2.x Remove port logging in event handler (#464)
Christoffer-Cortes Jun 12, 2020
b060456
Added more checks on mongoclient
Jun 12, 2020
344f465
For triggering travis CI
Jun 12, 2020
ff600ed
Trying to trigger travis
Jun 12, 2020
3004823
Limit SubcriptionHandler threading
m-linner-ericsson Jun 11, 2020
0864651
Various fixes
m-linner-ericsson Jun 11, 2020
0c6fd2f
Revert name change on existing properties and use
e-pettersson-ericsson Jun 11, 2020
956e8e5
Added check to see if the mongo client is null
Jun 12, 2020
2d4892d
Added more checks on mongoclient
Jun 12, 2020
950ed52
For triggering travis CI
Jun 12, 2020
613f041
Trying to trigger travis
Jun 12, 2020
fe4b684
Rebased the changes with 2.0.0-maintainance branch
Jun 12, 2020
6a63c6a
Merge branch 'threading_fixes' of https://github.com/m-linner-ericsso…
Jun 12, 2020
ce102df
rebased with 2.0.0-maintainance branch
Jun 12, 2020
54740cd
Revert new connection to mongo in each thread
e-pettersson-ericsson Jun 12, 2020
051a170
Revert changes in mongodb handler
e-pettersson-ericsson Jun 12, 2020
9e67b9a
Step patch version to 2.0.1
e-pettersson-ericsson Jun 12, 2020
871dd0d
Update default values to match eventhandler threadpool
e-pettersson-ericsson Jun 12, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.ericsson</groupId>
<artifactId>eiffel-intelligence</artifactId>
<version>2.2.0</version>
<version>2.2.1</version>
<packaging>war</packaging>

<parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
"threads.corePoolSize= 3",
"threads.queueCapacity= 1",
"threads.maxPoolSize= 4",
"subscription-handler.threads.corePoolSize= 3",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this not be a bottleneck if EventHandler thread works faster than the SubscriptionHandler,, we will queue up more and more threads. So Subscriptionhandler threadpool should be much bigger than Eventhandler threadpool.
This could be a good load test scenario, that tests number SubscritionHandler threads needed for different EventHandler threadpool sizes.
What happens if threads.queueCapacity is reached, is those new threads never created and Aggregations never Subscription checked and no triggering is happening ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not sure if it will be a bottleneck (or if it's a big risk) . The problem we saw before was that an infinite amount of threads were created from SubscriptionHandler and because it reads from the database, it blocked all other processing of events (all EventHandler threads were blocked). Now at least we have a limit on the amount of threads allowed for subscription handler. It's a trade off, which one do we want to prioritize: processing events from rabbitmq or trigger subscriptions as fast as possible?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to do a "baton pass" so that you wait to process until a thread is available? But then I guess both operations they might as well be in the same thread process...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I explained the situation in a chat.
Short summary of the situation I am trying to explain:
If EventHandler thread (event consumer) executes much faster than SubscriptionHandler that executes much slower due to MongoDb connection,queries, updates and read read of big JSON objects, then a lot of threads will be queuing up in SubscriptionHandler thread pool.
And what happens when that queue is full.
If RabbitMq queue has 100 000 events, than we almost must have 100 000 in queue capacity in SubscriptionHandler treadpool queue, to be on safe side so we don't loose any aggregation subscription checks that is made by SubscriptionHandler thread.

Copy link
Contributor

@tobiasake tobiasake Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If eventHandler threads blocks until a new thread slot is free in SubscriptionHandler thread queue before creating the thread in queue, then it will not be a problem, I think.
Someone can maybe check that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your point @tobiasake but it's hard to test such a scenario. For now I have the same amount of threads in both pools. And don't forget that the event handler also does read/write to mongodb to merge and update aggregations so I don't think it executes so much faster than subscription handler threads do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it would use bigger event linked chains that causes bigger Aggregations and casuign longer/bigger opeations agains MongoDb
It also possible to mock som functions in code and add sleeps in subscriptionHandler threads,, so it takes longer time to process subscriptionhandler threads.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am mean, why implement something to fix a issue without testing that specific situation?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds complex, doesn't the thread pool have some sort of queue? This queue would keep growing though if the SubscriptionHandler is slower and more and more events are coming in.

Copy link
Contributor

@tobiasake tobiasake Jun 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of what I can understand on the info about ThreadPool,, queue throws exeception if queue is full. So Aggregation and matching subscription will not happen for those aggreagation that happens when subscriptionQueueHandler Queue is full. So alot of Subscripion triggering will never happen if its alot of events in MessageBus queue and EI consumes more event than number of SubscritpionHandler threads can handle due to the ThreadPool limitations.

"subscription-handler.threads.queueCapacity= 1",
"subscription-handler.threads.maxPoolSize= 4",
"waitlist.collection.ttlValue: 60",
"waitlist.initialDelayResend= 500",
"waitlist.fixedRateResend= 1000",
Expand All @@ -46,12 +49,7 @@ public class ThreadingAndWaitlistRepeatSteps extends FunctionalTestBase {
@Autowired
private Environment environment;

@Value("${threads.corePoolSize}")
private int corePoolSize;
@Value("${threads.queueCapacity}")
private int queueCapacity;
@Value("${threads.maxPoolSize}")
private int maxPoolSize;

@Value("${waitlist.collection.ttlValue}")
private int waitlistTtl;

Expand All @@ -64,41 +62,41 @@ public class ThreadingAndWaitlistRepeatSteps extends FunctionalTestBase {

@Given("^that eiffel events are sent$")
public void that_eiffel_events_are_sent() throws Throwable {
List<String> eventNamesToSend = getEventNamesToSend();
final List<String> eventNamesToSend = getEventNamesToSend();
eventManager.sendEiffelEvents(EIFFEL_EVENTS_JSON_PATH, eventNamesToSend);
}

@Then("^waitlist should not be empty$")
public void waitlist_should_not_be_empty() throws Throwable {
TimeUnit.SECONDS.sleep(5);
int waitListSize = dbManager.waitListSize();
final int waitListSize = dbManager.waitListSize();
assertNotEquals(0, waitListSize);
}

@Given("^no event is aggregated$")
public void no_event_is_aggregated() throws Throwable {
boolean aggregatedObjectExists = dbManager.verifyAggregatedObjectExistsInDB();
final boolean aggregatedObjectExists = dbManager.verifyAggregatedObjectExistsInDB();
assertEquals("aggregatedObjectExists was true, should be false, ", false, aggregatedObjectExists);
}

@Then("^event-to-object-map is manipulated to include the sent events$")
public void event_to_object_map_is_manipulated_to_include_the_sent_events() throws Throwable {
JsonNode parsedJSON = eventManager.getJSONFromFile(EIFFEL_EVENTS_JSON_PATH);
ObjectMapper objectMapper = new ObjectMapper();
final JsonNode parsedJSON = eventManager.getJSONFromFile(EIFFEL_EVENTS_JSON_PATH);
final ObjectMapper objectMapper = new ObjectMapper();
rulesJson = objectMapper.readTree(ID_RULE);
rulesObject = new RulesObject(rulesJson);

String dummyObjectID = "1234abcd-12ab-12ab-12ab-123456abcdef";
List<String> eventNames = getEventNamesToSend();
for (String eventName : eventNames) {
JsonNode eventJson = parsedJSON.get(eventName);
final String dummyObjectID = "1234abcd-12ab-12ab-12ab-123456abcdef";
final List<String> eventNames = getEventNamesToSend();
for (final String eventName : eventNames) {
final JsonNode eventJson = parsedJSON.get(eventName);
eventToObjectMapHanler.updateEventToObjectMapInMemoryDB(rulesObject, eventJson.toString(), dummyObjectID);
}
}

@Then("^when waitlist has resent events they should have been deleted$")
public void when_waitlist_has_resent_events_they_should_have_been_deleted() throws Throwable {
long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3);
final long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3);
while (dbManager.waitListSize() > 0 && stopTime > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(100);
}
Expand All @@ -108,19 +106,19 @@ public void when_waitlist_has_resent_events_they_should_have_been_deleted() thro

@Then("^after the time to live has ended, the waitlist should be empty$")
public void after_the_time_to_live_has_ended_the_waitlist_should_be_empty() throws Throwable {
long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60);
final long stopTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(waitlistTtl + 60);
while (dbManager.waitListSize() > 0 && stopTime > System.currentTimeMillis()) {
TimeUnit.MILLISECONDS.sleep(10000);
}
int waitListSize = dbManager.waitListSize();
final int waitListSize = dbManager.waitListSize();
assertEquals(0, waitListSize);
}

/**
* Events used in the aggregation.
*/
protected List<String> getEventNamesToSend() {
List<String> eventNames = new ArrayList<>();
final List<String> eventNames = new ArrayList<>();
eventNames.add("event_EiffelConfidenceLevelModifiedEvent_3_2");
eventNames.add("event_EiffelArtifactPublishedEvent_3");
eventNames.add("event_EiffelTestCaseTriggeredEvent_3");
Expand Down
2 changes: 1 addition & 1 deletion src/main/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ services:
environment: # Overrides settings in application config file
- SpringApplicationName=eiffel-intelligence-backend
- server.port=8080
- rules.path=src/main/resources/ArtifactRules.json
- rules.path=/rules/ArtifactRules-Eiffel-Agen-Version.json
- rabbitmq.host=rabbitmq
- rabbitmq.port=${RABBITMQ_AMQP_PORT}
- rabbitmq.domainId=ei-domain
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/ericsson/ei/config/ConfigurationLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ private void logConfiguration() {
+ "threads.corePoolSize: " + env.getProperty("threads.corePoolSize") + "\n"
+ "threads.queueCapacity: " + env.getProperty("threads.queueCapacity") + "\n"
+ "threads.maxPoolSize: " + env.getProperty("threads.maxPoolSize") + "\n"
+ "subscription-handler.threads.corePoolSize: " + env.getProperty("subscription-handler.threads.corePoolSize") + "\n"
+ "subscription-handler.threads.queueCapacity: " + env.getProperty("subscription-handler.threads.queueCapacity") + "\n"
+ "subscription-handler.threads.maxPoolSize: " + env.getProperty("subscription-handler.threads.maxPoolSize") + "\n"
+ "missedNotificationCollectionName: " + env.getProperty("missedNotificationCollectionName") + "\n"
+ "missedNotificationDataBaseName: " + env.getProperty("missedNotificationDataBaseName") + "\n"
+ "email.sender: " + env.getProperty("email.sender") + "\n"
Expand Down
47 changes: 37 additions & 10 deletions src/main/java/com/ericsson/ei/config/SpringAsyncConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
Expand All @@ -29,23 +30,49 @@
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer{

@Value("${threads.corePoolSize}") private int corePoolSize;
@Value("${threads.queueCapacity}") private int queueCapacity;
@Value("${threads.maxPoolSize}") private int maxPoolSize;

@Value("${threads.corePoolSize}")
private int eventHandlerCorePoolSize;

@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setMaxPoolSize(maxPoolSize);
executor.setThreadNamePrefix("EventHandler-");
@Value("${threads.queueCapacity}")
private int eventHandlerQueueCapacity;

@Value("${threads.maxPoolSize}")
private int eventHandlerMaxPoolSize;

@Value("${subscription-handler.threads.corePoolSize:50}")
private int subscriptionHandlerCorePoolSize;

@Value("${subscription-handler.threads.queueCapacity:5000}")
private int subscriptionHandlerQueueCapacity;

@Value("${subscription-handler.threads.maxPoolSize:50}")
private int subscriptionHandlerMaxPoolSize;


@Bean("subscriptionHandlerExecutor")
public Executor subscriptionHandlerExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(subscriptionHandlerCorePoolSize);
executor.setQueueCapacity(subscriptionHandlerQueueCapacity);
executor.setMaxPoolSize(subscriptionHandlerMaxPoolSize);
executor.setThreadNamePrefix("SubscriptionHandler-");
executor.initialize();
return executor;
}


@Bean("eventHandlerExecutor")
public Executor eventHandlerExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(eventHandlerCorePoolSize);
executor.setQueueCapacity(eventHandlerQueueCapacity);
executor.setMaxPoolSize(eventHandlerMaxPoolSize);
executor.setThreadNamePrefix("EventHandler-");
executor.initialize();
return executor;
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// TODO Auto-generated method stub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public class ParseInstanceInfoEI {

@PostConstruct
public void init() throws IOException {
Properties properties = new Properties();
final Properties properties = new Properties();
properties.load(
ParseInstanceInfoEI.class.getResourceAsStream("/default-application.properties"));
version = properties.getProperty("version");
Expand Down Expand Up @@ -184,14 +184,26 @@ public void init() throws IOException {
private class ThreadsValue {
@Getter
@Value("${threads.corePoolSize}")
private int corePoolSize;
private int eventHandlerCorePoolSize;

@Getter
@Value("${threads.queueCapacity}")
private int queueCapacity;
private int eventHandlerQueueCapacity;

@Getter
@Value("${threads.maxPoolSize}")
private int maxPoolSize;
private int eventHandlerMaxPoolSize;

@Getter
@Value("${subscription-handler.threads.corePoolSize}")
private int subscriptionHandlerCorePoolSize;

@Getter
@Value("${subscription-handler.threads.queueCapacity}")
private int subscriptionHandlerQueueCapacity;

@Getter
@Value("${subscription-handler.threads.maxPoolSize}")
private int subscriptionHandlerMaxPoolSize;
}
}
26 changes: 12 additions & 14 deletions src/main/java/com/ericsson/ei/handlers/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,24 @@ public RulesHandler getRulesHandler() {
return rulesHandler;
}

public void eventReceived(String event) {
RulesObject eventRules = rulesHandler.getRulesForEvent(event);
public void eventReceived(final String event) {
final RulesObject eventRules = rulesHandler.getRulesForEvent(event);
idRulesHandler.runIdRules(eventRules, event);
}

@Async
public void onMessage(Message message, Channel channel) throws Exception {
String messageBody = new String(message.getBody());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode node = objectMapper.readTree(messageBody);
String id = node.get("meta").get("id").toString();
String port = environment.getProperty("local.server.port");
Thread.currentThread().setName(Thread.currentThread().getName() + "-" + port);
LOGGER.debug("Thread id {} spawned for EventHandler on port: {}", Thread.currentThread().getId(), port);
LOGGER.debug("Event {} received on port {}", id, port);
@Async("eventHandlerExecutor")
public void onMessage(final Message message, final Channel channel) throws Exception {
final String messageBody = new String(message.getBody());
final ObjectMapper objectMapper = new ObjectMapper();
final JsonNode node = objectMapper.readTree(messageBody);
final String id = node.get("meta").get("id").toString();
LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId());
LOGGER.debug("Event {} received", id);

eventReceived(messageBody);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);

LOGGER.debug("Event {} processed on port {}", id, port);
LOGGER.debug("Event {} processed", id);
}
}
34 changes: 16 additions & 18 deletions src/main/java/com/ericsson/ei/handlers/RmqHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ public class RmqHandler {
@JsonIgnore
private SimpleMessageListenerContainer container;

private static final String WAITLIST_BINDING_KEY = "eiffel-intelligence.waitlist";

@Bean
public ConnectionFactory connectionFactory() {
cachingConnectionFactory = new CachingConnectionFactory(host, port);
Expand All @@ -137,7 +135,7 @@ public ConnectionFactory connectionFactory() {
try {
LOGGER.debug("Using SSL/TLS version {} connection to RabbitMQ.", tlsVersion);
cachingConnectionFactory.getRabbitConnectionFactory().useSslProtocol(tlsVersion);
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Failed to set SSL/TLS version.", e);
}
}
Expand Down Expand Up @@ -169,24 +167,24 @@ public TopicExchange exchange() {

@Bean
Binding binding() {
return BindingBuilder.bind(internalQueue()).to(exchange()).with(WAITLIST_BINDING_KEY);
return BindingBuilder.bind(internalQueue()).to(exchange()).with(getWaitlistQueueName());
}

@Bean
public List<Binding> bindings() {
String[] bingingKeysArray = splitBindingKeys(bindingKeys);
List<Binding> bindingList = new ArrayList<Binding>();
for (String bindingKey : bingingKeysArray) {
final String[] bingingKeysArray = splitBindingKeys(bindingKeys);
final List<Binding> bindingList = new ArrayList<>();
for (final String bindingKey : bingingKeysArray) {
bindingList.add(BindingBuilder.bind(externalQueue()).to(exchange()).with(bindingKey));
}
return bindingList;
}

@Bean
public SimpleMessageListenerContainer bindToQueueForRecentEvents(
ConnectionFactory springConnectionFactory,
EventHandler eventHandler) {
MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
final ConnectionFactory springConnectionFactory,
final EventHandler eventHandler) {
final MessageListenerAdapter listenerAdapter = new EIMessageListenerAdapter(eventHandler);
container = new SimpleMessageListenerContainer();
container.setConnectionFactory(springConnectionFactory);
container.setQueueNames(getQueueName(), getWaitlistQueueName());
Expand All @@ -213,10 +211,10 @@ public RabbitTemplate rabbitMqTemplate() {

rabbitTemplate.setExchange(exchangeName);
rabbitTemplate.setQueue(getWaitlistQueueName());
rabbitTemplate.setRoutingKey(WAITLIST_BINDING_KEY);
rabbitTemplate.setRoutingKey(getWaitlistQueueName());
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
public void confirm(final CorrelationData correlationData, final boolean ack, final String cause) {
LOGGER.info("Received confirm with result : {}", ack);
}
});
Expand All @@ -225,18 +223,18 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
}

public String getQueueName() {
String durableName = queueDurable ? "durable" : "transient";
final String durableName = queueDurable ? "durable" : "transient";
return domainId + "." + componentName + "." + consumerName + "." + durableName;
}

public String getWaitlistQueueName() {

String durableName = queueDurable ? "durable" : "transient";
final String durableName = queueDurable ? "durable" : "transient";
return domainId + "." + componentName + "." + consumerName + "." + durableName + "."
+ waitlistSufix;
}

public void publishObjectToWaitlistQueue(String message) {
public void publishObjectToWaitlistQueue(final String message) {
LOGGER.debug("Publishing message to message bus...");
rabbitTemplate.convertAndSend(message);
}
Expand All @@ -245,13 +243,13 @@ public void close() {
try {
container.destroy();
cachingConnectionFactory.destroy();
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Exception occurred while closing connections.", e);
}
}

private String[] splitBindingKeys(String bindingKeys) {
String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", "");
private String[] splitBindingKeys(final String bindingKeys) {
final String bindingKeysWithoutWhitespace = bindingKeys.replaceAll("\\s+", "");
return bindingKeysWithoutWhitespace.split(",");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public interface ISubscriptionService {
*
* @param subscription
* @throws JsonProcessingException
* @throws EncryptorException
* @throws MongoWriteException
*/
void addSubscription(Subscription subscription) throws JsonProcessingException, MongoWriteException;
Expand Down
Loading