Conformance Checking for Spark is a Python library which enables users to execute alignment problems in a distributed environment. This solution reduces the execution time of alignment problems when (1) the event logs are too large, with huge numbers of events and traces, and (2) the PNML model is too complex. Hence, this tool is founded on the following principles:
- When event logs are too large, storing all of them in a single XES file might be quite tricky. In those cases, logs are usually stored on distributed systems (for example, HDFS or databases). This tool supports consuming event logs from distributed systems, transforming them in accordance with the OpenXES standard, and enabling pm4py algorithms to be applied trace by trace on a distributed basis. Note: not all algorithms from pm4py can be applied trace by trace. Note 2: although this library provides native support for Conformance Checking algorithms, the user is free to employ any other algorithm or process mining technique which can be applied trace by trace. Please check the section "Creating RDD of traces from a XES file".
- Regarding the Conformance Checking algorithms, these can be applied individually trace by trace against a PNML model (or partial PNML models, see next bullet). This makes it possible to generate subproblems which are distributed among the cluster nodes, preventing the creation of bottlenecks.
- For complex PNML models, we propose a novel horizontal acyclic model decomposition. In this way, complex PNML models can be decomposed into partial models. This makes it possible to build smaller subproblems, distribute them, and execute them in parallel. In order to find out more about this decomposition technique, please visit this website.
This library is based on pm4py, whose algorithms are employed for the computation of the alignments.
Python 3.7 is required.
Requirements:
- pm4py==2.2.8
- pyspark==3.0.1
- numpy==1.19.2
- pandas==1.2.4
- pypandoc==1.5
Structure of the library:
conformancechecking4spark
|- alignments.py (1)
|--- DistributedAlignmentConfiguration (1.1)
|--- DistributedAlignmentProblem (1.2)
|- heuristics (2)
|--- algorithm.py (2.1)
|--- utils.py (2.2)
|- utils.py
- (1) The alignments.py module includes the classes which encapsulate the logic of the distributed alignment problems.
- (1.1) The DistributedAlignmentConfiguration class wraps the configuration of a distributed alignment problem. It enables problems to be constructed in two ways:
  - By means of its constructor, or
  - By means of its builder, which is invoked in the following way: DistributedAlignmentConfiguration.builder
  Once the DistributedAlignmentConfiguration object has been built, invoking the apply method will generate an RDD by scheduling all the necessary transformations. This RDD is wrapped in a DistributedAlignmentProblem object.
- (1.2) The DistributedAlignmentProblem class wraps the RDD which entails the logic of the problem distribution with the configuration previously established.
- (2) The heuristics module includes the algorithms which might be employed as heuristics (2.1) and utility functions (2.2) supporting the implementation of those heuristics.
The constructor of DistributedAlignmentConfiguration receives the minimum required parameters. These are:
- log_rdd: the RDD of event logs (each element in the format required by the pm4py library).
- pm_rdd: the RDD of partial models (a PNML model or a set of them in the format required by the pm4py library).
- log_slices: the number of slices in which the RDD of logs is split.
- pm_slices: the number of slices in which the RDD of (partial) models is split.
- global_timeout: sets the PARAM_MAX_ALIGN_TIME parameter of the pm4py algorithm. Default is None.
- trace_timeout: sets the PARAM_MAX_ALIGN_TIME_TRACE parameter of the pm4py algorithm. Default is None.
- algorithm: sets the parameters property of the pm4py algorithm. It is a dictionary in which the algorithm can be specified; please refer to the documentation of pm4py for further details. Default is None, so the default algorithm defined by that library (A*) is employed.
- heuristic: sets the heuristic function used to optimise the execution of the slices (partitions) of subproblems. Default is None, but setting a heuristic function is highly recommended. This library includes heuristics.algorithm.sum_of_differences by default, but it must be explicitly indicated.
When the apply method is called, an RDD is created in accordance with the specified configuration. Note that the operations are merely scheduled on the RDD; they are not executed yet. A DistributedAlignmentProblem object is created, wrapping that RDD.
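For illustration, a minimal constructor-based sketch is shown below. It assumes the parameters listed above can be passed as keyword arguments; the pm4py calls, file paths, and the (net, initial marking, final marking) tuple used as the model format are illustrative assumptions rather than documented API of this library:

import pm4py
from pyspark.sql import SparkSession
from conformancechecking4spark.alignments import DistributedAlignmentConfiguration
from conformancechecking4spark.heuristics.algorithm import sum_of_differences

spark = SparkSession.builder.appName("cc4spark-constructor-example").getOrCreate()
sc = spark.sparkContext

# Load an event log and a (partial) model locally with pm4py, then parallelise them.
log = pm4py.read_xes("data/M2.xes")                             # hypothetical local path
net, im, fm = pm4py.read_pnml("data/M2/partial_model_1.pnml")   # hypothetical local path

log_rdd = sc.parallelize(list(log))        # RDD of pm4py Trace objects
pm_rdd = sc.parallelize([(net, im, fm)])   # assumed model format: (net, initial marking, final marking)

# Keyword names taken from the parameter list above (assumed to be accepted as keywords).
configuration = DistributedAlignmentConfiguration(
    log_rdd=log_rdd,
    pm_rdd=pm_rdd,
    log_slices=4,
    pm_slices=1,
    heuristic=sum_of_differences,
)
problem = configuration.apply()   # schedules the transformations in a DistributedAlignmentProblem
problem.print()                   # triggers the execution and prints the results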
We strongly recommend constructing the Distributed Alignment Problems by using the builder. It can be accessed in this way:
conformancechecking4spark.alignments.DistributedAlignmentConfiguration.builder
The following methods are available to set the configuration of the alignment problem.
Required parameters:
- set_spark_session: it receives a SparkSession object. Required to generate the problem.
- set_log_slices: sets the number of slices for the RDD of logs. Required.
- set_pm_slices: sets the number of slices for the RDD of models. Required.
Setting up the RDD of logs: it can be set, created or generated by calling just one of the following methods:
- set_log_rdd: sets a previously created RDD of traces.
- set_log_df: sets a DataFrame representing event logs. The RDD of logs will be automatically generated when calling the build method. It is necessary to specify the columns which form the case ids, task ids, and event timestamps; otherwise, default values are employed (a combined sketch is shown after the model setters below):
def set_log_df(self, log_df, case_id="case:concept:name", task_id="concept:name", event_timestamp="time:timestamp")
- set_log: sets a native pm4py event log. The RDD of logs will be automatically generated when calling the build method.
- set_log_path: sets a path to a XES event log file. It can be stored in any storage system supported by Apache Spark (HDFS, for example). The RDD of logs will be automatically generated when calling the build method.
Setting up the RDD of models: it can be set, created or generated by calling just one of the following methods:
- set_pm_rdd: sets a previously created RDD of models.
- set_pm, set_pms: set a single model or a list of models, respectively. The RDD of models will be automatically generated when calling the build method.
- set_pm_path: sets a path to either a single PNML file, or a directory containing a bunch of PNML files. It can be stored in any storage system supported by Apache Spark (HDFS, for example). The RDD of models will be automatically generated when calling the build method.
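For instance, a sketch of a builder configuration that reads the event log from a Spark DataFrame with non-default column names and the models from a directory of PNML files could look as follows. The data source, paths, and column names are illustrative assumptions:

from pyspark.sql import SparkSession
from conformancechecking4spark.alignments import DistributedAlignmentConfiguration
from conformancechecking4spark.heuristics.algorithm import sum_of_differences

spark_session = SparkSession.builder.appName("cc4spark-builder-example").getOrCreate()

# Hypothetical event-log DataFrame read from CSV; any source supported by Spark works.
log_df = spark_session.read.option("header", True).csv("hdfs://hdfs-server:8020/logs/events.csv")

config = DistributedAlignmentConfiguration.builder \
    .set_spark_session(spark_session) \
    .set_log_df(log_df, case_id="user", task_id="action", event_timestamp="action_date") \
    .set_pm_path("hdfs://hdfs-server:8020/models/M2") \
    .set_log_slices(8) \
    .set_pm_slices(1) \
    .set_heuristic(sum_of_differences) \
    .build()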
Optional setters:
- set_pnml_variant
- set_pnml_parameters
- set_global_timeout
- set_trace_timeout
- set_algorithm
- set_heuristic
When calling the build method, a DistributedAlignmentConfiguration object is created, wrapping the configuration specified by the user.
Calling apply on that configuration returns a DistributedAlignmentProblem object. These objects enable you to (1) retrieve the RDD with the operations that solve the alignment problems (rdd()), (2) retrieve a DataFrame wrapping that RDD, which exposes the same operations with a tabular structure (df()), and (3) print the results of the alignment problems (print()).
If you retrieve the RDD
or the DataFrame
object, bear in mind that the alignment problems are not executed until an
action is called.
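A short sketch of these accessors is shown below, assuming a configuration object named config has already been built with the builder described above; retrieving the RDD or DataFrame schedules the work, and the Spark actions (show, collect) trigger it:

problem = config.apply()            # DistributedAlignmentProblem; transformations are only scheduled

problem.print()                     # prints the results of the alignment problems

alignments_df = problem.df()        # DataFrame with a tabular view; not executed yet
alignments_df.show()                # Spark action: triggers the execution

alignments_rdd = problem.rdd()      # RDD with the scheduled operations; not executed yet
results = alignments_rdd.collect()  # Spark action: triggers the execution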
This library supports the creation of RDDs representing event logs following the OpenXES standard. These RDDs are modelled as RDDs of Trace objects (see the official pm4py repository).
RDDs of Trace objects can be created in two ways.
Use the log_rdd module and call the function create_from_xes. A SparkSession object and the path to the XES file are required. This file can be stored in any storage system supported by Apache Spark (e.g., HDFS).
from conformancechecking4spark import log_rdd
log_rdd.create_from_xes(spark, "hdfs://hdfs-server:8020/path/to/file.xes")
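Once created, the resulting RDD can be processed trace by trace like any other RDD. Below is a minimal sketch, assuming spark is a SparkSession and the elements are pm4py Trace objects as stated above; the per-trace transformation is purely illustrative:

traces_rdd = log_rdd.create_from_xes(spark, "hdfs://hdfs-server:8020/path/to/file.xes")

# Example per-trace transformation: pairs of (case id, number of events in the trace)
case_lengths = traces_rdd.map(lambda trace: (trace.attributes.get("concept:name"), len(trace)))
case_lengths.take(5)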
Alternatively, access a database or storage system by using the Apache Spark DataFrame API. Once the DataFrame has been created, use the log_rdd module and call the function format_df. It receives a DataFrame object and the following optional parameters:
- case_id: the name of the DataFrame column which contains the case ids (default: case:concept:name).
- task_id: the name of the DataFrame column which contains the task ids (default: concept:name).
- event_timestamp: the name of the column which contains the event timestamps (default: time:timestamp).
from conformancechecking4spark import log_rdd
# From MongoDB (requires the MongoDB Spark connector)
df = spark.read.format("mongo").option("uri", "mongodb://mongo-server/db.eventLogs").load()
log_rdd.format_df(df)
# Explicitly indicating the case_id, task_id and event_timestamp columns:
log_rdd.format_df(df, case_id="user", task_id="action", event_timestamp="action_date")
Please bear in mind that all dependencies must be satisfied.
Example datasets can be found here.
The following snippet creates a local distributed alignment problem. The event log is specified by using the set_log_path method, and the directory in which the partial models are found by using the set_pm_path method. Then, the number of slices for each set is specified by the set_log_slices and set_pm_slices methods. The heuristic is set by the set_heuristic method, receiving the heuristic function sum_of_differences located in conformancechecking4spark.heuristics.algorithm.
Finally, the configuration object is built and executed by retrieving the resulting DataFrame and calling show, which prints the results in the console.
from conformancechecking4spark.utils import create_default_spark_session
from conformancechecking4spark.alignments import DistributedAlignmentConfiguration
import os
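# 'config' below is assumed to be a project-local module defining ROOT_DIR (the project root path)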
import config
from conformancechecking4spark.heuristics.algorithm import sum_of_differences
spark_session = create_default_spark_session()
config = DistributedAlignmentConfiguration.builder \
.set_spark_session(spark_session) \
.set_log_path(os.path.join(config.ROOT_DIR, 'data/M2.xes')) \
.set_pm_path(os.path.join(config.ROOT_DIR, 'data/M2')) \
.set_log_slices(1) \
.set_pm_slices(1) \
.set_heuristic(sum_of_differences)\
.build()
config.apply().df().show()
Requirements: You need a cluster based on Apache Spark 3.0.1. Each node must have all the required dependencies installed. Also, Python 3.7 is required.
First, package this project with the following command: python setup.py bdist_egg
(note that for this purpose, the
python module setuptools
is required). It will produce a .egg
file (let's call it conformancechecking4spark.egg).
Secondly, you have to create a spark_submit.py file. This is the entry point from the point of view of your Spark cluster. Its only purpose is to create and solve the distributed alignment problem. Next, an example is provided in which the M2 dataset is employed.
from pyspark.sql import SparkSession
from conformancechecking4spark.heuristics.algorithm import sum_of_differences
from conformancechecking4spark.alignments import DistributedAlignmentConfiguration
# Create the SparkSession object properly configured
spark_session = SparkSession.builder\
.appName("pm4py_test")\
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.cores", 4) \
.config("spark.driver.cores", 1) \
.config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.12.0")\
.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
.config("spark.mongodb.input.uri", "mongodb://mongo-uri:27017/cc4spark.results") \
.config("spark.mongodb.output.uri", "mongodb://mongo-uri:27017/cc4spark.results") \
.getOrCreate()
# Specify the path to the XES file and to the directory where the models are stored.
# Then, select the number of slices for each set
config = DistributedAlignmentConfiguration.builder \
.set_spark_session(spark_session) \
.set_log_path("hdfs://hdfs-uri:8020/path/to/M2.xes") \
.set_pm_path("hdfs://hdfs-uri:8020/path/to/M2") \
.set_log_slices(500) \
.set_pm_slices(1) \
.set_heuristic(sum_of_differences)\
.build()
# Once built, get the DataFrame object with the results and write it to MongoDB.
config.apply().df().write.format("mongo").mode("append").save()
Here is an example of spark-submit
command configuration:
spark-submit --master <master-uri> \
--deploy-mode cluster \
--conf spark.master.rest.enabled=true \
--conf spark.executor.uri=<path-to>spark-3.0.1-bin-hadoop2.7.tgz \
--conf spark.pyspark.python=<path-to-python> \
--packages com.databricks:spark-xml_2.12:0.12.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 \
--py-files http://<path-to>conformancechecking4spark-0.3.0-py3.7.egg \
http://<path-to>/spark_submit.py
IMPORTANT:
In order to submit the application, you have to upload the following files to a path which is visible to all the nodes of the cluster: spark_submit.py and conformancechecking4spark.egg.
spark-submit --master URL_MASTER \
--deploy-mode cluster \
--conf spark.master.rest.enabled=true \
--conf spark.executor.uri=PATH/spark-3.0.1-bin-hadoop2.7.tgz \
--conf spark.pyspark.python=/usr/bin/python3.7 \
--py-files PATH/conformancechecking4spark.egg \
PATH/spark_submit.py
- Timeouts are not fully supported. This will be fixed in future versions. For now, you can set a timeout by using the set_trace_timeout method, but bear in mind that:
  - If a timeout is specified, it is not possible to guarantee that the alignments found are optimal (please check our research article).
  - If a timeout is specified, exceptions due to null values (None type) might be raised.
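For reference, a per-trace timeout could be added to the builder chain as in the sketch below; the concrete value passed to set_trace_timeout (and its unit, as expected by pm4py's PARAM_MAX_ALIGN_TIME_TRACE) is an illustrative assumption:

# set_trace_timeout forwards the value to pm4py's PARAM_MAX_ALIGN_TIME_TRACE (value here is illustrative)
config = DistributedAlignmentConfiguration.builder \
    .set_spark_session(spark_session) \
    .set_log_path("hdfs://hdfs-uri:8020/path/to/M2.xes") \
    .set_pm_path("hdfs://hdfs-uri:8020/path/to/M2") \
    .set_log_slices(500) \
    .set_pm_slices(1) \
    .set_heuristic(sum_of_differences) \
    .set_trace_timeout(60) \
    .build()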
Copyright (C) 2021 IDEA Research Group (IC258: Data-Centric Computing Research Hub)
In keeping with the traditional purpose of furthering education and research, it is the policy of the copyright owner to permit non-commercial use and redistribution of this software. It has been tested carefully, but it is not guaranteed for any particular purposes. The copyright owner does not offer any warranties or representations, nor do they accept any liabilities with respect to them.
If you use this tool, we kindly ask you to reference the following research article:
[1] Valencia-Parra, Á., Varela-Vaca, Á. J., Gómez-López, M. T., Carmona, J., & Bergenthum, R. (2021). Empowering conformance checking using Big Data through horizontal decomposition. Information Systems, 99, 101731. https://doi.org/10.1016/j.is.2021.101731
This work has been partially funded by the Ministry of Science and Technology of Spain ECLIPSE (RTI2018-094283-B-C33) project, the European Regional Development Fund (ERDF/FEDER), MINECO (TIN2017-86727-C2-1-R), and by the University of Seville with VI Plan Propio de Investigación y Transferencia (VI PPIT-US).