Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
96950fd
Update version to 5.0.1-SNAPSHOT
skin27 Oct 14, 2024
ca86984
Set max connections to 5000
skin27 Oct 24, 2024
1f05236
set max connections to 3 per connection
skin27 Oct 24, 2024
3109c89
update dependencies
skin27 Oct 25, 2024
4f43d27
comment custom loggers
skin27 Oct 28, 2024
5c2738e
getLastExchangeCompletedTimestamp instead of getLastExchangeFailureTi…
brunovg Nov 4, 2024
150db77
removed unreachable code
brunovg Nov 4, 2024
312b536
Merge pull request #303 from assimbly/300-keep-statistics-up-to-date
skin27 Nov 5, 2024
476ff01
update statistic API
skin27 Nov 5, 2024
7cd179b
Merge branch 'develop' of https://github.com/assimbly/runtime into de…
skin27 Nov 5, 2024
feec5d2
renamed bundleId to flowId
brunovg Nov 5, 2024
7c5bad3
Enhance statistic API
skin27 Nov 5, 2024
a03b53a
Merge pull request #304 from assimbly/4795-phasing-out-bundle_id
skin27 Nov 5, 2024
fb3a657
use 0 if string cannot be parsed
brunovg Nov 5, 2024
c85e5bd
Merge pull request #305 from assimbly/fix/cpu-load-parse-double-empty…
skin27 Nov 5, 2024
dfab524
Small fix to parse CPU to two decimals
skin27 Nov 6, 2024
4269791
parseDouble for other double values
skin27 Nov 6, 2024
94e846e
update flow dependencies property
skin27 Nov 6, 2024
7045820
Update CPU load statistic to use BigDecimal instead of Double
skin27 Nov 6, 2024
7f5b40c
check null of empty for BigDecimal
skin27 Nov 6, 2024
3caf130
if newExchange is null, it means there's no remote file to consume
brunovg Nov 14, 2024
c16a6c9
Add environment variables to set AMQ Client settings
skin27 Nov 18, 2024
a55bca8
use FILE_NAME_CONSUMED instead of FILE_NAME
brunovg Nov 19, 2024
f825c32
update start/stop of flows
skin27 Nov 19, 2024
41a6c34
convert resource body using camel type conversion framework
brunovg Nov 20, 2024
5e07166
Merge pull request #312 from assimbly/307-enricherror-occurred-during…
skin27 Nov 20, 2024
189ee00
extend stats endpoint
skin27 Nov 20, 2024
fbdc62d
extend stats endpoint
skin27 Nov 20, 2024
a6c4d7e
exclude null entries
brunovg Nov 21, 2024
42f0495
disable stop report when stopFlow is called internally by startFlow
brunovg Nov 21, 2024
6a84a6c
Merge pull request #316 from assimbly/4840-unclear-status-of-a-seemin…
skin27 Nov 21, 2024
3d7f696
if CamelFileNameConsumed header is null then use the CamelFileName he…
brunovg Nov 22, 2024
2e6ff04
Merge branch '307-enricherror-occurred-during-enrichment' of https://…
brunovg Nov 22, 2024
3737757
Merge pull request #317 from assimbly/307-enricherror-occurred-during…
brunovg Nov 27, 2024
2d2d66d
update log processors (not used yet)
skin27 Nov 29, 2024
25e0830
Merge branch 'develop' of https://github.com/assimbly/runtime into de…
skin27 Nov 29, 2024
84bddc5
Update version to 5.0.1
skin27 Nov 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>broker</name>
Expand Down
2 changes: 1 addition & 1 deletion brokerRest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>broker-rest</name>
Expand Down
2 changes: 1 addition & 1 deletion dil/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>dil</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,21 @@ public class ZipFileEnrichStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
elementNames = new ArrayList<>();

if (newExchange == null) {
// there’s no remote file to consume
return oldExchange;
}

Message in = oldExchange.getIn();
Message resource = newExchange.getIn();

byte[] sourceZip = in.getBody(byte[].class);
byte[] resourceData = resource.getBody(byte[].class);
byte[] resourceData = newExchange.getContext().getTypeConverter().convertTo(byte[].class, resource.getBody());

