28 commits
49967e6
Changed fields name from Count.java so it will automatically extract …
mavroudo Mar 2, 2025
76ba4b9
Extended the SparkConf object to include 3 jars. These jars allow Spa…
mavroudo Mar 2, 2025
f5f5718
Modified functions getEventNames, getEventPairs, getCounts, getCountF…
mavroudo Mar 2, 2025
bb654a3
Was able to get a simple detection query to work on cluster. Pending …
mavroudo Mar 2, 2025
6ea7d55
Can support from/till queries in the detection
mavroudo Mar 3, 2025
5d2e9b5
Can support group of traces now
mavroudo Mar 3, 2025
d3d09eb
Removing unused methods and adding some commenting
mavroudo Mar 4, 2025
dab59e9
Restructure the code in the SparkDatabaseRepository.java and S3Connec…
mavroudo Mar 4, 2025
28eaf26
Was able to extract the complete set of constraints for positions, ex…
mavroudo Mar 4, 2025
127055a
Was able to extract the complete set of constraints for positions, ex…
mavroudo Mar 4, 2025
cf8b764
Simple order relation extraction works.
mavroudo Mar 4, 2025
a39a8f8
All ordered relations (along with the QueryPlanDeclareAll.java) are w…
mavroudo Mar 5, 2025
75b91bb
All state extractions are executed and output is coming out
mavroudo Mar 5, 2025
df6f4f6
Remove references to RDD
mavroudo Mar 5, 2025
3a29132
Some minor fixes including DeclareController, the Dockerfile and a te…
mavroudo Mar 5, 2025
bb23364
merged with current version of main
mavroudo Mar 12, 2025
28ae0b6
Fixed so that delta can work together with parquet. Also made sure th…
mavroudo Mar 12, 2025
223c210
Exploration query fix - Sorting IMR events
antliarokapis Mar 17, 2025
f322902
Fixed the issue where single detection was not working correctly
mavroudo Mar 20, 2025
1decc52
Fixed issue with matches that contained null events
mavroudo Mar 20, 2025
95ec7f4
Merge branch 'exploration' into cluster
mavroudo Mar 20, 2025
91de196
Fixed issue with matches that contained null events
mavroudo Mar 20, 2025
3293330
Fixed issue with Kleene* when working in PatternDetectionSingle. The …
mavroudo Apr 3, 2025
1dbe9e1
Merge pull request #11 from siesta-tool/bugfix_detection
mavroudo Apr 3, 2025
d3bf47c
Fixed issue that exploration was not working correctly for a single e…
mavroudo Apr 4, 2025
b0550cd
Added CassandraConnector
balaktsis Oct 13, 2025
4800750
Exploit index's extra info in cassandra
balaktsis Oct 13, 2025
eac2cad
Adjusting output
balaktsis Oct 14, 2025
2 changes: 1 addition & 1 deletion .gitignore
@@ -2,7 +2,7 @@
target/classes/*
target/test-classes/*
target/*
src/main/resources/lib/*.jar
src/main/resources/jars/*.jar
siesta-query-processor.iml
experiments/*

11 changes: 9 additions & 2 deletions Dockerfile
@@ -2,11 +2,11 @@ FROM ubuntu:20.04

#ENV JAVA_HOME="/usr/lib/jvm/default-jvm/"

RUN apt-get update && apt-get install -y openjdk-17-jdk maven && \
RUN apt-get update && apt-get install -y openjdk-17-jdk maven wget && \
echo "export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))" >> /etc/profile.d/java.sh
ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
ENV PATH=$PATH:${JAVA_HOME}/bin

ENV JARS_DIR=/code/src/main/resources/jars


# Install maven
@@ -20,8 +20,15 @@ RUN mvn dependency:resolve

# Adding source, compile and package into a fat jar
ADD src /code/src
RUN test -f ${JARS_DIR}/hadoop-aws-3.3.4.jar || wget -P ${JARS_DIR} https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
RUN test -f ${JARS_DIR}/aws-java-sdk-bundle-1.12.262.jar || wget -P ${JARS_DIR} https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar
RUN test -f ${JARS_DIR}/hadoop-client-3.3.4.jar || wget -P ${JARS_DIR} https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client/3.3.4/hadoop-client-3.3.4.jar
RUN test -f ${JARS_DIR}/delta-spark_2.12-3.3.0.jar || wget -P ${JARS_DIR} https://repo1.maven.org/maven2/io/delta/delta-spark_2.12/3.3.0/delta-spark_2.12-3.3.0.jar
RUN test -f ${JARS_DIR}/delta-storage-3.3.0.jar || wget -P ${JARS_DIR} https://repo1.maven.org/maven2/io/delta/delta-storage/3.3.0/delta-storage-3.3.0.jar
RUN mvn clean compile package -f pom.xml -DskipTests

# Making sure jars are where they should be


CMD ["java", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED" , "-jar", "target/siesta-query-processor-3.0.jar"]
#ENTRYPOINT ["tail", "-f", "/dev/null"]
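A note on the staged jars: the Dockerfile now pre-fetches the Hadoop/S3 and Delta jars into JARS_DIR so Spark can load them at runtime. Below is a minimal sketch, not the repository's actual wiring (commit 76ba4b9 extends the SparkConf in the source code), of how such jars could be registered; the property choices and jar list are assumptions.

// Minimal sketch, not the repository's actual setup: one way the jars staged
// under JARS_DIR could be handed to Spark. Properties and jar list below are
// assumptions based on the Dockerfile and commit 76ba4b9.
import org.apache.spark.SparkConf;

public class SparkConfSketch {
    public static void main(String[] args) {
        String jarsDir = "/code/src/main/resources/jars"; // JARS_DIR in the Dockerfile
        SparkConf conf = new SparkConf()
                .setAppName("siesta-query-processor")
                // comma-separated extra jars shipped inside the image
                .set("spark.jars", String.join(",",
                        jarsDir + "/hadoop-aws-3.3.4.jar",
                        jarsDir + "/aws-java-sdk-bundle-1.12.262.jar",
                        jarsDir + "/delta-spark_2.12-3.3.0.jar"))
                // Delta tables additionally need the SQL extension and catalog
                .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .set("spark.sql.catalog.spark_catalog",
                        "org.apache.spark.sql.delta.catalog.DeltaCatalog");
        System.out.println(conf.toDebugString());
    }
}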
17 changes: 3 additions & 14 deletions docker-compose.yml
@@ -1,7 +1,6 @@
version: '3.7'
networks:
cluster_net:
external: true
siesta-net:
name: siesta-net

services:
@@ -17,24 +16,13 @@ services:
s3.user: minioadmin
s3.key: minioadmin
s3.timeout: 600000
#for cassandra
cassandra.max_requests_per_local_connection: 32768
cassandra.max_requests_per_remote_connection: 22000
cassandra.connections_per_host: 1000
cassandra.max_queue_size: 1024
cassandra.connection_timeout: 30000
cassandra.read_timeout: 30000
spring.data.cassandra.contact-points: cassandra
spring.data.cassandra.port: 9042
spring.data.cassandra.user: cassandra
spring.data.cassandra.password: cassandra
server.port: 8090 # port of the application
volumes:
- ./build:/root/.m2
ports:
- '8090:8090'
networks:
- cluster_net
- siesta-net

# spark-master:
# image: bitnami/spark:3.5.4
@@ -76,3 +64,4 @@ services:
# - spark-master
# networks:
# - cluster_net

29 changes: 29 additions & 0 deletions pom.xml
@@ -229,6 +229,25 @@
<!-- <version>2.1.0</version>-->
<!-- </dependency>-->

<!-- ScyllaDB/Cassandra dependencies -->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.12</artifactId>
<version>3.4.1</version>
</dependency>

<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version>
</dependency>

<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.17.0</version>
</dependency>

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_2.12</artifactId>
@@ -239,6 +258,16 @@
<build>
<!-- <sourceDirectory>src/main/java</sourceDirectory>-->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
<release>17</release>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
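The three DataStax dependencies above bring in the Spark-Cassandra connector and the underlying Java driver. As a rough sketch of how the connector is typically configured (host, keyspace, and table names below are placeholders, not the repository's CassandraConnector):

// Minimal sketch, assuming a reachable Cassandra node: standard configuration
// of the spark-cassandra-connector added above. Host, keyspace and table names
// are placeholders.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CassandraReadSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("cassandra-read")
                .master("local[*]")
                .config("spark.cassandra.connection.host", "cassandra") // assumed host
                .config("spark.cassandra.connection.port", "9042")
                .getOrCreate();

        // The connector exposes Cassandra tables through its DataSource format
        Dataset<Row> index = spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", "siesta")    // assumed keyspace
                .option("table", "index_table")  // assumed table
                .load();
        index.show(5);
    }
}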
@@ -64,10 +64,18 @@ public List<Occurrences> evaluate(SIESTAPattern pattern, Map<String, List<Event>
Occurrences ocs = new Occurrences();
ocs.setTraceID(e.getKey());
for (Match m : ec.getMatches()) {
ocs.addOccurrence(new Occurrence(Arrays.stream(m.getEvents()).parallel()
.map(x -> (SaseEvent) x)
.map(SaseEvent::getEventBoth)
.collect(Collectors.toList())));
List<EventBoth> eventBothList = new ArrayList<>();
try {
for (edu.umass.cs.sase.stream.Event event : m.getEvents()) {
SaseEvent sEvent = (SaseEvent) event;
EventBoth both = sEvent.getEventBoth();
eventBothList.add(both);
}
Occurrence oc = new Occurrence(eventBothList);
ocs.addOccurrence(oc);
} catch (Exception ignored) {
// a match containing null events fails the conversion here and is skipped
}
}
occurrences.add(ocs);
}
@@ -98,10 +106,18 @@ public List<GroupOccurrences> evaluateGroups(SIESTAPattern pattern, Map<Integer,
GroupOccurrences ocs = new GroupOccurrences();
ocs.setGroupId(e.getKey());
for (Match m : ec.getMatches()) {
ocs.addOccurrence(new Occurrence(Arrays.stream(m.getEvents()).parallel()
.map(x -> (SaseEvent) x)
.map(SaseEvent::getEventBoth)
.collect(Collectors.toList())));
List<EventBoth> eventBothList = new ArrayList<>();
try {
for (edu.umass.cs.sase.stream.Event event : m.getEvents()) {
SaseEvent sEvent = (SaseEvent) event;
EventBoth both = sEvent.getEventBoth();
eventBothList.add(both);
}
Occurrence oc = new Occurrence(eventBothList);
ocs.addOccurrence(oc);
} catch (Exception ignored) {
// a match containing null events fails the conversion here and is skipped
}
}
occurrences.add(ocs);
}
@@ -132,10 +148,18 @@ public List<Occurrences> evaluateSmallPatterns(SIESTAPattern pattern, Map<String
Occurrences ocs = new Occurrences();
ocs.setTraceID(e.getKey());
for (Match m : ec.getMatches()) {
ocs.addOccurrence(new Occurrence(Arrays.stream(m.getEvents()).parallel()
.map(x -> (SaseEvent) x)
.map(SaseEvent::getEventBoth)
.collect(Collectors.toList())));
List<EventBoth> eventBothList = new ArrayList<>();
try {
for (edu.umass.cs.sase.stream.Event event : m.getEvents()) {
SaseEvent sEvent = (SaseEvent) event;
EventBoth both = sEvent.getEventBoth();
eventBothList.add(both);
}
Occurrence oc = new Occurrence(eventBothList);
ocs.addOccurrence(oc);
} catch (Exception ignored) {
// a match containing null events fails the conversion here and is skipped
}
}
occurrences.add(ocs);
}
@@ -176,7 +176,7 @@ private Tuple2<List<IndexPair>, List<IndexPair>> splitLogInstances(String log_da
List<IndexPair> legitimateInstances = new ArrayList<>();
List<IndexPair> illegitimateInstances = new ArrayList<>();
indexRecords.forEach((et, instances) -> instances.forEach(instance -> {
if (legitimateTraces.contains(instance.getTraceId())) {
if (legitimateTraces.contains(instance.getTrace_id())) {
legitimateInstances.add(instance);
} else {
illegitimateInstances.add(instance);
@@ -192,8 +192,8 @@
*/
private double getCV(Count pair) {
double norm_factor = 1.0 / pair.getCount();
double mean = (double) pair.getSum_duration() / pair.getCount();
double var = norm_factor * (pair.getSum_squares() - norm_factor * Math.pow(pair.getSum_duration(), 2));
double mean = (double) pair.getSumDuration() / pair.getCount();
double var = norm_factor * (pair.getSumSquares() - norm_factor * Math.pow(pair.getSumDuration(), 2));
return var / mean;
}

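getCV() above derives the mean and variance of an event pair's durations from the three running aggregates kept in Count (count, sum of durations, sum of squared durations), using the identity Var(X) = E[X^2] - E[X]^2, and returns the ratio of variance to mean. A self-contained check with made-up durations:

// Self-contained check of the one-pass variance identity used by getCV():
// mean = sum/n, var = (sumSquares - sum^2/n)/n, i.e. Var(X) = E[X^2] - E[X]^2.
// The durations below are made-up sample values.
public class OnePassVariance {
    public static void main(String[] args) {
        long[] durations = {3, 5, 7, 9};
        long n = durations.length, sum = 0, sumSquares = 0;
        for (long d : durations) { sum += d; sumSquares += d * d; }

        double normFactor = 1.0 / n;
        double mean = (double) sum / n;                                  // 6.0
        double var = normFactor * (sumSquares - normFactor * sum * sum); // 5.0
        System.out.println(var / mean); // dispersion ratio returned by getCV()
    }
}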
DeclareDBConnector.java
@@ -1,31 +1,22 @@
package com.datalab.siesta.queryprocessor.declare;

import com.datalab.siesta.queryprocessor.declare.model.EventPairToTrace;
import com.datalab.siesta.queryprocessor.declare.model.EventSupport;
import com.datalab.siesta.queryprocessor.declare.model.OccurrencesPerTrace;
import com.datalab.siesta.queryprocessor.declare.model.UniqueTracesPerEventPair;
import com.datalab.siesta.queryprocessor.declare.model.UniqueTracesPerEventType;
import com.datalab.siesta.queryprocessor.declare.model.*;
import com.datalab.siesta.queryprocessor.declare.model.declareState.ExistenceState;
import com.datalab.siesta.queryprocessor.declare.model.declareState.NegativeState;
import com.datalab.siesta.queryprocessor.declare.model.declareState.OrderState;
import com.datalab.siesta.queryprocessor.declare.model.declareState.PositionState;
import com.datalab.siesta.queryprocessor.declare.model.declareState.UnorderStateI;
import com.datalab.siesta.queryprocessor.declare.model.declareState.UnorderStateU;
import com.datalab.siesta.queryprocessor.model.DBModel.IndexPair;
import com.datalab.siesta.queryprocessor.model.DBModel.Trace;
import com.datalab.siesta.queryprocessor.model.Events.EventBoth;
import com.datalab.siesta.queryprocessor.model.Events.EventPos;
import com.datalab.siesta.queryprocessor.storage.model.EventTypeTracePositions;
import com.datalab.siesta.queryprocessor.storage.model.Trace;
import com.datalab.siesta.queryprocessor.storage.DatabaseRepository;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.functions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import scala.Tuple3;

import java.util.List;
import java.util.Map;

@Service
public class DeclareDBConnector {
@@ -37,62 +28,60 @@ public DeclareDBConnector(DatabaseRepository databaseRepository){
this.db=databaseRepository;
}

public JavaRDD<Trace> querySequenceTableDeclare(String logName){
public Dataset<Trace> querySequenceTableDeclare(String logName){
return db.querySequenceTableDeclare(logName);
}

public JavaRDD<UniqueTracesPerEventType> querySingleTableDeclare(String logname){
public Dataset<UniqueTracesPerEventType> querySingleTableDeclare(String logname){
return this.db.querySingleTableDeclare(logname);
}

public JavaRDD<EventSupport> querySingleTable(String logname){
public Dataset<EventSupport> querySingleTable(String logname){
return this.db.querySingleTable(logname);
}


public JavaRDD<UniqueTracesPerEventPair> queryIndexTableDeclare(String logname){
public Dataset<UniqueTracesPerEventPair> queryIndexTableDeclare(String logname){
return this.db.queryIndexTableDeclare(logname);
}

public JavaRDD<IndexPair> queryIndexTableAllDeclare(String logname){
return this.db.queryIndexTableAllDeclare(logname);
}

public JavaPairRDD<Tuple2<String,String>, List<Integer>> querySingleTableAllDeclare(String logname){
public Dataset<EventTypeTracePositions> querySingleTableAllDeclare(String logname){
return this.db.querySingleTableAllDeclare(logname);
}


public JavaRDD<EventPairToTrace> queryIndexOriginalDeclare(String logname){
public Dataset<EventPairToTrace> queryIndexOriginalDeclare(String logname){
return this.db.queryIndexOriginalDeclare(logname);
}

public Map<String,Long> extractTotalOccurrencesPerEventType(String logname){
return this.querySingleTableDeclare(logname)
.map(x -> {
long all = x.getOccurrences().stream().mapToLong(OccurrencesPerTrace::getOccurrences).sum();
return new Tuple2<>(x.getEventType(), all);
}).keyBy(x -> x._1).mapValues(x -> x._2).collectAsMap();
public Dataset<EventTypeOccurrences> extractTotalOccurrencesPerEventType(String logname){
Dataset<EventTypeOccurrences> eventTypeOccurrencesDataset= this.querySingleTableDeclare(logname)
.withColumn("numberOfTraces", functions.expr(
"aggregate(occurrences, 0, (acc, a) -> acc + a.occs)"
))
.selectExpr("eventName", "numberOfTraces")
.as(Encoders.bean(EventTypeOccurrences.class));
return eventTypeOccurrencesDataset;
}

public JavaRDD<PositionState> queryPositionState(String logname){
public Dataset<PositionState> queryPositionState(String logname){
return this.db.queryPositionState(logname);
}

public JavaRDD<ExistenceState> queryExistenceState(String logname){
public Dataset<ExistenceState> queryExistenceState(String logname){
return this.db.queryExistenceState(logname);
}

public JavaRDD<UnorderStateI> queryUnorderStateI(String logname){
public Dataset<UnorderStateI> queryUnorderStateI(String logname){
return this.db.queryUnorderStateI(logname);
}
public JavaRDD<UnorderStateU> queryUnorderStateU(String logname){
public Dataset<UnorderStateU> queryUnorderStateU(String logname){
return this.db.queryUnorderStateU(logname);
}
public JavaRDD<OrderState> queryOrderState(String logname){
public Dataset<OrderState> queryOrderState(String logname){
return this.db.queryOrderState(logname);
}
public JavaRDD<NegativeState> queryNegativeState(String logname){
public Dataset<NegativeState> queryNegativeState(String logname){
return this.db.queryNegativeState(logname);
}

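For reference, the aggregate(...) expression used in extractTotalOccurrencesPerEventType above is Spark SQL's higher-order fold over a nested array column, which keeps the computation inside the Dataset API instead of collecting an RDD. A toy, self-contained illustration with an invented schema:

// Toy illustration of the aggregate(...) higher-order function used above;
// the schema here is invented, not the SIESTA single table.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class AggregateExprSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("hof-demo").getOrCreate();

        Dataset<Row> df = spark.sql(
                "SELECT 'a' AS eventName, " +
                "array(named_struct('occs', 1), named_struct('occs', 3)) AS occurrences");

        // Folds the nested array per row: 0 + 1 + 3 = 4
        df.selectExpr("eventName",
                "aggregate(occurrences, 0, (acc, x) -> acc + x.occs) AS total")
          .show();
        spark.stop();
    }
}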
DeclareUtilities.java
@@ -1,9 +1,9 @@
package com.datalab.siesta.queryprocessor.declare;

import com.datalab.siesta.queryprocessor.declare.model.EventPairToNumberOfTrace;
import com.datalab.siesta.queryprocessor.model.Events.Event;
import com.datalab.siesta.queryprocessor.model.Events.EventPair;
import org.apache.spark.api.java.JavaRDD;
import com.datalab.siesta.queryprocessor.model.DBModel.EventTypes;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.jvnet.hk2.annotations.Service;
import org.springframework.stereotype.Component;

@@ -22,21 +22,24 @@ public DeclareUtilities() {
* @param joined a Dataset containing all the event pairs that occurred
* @return a set of all the event pairs that did not appear in the log database
*/
public Set<EventPair> extractNotFoundPairs(Set<String> eventTypes,JavaRDD<EventPairToNumberOfTrace> joined) {
public Set<EventTypes> extractNotFoundPairs(Set<String> eventTypes, Dataset<EventPairToNumberOfTrace> joined) {

//calculate all the event pairs (n^2) and store them in a set
//event pairs of type (eventA,eventA) are excluded
Set<EventPair> allEventPairs = new HashSet<>();
Set<EventTypes> allEventPairs = new HashSet<>();
for (String eventA : eventTypes) {
for (String eventB : eventTypes) {
if (!eventA.equals(eventB)) {
allEventPairs.add(new EventPair(new Event(eventA), new Event(eventB)));
allEventPairs.add(new EventTypes(eventA, eventB));
}
}
}
//removes from the above set all the event pairs that have at least one occurrence in the log
List<EventPair> foundEventPairs = joined.map(x -> new EventPair(new Event(x.getEventA()), new Event(x.getEventB())))
.collect();
List<EventTypes> foundEventPairs = joined
.select("eventA","eventB")
.as(Encoders.bean(EventTypes.class))
.collectAsList();

foundEventPairs.forEach(allEventPairs::remove);
return allEventPairs;
}
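extractNotFoundPairs builds all n^2 ordered pairs of distinct event types and subtracts the pairs observed in the log. The same complement logic in a plain-Java sketch, with strings standing in for the EventTypes bean:

// Plain-Java sketch of the complement logic in extractNotFoundPairs: build
// every ordered pair of distinct event types, then remove the observed ones.
// The event types and observed pairs below are made up.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NotFoundPairsSketch {
    public static void main(String[] args) {
        Set<String> eventTypes = Set.of("A", "B", "C");
        List<String> found = List.of("A->B", "B->C"); // assumed observed pairs

        Set<String> allPairs = new HashSet<>();
        for (String a : eventTypes)
            for (String b : eventTypes)
                if (!a.equals(b)) allPairs.add(a + "->" + b); // (a,a) excluded

        found.forEach(allPairs::remove); // what remains never co-occurred
        System.out.println(allPairs);    // e.g. [A->C, B->A, C->A, C->B]
    }
}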