Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.ericsson</groupId>
<artifactId>eiffel-intelligence</artifactId>
<version>3.1.3</version>
<version>3.2.0</version>
<packaging>war</packaging>

<parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import javax.servlet.http.HttpServletRequest;

import com.ericsson.ei.exception.InvalidRulesException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
Expand All @@ -20,10 +19,12 @@

import com.ericsson.ei.controller.model.RuleCheckBody;
import com.ericsson.ei.controller.model.RulesCheckBody;
import com.ericsson.ei.exception.InvalidRulesException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.rules.IRuleTestService;
import com.ericsson.ei.utils.ResponseMessage;

import io.netty.util.internal.StringUtil;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.Setter;
Expand Down Expand Up @@ -85,8 +86,14 @@ public ResponseEntity<?> createRuleTestRunFullAggregation(
final HttpServletRequest httpRequest) {
if (testEnabled) {
try {
String aggregatedObject = ruleTestService.prepareAggregatedObject(
new JSONArray(body.getListRulesJson()), new JSONArray(body.getListEventsJson()));
String aggregatedObject = StringUtil.EMPTY_STRING;
try {
aggregatedObject = ruleTestService.prepareAggregatedObject(
new JSONArray(body.getListRulesJson()), new JSONArray(body.getListEventsJson()));
} catch (Exception e) {
String errorMessage = "Failed to generate aggregated object.";
LOGGER.error(errorMessage, e);
}
if (aggregatedObject != null && !aggregatedObject.equals("[]")) {
return new ResponseEntity<>(aggregatedObject, HttpStatus.OK);
} else {
Expand All @@ -96,11 +103,7 @@ public ResponseEntity<?> createRuleTestRunFullAggregation(
return new ResponseEntity<>(errorJsonAsString, HttpStatus.BAD_REQUEST);
}
}
catch (InvalidRulesException e) {
String errorJsonAsString = ResponseMessage.createJsonMessage(e.getMessage());
return new ResponseEntity<>(errorJsonAsString, HttpStatus.BAD_REQUEST);
}
catch (JSONException | IOException e) {
catch (JSONException e) {
String errorMessage = "Failed to generate aggregated object.";
LOGGER.error(errorMessage, e);
String errorJsonAsString = ResponseMessage.createJsonMessage(errorMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ public ResponseEntity sendRequestToER(String eventId, SearchOption searchOption,
return request.performRequest();
}


private HttpRequest prepareRequest(String eventId, SearchOption searchOption, int limit,
int levels, boolean tree, HttpRequest request) throws IOException, URISyntaxException {
Boolean shallowParameter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.jsonmerge.DownstreamMergeHandler;
import com.ericsson.ei.rules.RulesObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoExecutionTimeoutException;

@Component
public class DownstreamExtractionHandler {
Expand All @@ -35,6 +37,7 @@ public class DownstreamExtractionHandler {
@Autowired private JmesPathInterface jmesPathInterface;
@Autowired private DownstreamMergeHandler mergeHandler;
@Autowired private ObjectHandler objectHandler;


public void runExtraction(RulesObject rulesObject, String mergeId, String event, String aggregatedDbObject) {
try {
Expand All @@ -47,7 +50,8 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
}
}

public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) {
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject)
throws MongoExecutionTimeoutException, MongoDBConnectionException {
JsonNode extractedContent;
extractedContent = extractContent(rulesObject, event);
LOGGER.debug("Start extraction of Aggregated Object:\n{} \nwith Event:\n{}", aggregatedDbObject.toString(), event);
Expand Down
30 changes: 19 additions & 11 deletions src/main/java/com/ericsson/ei/handlers/EventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
*/
package com.ericsson.ei.handlers;

import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.rules.IdRulesHandler;
import org.apache.http.conn.HttpHostConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
Expand All @@ -26,13 +25,16 @@
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.rules.IdRulesHandler;
import com.ericsson.ei.rules.RulesHandler;
import com.ericsson.ei.rules.RulesObject;
import com.ericsson.ei.utils.MongoDBMonitorThread;
import com.ericsson.ei.utils.SpringContext;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoExecutionTimeoutException;
import com.rabbitmq.client.Channel;
import com.ericsson.ei.utils.MongoDBMonitorThread;
import com.ericsson.ei.utils.SpringContext;

@Component
public class EventHandler {
Expand All @@ -58,9 +60,10 @@ public RulesHandler getRulesHandler() {
return rulesHandler;
}

public void eventReceived(String event) throws MongoDBConnectionException {
public void eventReceived(String event, final boolean isRelivered)
throws MongoDBConnectionException, Exception {
RulesObject eventRules = rulesHandler.getRulesForEvent(event);
idRulesHandler.runIdRules(eventRules, event);
idRulesHandler.runIdRules(eventRules, event, isRelivered);
}

@Async
Expand All @@ -69,12 +72,13 @@ public void onMessage(Message message, Channel channel) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode node = objectMapper.readTree(messageBody);
String id = node.get("meta").get("id").toString();

final boolean isRedelivered = message.getMessageProperties().isRedelivered();
final int waitBeforeSendBack = 2000;
long deliveryTag = message.getMessageProperties().getDeliveryTag();
LOGGER.debug("Thread id {} spawned for EventHandler", Thread.currentThread().getId());
try {
LOGGER.info("Event {} Received", id);
eventReceived(messageBody);
eventReceived(messageBody, isRedelivered);
channel.basicAck(deliveryTag, false);
LOGGER.info("Event {} processed", id);
} catch (MongoDBConnectionException mdce) {
Expand Down Expand Up @@ -108,10 +112,14 @@ public void onMessage(Message message, Channel channel) throws Exception {
// once the mongoDB Connection is up event will be sent back to queue with
// un-acknowledgement
channel.basicNack(deliveryTag, false, true);
LOGGER.info(
"Sent back the event {} to queue with un-acknowledgement due to {}", id, mdce);
LOGGER.info("Sent back the event {} to queue with un-acknowledgement due to {}", id, mdce);
} catch (HttpHostConnectException | MongoExecutionTimeoutException e) {
LOGGER.info("Waiting for {} mili-seconds before sending the event back to queue", waitBeforeSendBack);
Thread.sleep(waitBeforeSendBack);
channel.basicNack(deliveryTag, false, true);
LOGGER.info("Sent back the event {} to queue with un-acknowledgement: ", id);
} catch (Exception e) {
LOGGER.error("Event is not Re-queued due to exception for id: {} Exception: {} ", id, e);
LOGGER.error("Event is not Re-queued due to exception for id: {} Exception: {} ", id, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public void init() throws AbortExecutionException {
LOGGER.error("Failed to create an index for {} due to: {}", collectionName, e);
}
}



public void setCollectionName(String collectionName) {
this.collectionName = collectionName;
}
Expand Down Expand Up @@ -112,8 +111,7 @@ public ArrayList<String> getObjectsForEventId(String eventId) {
* @param event
* @param objectId aggregated event object Id
*/
public void updateEventToObjectMapInMemoryDB(RulesObject rulesObject, String event,
String objectId, int ttlValue) {
public void updateEventToObjectMapInMemoryDB(RulesObject rulesObject, String event, String objectId, int ttlValue) {
String eventId = getEventId(rulesObject, event);

final MongoCondition condition = MongoCondition.idCondition(objectId);
Expand Down
39 changes: 26 additions & 13 deletions src/main/java/com/ericsson/ei/handlers/ExtractionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package com.ericsson.ei.handlers;

import com.ericsson.ei.rules.ProcessRulesHandler;
import java.io.IOException;

import org.apache.http.conn.HttpHostConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -26,10 +28,12 @@
import com.ericsson.ei.exception.PropertyNotFoundException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.jsonmerge.MergeHandler;
import com.ericsson.ei.rules.ProcessRulesHandler;
import com.ericsson.ei.rules.RulesObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.mongodb.MongoExecutionTimeoutException;

@Component
public class ExtractionHandler {
Expand Down Expand Up @@ -62,23 +66,21 @@ public void setObjectHandler(ObjectHandler objectHandler) {
this.objectHandler = objectHandler;
}

public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject) throws MongoDBConnectionException {
public void runExtraction(RulesObject rulesObject, String id, String event, String aggregatedDbObject, boolean isRedelivered)
throws HttpHostConnectException, MongoExecutionTimeoutException, MongoDBConnectionException {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode aggregatedJsonObject = mapper.readTree(aggregatedDbObject);
runExtraction(rulesObject, id, event, aggregatedJsonObject);
} catch (Exception e) {
LOGGER.error("Failed with extraction.", e);
if (e.getMessage() != null && e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
throw new MongoDBConnectionException("MongoDB Connection down");
}
runExtraction(rulesObject, id, event, aggregatedJsonObject, isRedelivered);
} catch (IOException e) {
LOGGER.warn("Failed to read the aggregated object due to {} ", e);
}
}

public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject) throws MongoDBConnectionException {
public void runExtraction(RulesObject rulesObject, String mergeId, String event, JsonNode aggregatedDbObject, boolean isRedelivered)
throws HttpHostConnectException, MongoExecutionTimeoutException, MongoDBConnectionException {
try {
JsonNode extractedContent = extractContent(rulesObject, event);

String mergedContent = null;
String aggregatedObjectId = null;

Expand All @@ -89,7 +91,17 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
aggregatedDbObject.toString(), extractedContent.toString(), event);
aggregatedObjectId = objectHandler.extractObjectId(aggregatedDbObject);
mergedContent = mergeHandler.mergeObject(aggregatedObjectId, mergeId, rulesObject, event, extractedContent);
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent, aggregatedObjectId, mergeId);
if (mergedContent == null) {
return;
}

// Need to extract the history rules for the re-delivered start event type.
if (rulesObject.isStartEventRules() && isRedelivered) {
upStreamEventsHandler.runHistoryExtractionRulesOnAllUpstreamEvents(mergeId);
} else {
mergedContent = processRulesHandler.runProcessRules(event, rulesObject, mergedContent,
aggregatedObjectId, mergeId);
}
} else {
LOGGER.trace("***** Extraction starts for the aggregation Id: " + mergeId);
ObjectNode objectNode = (ObjectNode) extractedContent;
Expand All @@ -101,8 +113,9 @@ public void runExtraction(RulesObject rulesObject, String mergeId, String event,
LOGGER.trace("**** Extraction ends for the aggregation Id: " + mergeId);
}
objectHandler.checkAggregations(mergedContent, aggregatedObjectId);
} catch (PropertyNotFoundException e) {
LOGGER.debug("Did not run history extraction on upstream events.", e);
} catch (HttpHostConnectException | MongoExecutionTimeoutException e) {
LOGGER.warn("Extraction failed for {}, due to {}. Sending back to queue.", event, e.getMessage());
throw e;
} catch (Exception e) {
LOGGER.error("Failed to run extraction for event {}", event, e);
if (e.getMessage() != null && e.getMessage().equalsIgnoreCase("MongoDB Connection down")) {
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/ericsson/ei/handlers/ObjectHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@

import javax.annotation.PostConstruct;

import com.ericsson.ei.mongo.*;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.slf4j.Logger;
Expand All @@ -35,8 +33,14 @@
import com.ericsson.ei.exception.AbortExecutionException;
import com.ericsson.ei.exception.MongoDBConnectionException;
import com.ericsson.ei.jmespath.JmesPathInterface;
import com.ericsson.ei.mongo.MongoCondition;
import com.ericsson.ei.mongo.MongoConstants;
import com.ericsson.ei.mongo.MongoDBHandler;
import com.ericsson.ei.mongo.MongoQuery;
import com.ericsson.ei.mongo.MongoQueryBuilder;
import com.ericsson.ei.rules.RulesObject;
import com.ericsson.ei.subscription.SubscriptionHandler;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.BasicDBObject;
Expand Down Expand Up @@ -82,7 +86,7 @@ public class ObjectHandler {
@Setter
@Autowired
private SubscriptionHandler subscriptionHandler;

@PostConstruct
public void init() throws AbortExecutionException {
try {
Expand All @@ -93,6 +97,7 @@ public void init() throws AbortExecutionException {
LOGGER.error("Failed to create an index for {} due to: {}", aggregationsCollectionName, e);
}
}

/**
* This method is responsible for inserting an aggregated object in to the database.
*
Expand All @@ -113,14 +118,15 @@ public String insertObject(String aggregatedObject, RulesObject rulesObject, Str
BasicDBObject document = prepareDocumentForInsertion(id, aggregatedObject);
LOGGER.debug("ObjectHandler: Aggregated Object document to be inserted: {}",
document.toString());

mongoDbHandler.insertDocument(databaseName, aggregationsCollectionName, document.toString());
postInsertActions(aggregatedObject, rulesObject, event, id);
return aggregatedObject;
}

public String insertObject(JsonNode aggregatedObject, RulesObject rulesObject, String event,
String id) throws MongoDBConnectionException {
return insertObject(aggregatedObject.toString(), rulesObject, event, id);
return insertObject(aggregatedObject.toString(), rulesObject, event, id);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,24 @@ public void setHistoryExtractionHandler(final HistoryExtractionHandler historyEx
* @throws Exception
* @throws PropertyNotFoundException
*/
public void runHistoryExtractionRulesOnAllUpstreamEvents(String aggregatedObjectId) throws PropertyNotFoundException, Exception {
public void runHistoryExtractionRulesOnAllUpstreamEvents(String aggregatedObjectId) throws Exception {

// Use aggregatedObjectId as eventId since they are the same for start
// events.
long start = System.currentTimeMillis();
final ResponseEntity responseEntity = eventRepositoryQueryService
.getEventStreamDataById(aggregatedObjectId, SearchOption.UP_STREAM, -1, -1, true);

long stop = System.currentTimeMillis();
LOGGER.debug("%%%% Response time for upstream query for id: {}: {} ", aggregatedObjectId, stop-start);

LOGGER.debug("ResponseEntity: " + responseEntity);

if (responseEntity == null) {
LOGGER.info("Asked for upstream from {} but got null response entity back!", aggregatedObjectId);
return;
}

final String searchResultString = responseEntity.getBody();
LOGGER.debug("Search result string is: " + searchResultString);
ObjectMapper mapper = new ObjectMapper();
final JsonNode searchResult = mapper.readTree(searchResultString);

Expand Down
Loading