String fileName = resource.getHeader(Exchange.FILE_NAME, String.class);
String fileName = resource.getHeader(Exchange.FILE_NAME_CONSUMED, String.class);
if(fileName == null) {
fileName = resource.getHeader(Exchange.FILE_NAME, String.class);
}

ByteArrayOutputStream baos = new ByteArrayOutputStream();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ private void setFields(){
} else if (conType.equals("pooled")) {

if (maxConnections == null) {
maxConnections = "10";
maxConnections = "2";
}
if (concurentConsumers == null) {
concurentConsumers = "10";
concurentConsumers = "2";
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ private void addVariables(Exchange exchange) {
}

private void addHeaders(Exchange exchange) {

JSONObject headers2 = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders()));
json.put("Headers", headers2);

if (showAll || showHeaders) {
JSONObject headers = getJsonFromMap(filterHeaderAndProperties(exchange.getIn().getHeaders()));
json.put("Headers", headers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.assimbly.dil.blocks.processors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;


public class OpenTelemetryLogProcessor implements Processor {

public void process(Exchange exchange) throws Exception {

String json = format(exchange);

System.out.println(json);

}


private String format(Exchange exchange) throws JsonProcessingException {

Object inBody = exchange.getIn().getBody();
Map<String, Object> exchangeMap = new HashMap<>();
exchangeMap.put("ExchangeId", exchange.getExchangeId());
exchangeMap.put("ExchangePattern", exchange.getPattern().toString());
exchangeMap.put("Body", inBody != null ? inBody.toString() : "null");
exchangeMap.put("Headers", exchange.getIn().getHeaders());

Instant now = Instant.now();

// Convert Instant to string using a formatter
String formattedTime = DateTimeFormatter
.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneOffset.UTC)
.format(now);

Map<String, Object> map = new HashMap<>();
map.put("timestamp", formattedTime);
map.put("logLevel", "INFO");
map.put("serviceName", exchange.getFromRouteId());
map.put("message", exchange.getFromRouteId());
map.put("attributes",exchangeMap);

ObjectMapper mapper = new ObjectMapper();
String jsonString = mapper.writeValueAsString(map);

return jsonString;
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package org.assimbly.dil.blocks.processors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import java.util.HashMap;
import java.util.Map;


public class SetLogProcessor implements Processor {

Expand All @@ -17,26 +12,8 @@ public void process(Exchange exchange) throws Exception {

String json = jsonFormatter.format(exchange);

System.out.println("-------------MyLog --------------------->\n\n" + json);

}


private String formatExchangeToString(Exchange exchange) throws JsonProcessingException {

Object inBody = exchange.getIn().getBody();

Map<String, Object> map = new HashMap<>();
map.put("ExchangeId", exchange.getExchangeId());
map.put("FromRouteId", exchange.getFromRouteId());
map.put("ExchangePattern", exchange.getPattern().toString());
map.put("Body", inBody != null ? inBody.toString() : "null");
map.put("Headers", exchange.getIn().getHeaders());

ObjectMapper mapper = new ObjectMapper();
String jsonString = mapper.writeValueAsString(map);
System.out.println(json);

return jsonString;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public String getId() {
return id;
}

@JsonProperty("bundleId")
@JsonProperty("flowId")
public String getFlowId() {
return flowId;
}
Expand All @@ -100,7 +100,7 @@ public String getFlowVersion() {
return flowVersion;
}

@JsonProperty("previousBundleId")
@JsonProperty("previousFlowId")
public String getPreviousFlowId() {
return previousFlowId;
}
Expand Down
25 changes: 24 additions & 1 deletion dil/src/main/java/org/assimbly/dil/loader/FlowLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import org.apache.camel.*;
import org.apache.camel.builder.*;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.RouteConfigurationDefinition;
import org.apache.camel.spi.Resource;
import org.apache.camel.spi.RoutesBuilderLoader;
import org.apache.camel.spi.RoutesLoader;
Expand Down Expand Up @@ -127,14 +129,15 @@ private void setErrorHandlers() throws Exception{

private void setRouteConfigurations() throws Exception{

removeRouteConfiguration(flowId);

for(String prop : props.keySet()){
if(prop.endsWith("routeconfiguration")){

String routeConfiguration = props.get(prop);
String id = props.get(prop + ".id");

if(routeConfiguration!=null && !routeConfiguration.isEmpty()){
context.removeRoute(id);
loadStep(routeConfiguration, "routeconfiguration", id, null);
}
}
Expand Down Expand Up @@ -247,6 +250,26 @@ private void setErrorHandler(String id, String errorUri) throws Exception {

}

private void removeRouteConfiguration(String flowId) {

ModelCamelContext modelContext = (ModelCamelContext) context;

List<RouteConfigurationDefinition> routeConfigurationsToRemove = modelContext.getRouteConfigurationDefinitions().stream()
.filter(Objects::nonNull) // Exclude null entries
.filter(routeConfig -> routeConfig.getId().startsWith(flowId))
.toList(); // Collect into a new list to avoid modifying the original list during iteration

routeConfigurationsToRemove.forEach(routeConfig -> {
try {
modelContext.removeRouteConfiguration(routeConfig);
log.info("Removed routeConfiguration: " + routeConfig.getId());
} catch (Exception e) {
log.warn("Failed to remove route configuration: " + routeConfig.getId());
}
});

}

public String getReport(){
return flowLoaderReport.getReport();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
// This class unmarshalls an XML file into a Java treemap object
// The XML file must be in DIL (Data Integration Language) format
public class Unmarshall {

private Document doc;
private TreeMap<String, String> properties;
private XMLConfiguration conf;
Expand Down Expand Up @@ -97,17 +96,13 @@ private void addDependencies(Element element){
}

String flowDependencies = null;
NodeList dependenciesList = dependencies.getChildNodes();

for (int i = 0; i < dependenciesList.getLength(); i++) {
Node dependency = dependenciesList.item(i);
if (dependency instanceof Element) {
if(i == 0){
flowDependencies = dependency.getTextContent();
}else{
flowDependencies = flowDependencies + "," + dependency.getTextContent();
}
List<Element> dependenciesList = getElementChildren(dependencies);

for(Element dependency: dependenciesList){
if(flowDependencies==null){
flowDependencies = dependency.getTextContent();
}else{
flowDependencies = flowDependencies + "," + dependency.getTextContent();
}
}

Expand Down Expand Up @@ -229,4 +224,18 @@ private String evaluateXpath(String xpath) throws TransformerException, XPathExp
return xp.evaluate(doc);
}

List<Element> getElementChildren(Node parent) {
List<Element> elementChildren = new ArrayList<>();
NodeList childNodes = parent.getChildNodes();

for (int i = 0; i < childNodes.getLength(); i++) {
Node node = childNodes.item(i);
if (node.getNodeType() == Node.ELEMENT_NODE) {
elementChildren.add((Element) node);
}
}

return elementChildren;
}

}
2 changes: 1 addition & 1 deletion integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>runtime</artifactId>
<groupId>org.assimbly</groupId>
<version>5.0.0</version>
<version>5.0.1</version>
</parent>

<name>integration</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,30 @@ public interface Integration {
* Gets the stats of an integration
*
* @param mediaType (xml or json)
* @throws Exception if flow doesn't start
* @return returns number of messages
* @throws Exception if stats can't be retrieved
* @return returns stats of integration (system)
*
*/
public String getStats(String mediaType) throws Exception;

/**
* Gets the stats of all steps
*
* @param mediaType (xml or json)
* @throws Exception if stats can't be retrieved
* @return returns stats of integration (system)
*/
public String getStepsStats(String mediaType) throws Exception;

/**
* Gets the stats of all flows
*
* @param mediaType (xml or json)
* @throws Exception if stats can't be retrieved
* @return returns stats of integration (system)
*/
public String getFlowsStats(String mediaType) throws Exception;

/**
* Gets the stats of an integration
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ public String testConnection(String host, int port, int timeOut) {

public abstract String getStats(String mediaType) throws Exception;

public abstract String getFlowsStats(String mediaType) throws Exception;

public abstract String getStepsStats(String mediaType) throws Exception;

public abstract String getMessages(String mediaType) throws Exception;

public abstract String getStatsByFlowIds(String flowIds, String filter, String mediaType) throws Exception;
Expand Down
Loading