This project aims to create an open-source option for setting up a Unified Namespace for IIoT transformation.
My objective is to build an open-source, free-to-use UNS solution for the community, which can be enhanced and adapted by other enthusiasts.
All components used in this solution are community versions and I do not own any rights to them. Most of them also provide a commercial / enterprise version, which may be worth considering for better tool support. I also used this as an opportunity to learn Python.
If you are looking for an alternative Unified Namespace implementation with enterprise support, check out the United Manufacturing Hub, which is an Open-Source Helm Chart for Kubernetes.
A Unified Namespace is an architecture that establishes a centralized repository of data, events, information, and context across all IT and OT systems, where any application or device can consume or publish the data needed for a specific action via an event-driven, loosely coupled architecture, along with the relevant context and history.
This concept is critical for scalability because it prevents point-to-point connectivity.
- Video explaining UNS
- UNS Q&A by Walter Reynolds
- Event driven architecture on Wikipedia
- Advantages of Event Driven Architecture
- Unified Namespace as extended event-driven architecture
The overall architecture and the deployment setup are as follows:
- Factory1
  - K8s cluster on the edge
  - MQTT edge broker installed on K8s
  - Bridge between the Factory1 and Enterprise MQTT clusters
  - Graph DB installed and running on Docker
  - UNS GraphDB client to persist messages to the Graph DB instance
  - UNS Sparkplug B client to translate messages from Sparkplug B to the UNS
- Factory2
  - K8s cluster on the edge
  - MQTT edge broker installed on K8s
  - Bridge between the Factory2 and Enterprise MQTT clusters
  - Graph DB installed and running on Docker
  - UNS GraphDB client to persist messages to the Graph DB instance
  - UNS Sparkplug B client to translate messages from Sparkplug B to the UNS
- Enterprise on Cloud
  - K8s cluster of the enterprise
  - MQTT broker installed on K8s
  - TimescaleDB installed and running on Docker / cluster / K8s / hosted service
  - Graph DB installed and running on Docker / cluster / K8s / hosted service
  - Kafka running on a cluster / K8s / hosted service
  - GraphQL service running and connected to the cloud data stores
  - UNS GraphDB client to persist messages to the Graph DB instance
  - UNS historian client to persist messages to the Historian instance
  - UNS Kafka listener to stream/convert MQTT messages to the Kafka instance
The following section lists the various options and technology choices that I evaluated and the reasoning behind my selections. This should hopefully also give you possible alternatives to consider if you choose to implement and extend this for your needs. The opinions below are my personal ones, with no influence from the companies that built these tools.
To run the MQTT broker on the edge, a cluster is not a prerequisite. If you do not have a business need for a high-availability MQTT cluster, running just a single instance (probably within a Docker container) is a lot easier.
Even for a clustered setup, most MQTT brokers provide their own clustering option; however, running the cluster on K8s provides significant benefits such as scaling and self-healing.
I evaluated the following K8s options because the distribution needs to be extremely lightweight and performant to run on the edge (constrained environments). Any of these is a perfectly good choice depending on your context.
There are quite a few comparisons between the K8s distributions on the net, so I am not going to list a detailed comparison here.
I finally chose to go ahead with MicroK8s because:
- Most of my environment was on Ubuntu, hence enabling the snap for MicroK8s was very easy.
- The inbuilt addons and the ease of enabling them without wading through YAML files.
- The default CNI provided for the cluster is Calico, which is claimed to be more performant.
- Setting up a high-availability cluster is extremely easy by adding multiple master nodes.
  Note: Avoid setting up high availability and multiple masters for your edge cluster, as this increases the resource consumption and load on the edge devices. A single master node should suffice for the majority of your availability requirements, if you actually even need a K8s cluster on the edge in the first place.
- Having a bit more experience with Ubuntu, I found the documentation and guides much easier to find and follow, including the community support, especially for troubleshooting.
However, MicroK8s did show some limitations as well as bugs. Details of these are in 01_k8scluster, which also covers all the addons, workarounds, etc. that I applied to bring up my cluster. If you choose to set up your K8s with a different distribution, each of those addons can still be set up / configured, albeit in a different manner.
Some key limitations to bear in mind:
- I faced some stability issues while trying to run this on lower-end Raspberry Pis (Pi 3).
- MicroK8s is not available for every Linux distribution.
The backbone of the Unified Namespace is the MQTT broker.
The overall structure of the UNS is based on the hierarchical structure as defined in ISA-95 part 2.
<enterprise>/<facility>/<area>/<line>/<device>
The level at which a message is published has a direct implication on its time sensitivity, as well as guidance on whether it should be processed at the edge or on the cloud.
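For illustration, here is a minimal sketch of publishing a JSON payload to such an ISA-95 style topic, assuming the paho-mqtt 1.x client API; the broker host, topic, and payload are purely illustrative:

```python
# Minimal sketch: publish a JSON payload to an ISA-95 style UNS topic.
# Assumes the paho-mqtt 1.x API; host, topic and payload are illustrative.
import json
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="uns_example_publisher")
client.connect("broker.local", 1883)  # hypothetical edge broker

# <enterprise>/<facility>/<area>/<line>/<device>
topic = "acme/factory1/assembly/line4/plc11"
payload = {"temperature": 23.5, "timestamp": 1675672278000}

# retain=True lets late-joining consumers read the latest message per topic
client.publish(topic, json.dumps(payload), qos=1, retain=True)
client.disconnect()
```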
I evaluated and read the user guides of the following brokers (open-source versions only). All three also provide commercial / enterprise versions, which are recommended for a more robust setup and professional support.
While HiveMQ has the best documentation and community support, I decided to try out EMQX for the following reasons:
- EMQX is written in Erlang, which has a lower footprint than Java (HiveMQ).
- They also provide two versions of the broker: a lightweight one specifically for edge deployment and the standard one for enterprise or cloud deployment.
The details of setting up the MQTT cluster are provided in 02_mqtt-cluster. The link provides guidance on installing EMQX on a K8s cluster using Helm.
Having said that, any of the above three would be a perfectly good selection because:
- All three have extension capabilities via standard as well as custom plugins. However, I liked the rules plugin from EMQX, which comes by default and allows a lot of flexibility for pre- and post-processing of messages. EMQX also seems to support creating plugins in multiple languages.
- All three deploy very easily on K8s, and all three have a community (free) as well as a commercial offering.
- All three support MQTT 5, which is critical for manufacturers. e.g. the concept of shared subscriptions enables clustering of the subscribers in order to better scale message processing if needed (see the sketch after this list).
- All three support Sparkplug B.
- All three support MQTT bridging, allowing data to be copied between edge and cloud instances.
- Both HiveMQ and EMQX provide fully managed cloud services, which might be an interesting option to explore for your cloud / enterprise MQTT cluster.
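As an illustration of the shared-subscription point above, here is a minimal sketch of an MQTT 5 shared subscriber, again assuming the paho-mqtt 1.x API; the group and topic names are illustrative:

```python
# Sketch of an MQTT 5 shared subscription: brokers distribute messages on
# "$share/<group>/<topic>" across the members of the group, so multiple
# copies of a listener can load-balance message processing.
import paho.mqtt.client as mqtt

def on_message(client, userdata, msg):
    print(f"{msg.topic}: {msg.payload}")

client = mqtt.Client(client_id="uns_worker_1", protocol=mqtt.MQTTv5)
client.on_message = on_message
client.connect("broker.local", 1883)
# every client subscribing with the same group name shares the stream
client.subscribe("$share/uns_workers/acme/#", qos=1)
client.loop_forever()
```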
Important Note: The community editions of these brokers do not provide all functionalities. e.g. EMQX community doesn't allow plugins to be triggered on message delivery (this is an enterprise feature). As I wanted this solution to be completely open source and free, I decided to write an MQTT client subscribing to `"#"`. This works but is less efficient than creating a plugin within the broker and natively persisting the messages to a database. You can further optimize this by subscribing to a subset, e.g. `"<enterprise>/#"`.
However, if you go for the enterprise version, I would recommend creating a plugin instead of the MQTT listeners provided here, for better performance. But for most scenarios, an MQTT client should suffice, and it has the benefit of being broker independent.
Hence I decided to write my own "plugin" as an MQTT client that listens to the broker and, on each message, persists it (to either the GraphDB or the Historian).
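A minimal sketch of this listener pattern is shown below; `persist_message` is a hypothetical placeholder for the actual GraphDB / Historian writers in 03_uns_graphdb and 04_uns_historian:

```python
# Sketch of the broker-independent listener pattern: subscribe to the whole
# namespace and hand each message to a persistence function.
import paho.mqtt.client as mqtt

def persist_message(topic: str, payload: bytes):
    ...  # hypothetical hook, e.g. MERGE into Neo4j or INSERT into TimescaleDB

def on_message(client, userdata, msg):
    persist_message(msg.topic, msg.payload)

client = mqtt.Client(client_id="uns_persistence_listener")
client.on_message = on_message
client.connect("broker.local", 1883)
client.subscribe("#", qos=1)  # or a narrower subset like "acme/#"
client.loop_forever()
```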
Normally I configured the MQTT publishers to publish messages with the retain flag so that consumers are able to get the latest message even if they weren't connected to the broker at the time of publishing.
However, I realized that MQTT alone cannot merge messages or express relationships across multiple messages. Hence, after some deliberation, I decided to use a graph database.
This provides the flexibility of defining relationships, a simple way of representing your object hierarchy, as well as support for merging attributes.
I chose to go with Neo4j simply because it was the only graph DB I was aware of, as well as the fact that it runs seamlessly on Kubernetes. The GraphDB also allows for extremely fine-grained access control across nodes and specific sections of the tree, and can limit access to specific properties. Refer to Neo4j - Access Control.
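As a sketch of how such attribute merging could look with the official Neo4j Python driver (v5); the node label, key, and Cypher below are illustrative, not the exact schema used in 03_uns_graphdb:

```python
# Sketch: merge a UNS message into Neo4j so repeated messages on the same
# topic update (merge) the node's attributes instead of overwriting it.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("neo4j://localhost:7687",
                              auth=("neo4j", "password"))

def merge_node(tx, topic: str, attributes: dict):
    # MERGE creates the node if absent; SET n += merges in new attributes
    tx.run("MERGE (n:UNS_NODE {topic: $topic}) SET n += $attributes",
           topic=topic, attributes=attributes)

with driver.session() as session:
    session.execute_write(merge_node,
                          "acme/factory1/assembly/line4/plc11",
                          {"temperature": 23.5})
driver.close()
```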
Important Note: The clustering feature of Neo4j on K8s is an enterprise feature and is not available in the community version.
The other critical component of the Unified Namespace is the historian. It keeps a full history of all messages, entities, and artifacts generated. Since graph databases are not suited for historian data (there were a couple of projects enhancing Neo4j for this, but all were archived), it makes sense to delegate that to a specialist.
I evaluated and read the user guides of the following historians:
- InfluxDb combined with Telegraf
- TimescaleDB combined with MQTT Listeners
Both of these are excellent options and have significant user adoption. InfluxDB combined with Telegraf provides a strong low-code approach to the integration. However, Telegraf did not have a plugin for Neo4j, and InfluxDB does not support K8s. Given the stronger stability of Postgres (on which TimescaleDB is built) as well as its support for JSON, I decided to go ahead with TimescaleDB.
For production systems you might want to consider the cloud versions of the historians (InfluxDB Cloud or Timescale Cloud) for lower maintenance and higher scalability.
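For illustration, here is a sketch of persisting a message into a TimescaleDB hypertable with psycopg2; the table and column names are illustrative, not the exact schema of 04_uns_historian:

```python
# Sketch: store MQTT messages as JSONB rows in a time-partitioned hypertable.
import json
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="uns_historian",
                        user="uns", password="secret")
with conn, conn.cursor() as cur:
    # One-time setup: a regular table promoted to a TimescaleDB hypertable
    cur.execute("""
        CREATE TABLE IF NOT EXISTS unifiednamespace (
            time     TIMESTAMPTZ NOT NULL,
            topic    TEXT        NOT NULL,
            mqtt_msg JSONB);
    """)
    cur.execute("SELECT create_hypertable('unifiednamespace', 'time', "
                "if_not_exists => TRUE);")
    cur.execute(
        "INSERT INTO unifiednamespace (time, topic, mqtt_msg) "
        "VALUES (now(), %s, %s)",
        ("acme/factory1/assembly/line4/plc11",
         json.dumps({"temperature": 23.5})),
    )
```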
Since I did not have the enterprise version of the MQTT brokers, I decided to develop a broker-agnostic solution. Hence the MQTT client seemed to be the best option (even if it may not be as performant as a broker plugin/module).
- The MQTT listener that persists UNS and SPB messages to the GraphDB can be found at 03_uns_graphdb
- The MQTT listener that persists UNS and SPB messages to the Historian can be found at 04_uns_historian
- The MQTT listener that reads SPB messages and translates and transforms them to the UNS can be found at 05_sparkplugb
- The MQTT listener that publishes UNS messages to a Kafka topic can be found at 06_uns_kafka
- A module that connects with all the data sources (Neo4j, TimescaleDB, Kafka, and MQTT) to provide GraphQL APIs for querying the UNS can be found at 07_uns_graphql
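The following sketch illustrates the MQTT-to-Kafka bridging pattern of 06_uns_kafka: since Kafka topic names cannot contain `/`, the MQTT topic is mapped to a dot-separated name here. The mapping and connection details are illustrative and may differ from the actual implementation:

```python
# Sketch: forward every MQTT message in the namespace to a Kafka topic.
import paho.mqtt.client as mqtt
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def on_message(client, userdata, msg):
    # e.g. acme/factory1/... -> acme.factory1.... (valid Kafka topic chars)
    kafka_topic = msg.topic.replace("/", ".")
    producer.produce(kafka_topic, value=msg.payload)
    producer.poll(0)  # serve delivery callbacks

client = mqtt.Client(client_id="uns_kafka_mapper")
client.on_message = on_message
client.connect("broker.local", 1883)
client.subscribe("acme/#", qos=1)
client.loop_forever()
```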
I chose to write the clients in Python, even though Python is not as performant as Go, C, or Rust, primarily because:
- In the OT space, most professionals (in my experience) are more familiar with Python than with Go, C, or Rust. I hope this increases adoption and contributions from the community in further developing this tool.
- Should a team want to further optimize the code, given its readability and inline comments, they should be able to rewrite the application in their language of choice.
Sparkplug B consists of three primary features in its definition:
- The first is the MQTT topic namespace definition.
- The second is the definition of the order and flow of MQTT messages to and from the various MQTT clients in the system.
- The final one is the payload data format. As these messages are published in the Sparkplug namespace, they are not visible in the UNS hierarchy, which is based on ISA-95 part 2. Also, given that they are packaged as protocol buffers, the message payloads are not easily readable and need some parsing / transformation into a JSON structure. This plugin listens on the Sparkplug B topic hierarchy and translates the protocol buffer messages into appropriate UNS messages. The detailed description of the plugin can be found at 05_sparkplugb.
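As an illustration of the payload translation, here is a sketch of decoding a Sparkplug B protobuf payload into a JSON-friendly dict, assuming a `sparkplug_b_pb2` module generated from the Eclipse Tahu .proto definition (the generated code in 02_mqtt-cluster serves the same purpose):

```python
# Sketch: parse a raw Sparkplug B payload and convert it to plain dicts/lists
# that serialize directly to JSON.
from google.protobuf.json_format import MessageToDict
import sparkplug_b_pb2  # assumed to be generated from the Tahu .proto file

def spb_to_dict(raw_payload: bytes) -> dict:
    payload = sparkplug_b_pb2.Payload()
    payload.ParseFromString(raw_payload)
    return MessageToDict(payload)
```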
GraphQL is a query language for APIs and a runtime for executing those queries with your existing data. It allows clients to request only the data they need and nothing more, enabling precise and efficient data fetching. Some key benefits of adding this support to the UNS are:
- Simplified Data Access: A Unified Namespace typically brings together diverse data sources or systems into a single cohesive structure. By integrating GraphQL capabilities, it provides a unified and simplified way to access and query this diverse dataset. GraphQL's flexible querying allows for precise data retrieval, avoiding the need to interact with each individual data source separately.
- Consolidated Querying: With GraphQL, querying data from different sources becomes seamless. It allows for composing complex queries across multiple data sources within the Unified Namespace, retrieving precisely the required data without unnecessary overhead or complexity.
- Service/Node Discovery: Given the contextual and hierarchical nature of the UNS, the ability to search for specific nodes and/or properties significantly simplifies data discovery and facilitates easier consumption by providing a coherent interface to access the combined data in the Unified Namespace.
- Dynamic Data Retrieval: GraphQL's nature allows for dynamic data retrieval, enabling clients to specify the exact fields, relationships, and data they need. This flexibility aligns well with the diverse nature of data sources within a Unified Namespace, allowing clients to fetch the required information efficiently.
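As an illustration of such precise fetching, a client could issue a query like the one below. The endpoint and field names are hypothetical; the actual schema is defined in 07_uns_graphql:

```python
# Sketch: request only the fields needed from a hypothetical UNS GraphQL API.
import requests

query = """
query {
  getUnsNodes(topic: "acme/factory1/#") {
    topic
    payload
  }
}
"""
response = requests.post("http://localhost:8000/graphql",
                         json={"query": query})
print(response.json())
```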
The current project contains the following microservices:
- 01_k8scluster: Scripts and utilities to create a K8s cluster (on the edge and in the cloud)
- 02_mqtt-cluster: Scripts and utilities to create an MQTT cluster (on the edge and in the cloud), plus the common Python package for all UNS MQTT listeners and the Sparkplug B generated and helper code
- 03_uns_graphdb: Python project for the MQTT listener that persists all messages of the UNS and Sparkplug B namespaces to a GraphDB. SPB messages are translated from protocol buffers to JSON prior to persisting
- 04_uns_historian: Python project for the MQTT listener that persists all messages of the UNS and Sparkplug B namespaces to a Historian. SPB messages are translated from protocol buffers to JSON prior to persisting
- 05_sparkplugb: Python project for the MQTT listener that listens to the Sparkplug B namespace and translates relevant messages to publish to the UNS namespace
- 06_uns_kafka: Python project for the MQTT listener that subscribes to the MQTT broker and publishes to the Kafka broker
- 07_uns_graphql: Python project for the GraphQL server to query the Unified Namespace
Each microservice can be independently imported into VSCode by opening the specific microservice folder. Instructions on setting up the Python pip and virtual environments are provided in the respective `README.md` within that folder.
However, to import all microservices into the same workspace, execute the following commands in the VSCode terminal with the current folder set to `.` (the parent of all the microservices).
This has been tested on Unix (bash), Windows (PowerShell), and Mac (zsh):
```bash
pip install poetry
# Ensure that the poetry shell is activated
poetry shell
python -m pip install --upgrade pip poetry
poetry install
```
While importing the folder into VSCode, remember to do the following steps the first time:
1. Open a terminal in VSCode
2. Activate the poetry shell and install the dependencies
   ```bash
   poetry shell
   python -m pip install --upgrade pip poetry
   poetry install
   ```
3. Select the correct Python interpreter in VSCode (it should automatically detect the poetry virtual environment)
```bash
# run all tests
poetry run pytest

# run all tests excluding integration tests
poetry run pytest -m "not integrationtest"

# run all tests for a specific module
poetry run pytest ./02_mqtt-cluster
poetry run pytest ./03_uns_graphdb
poetry run pytest ./04_uns_historian
poetry run pytest ./05_sparkplugb
poetry run pytest ./06_uns_kafka
poetry run pytest ./07_uns_graphql

# run all tests for a specific module excluding integration tests
poetry run pytest -m "not integrationtest" ./02_mqtt-cluster
poetry run pytest -m "not integrationtest" ./03_uns_graphdb
poetry run pytest -m "not integrationtest" ./04_uns_historian
poetry run pytest -m "not integrationtest" ./05_sparkplugb
poetry run pytest -m "not integrationtest" ./06_uns_kafka
poetry run pytest -m "not integrationtest" ./07_uns_graphql
```
- Windows: Allowing PowerShell to run scripts. If you get a Windows security error when running scripts, run this first:
  ```powershell
  Set-ExecutionPolicy RemoteSigned
  ```
- pytest-xdist & VSCode: To optimize and speed up test runs, the project uses pytest-xdist. This however has some challenges working with VSCode (see the issue). As a workaround, run all tests marked with `@pytest.mark.xdist_group` via the command line instead of within VSCode.
- pytest-asyncio & Integration Testing: Similar to `pytest-xdist`, I have also enabled `pytest-asyncio` for the project. While this has significantly decreased the execution time, some integration tests (marked with `@pytest.mark.integrationtest`) sometimes fail (flaky tests) if there is too much CPU / IO load. Executing them again normally works. I still need to investigate how to make these more robust / race-proof. The issue is not in the code but in the test case, where the validation starts before the test data has been completely set up in the data store.
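One possible way to make such tests more race-proof is to poll the data store until the expected record appears (or a timeout expires) before asserting, instead of checking immediately after publishing. A generic sketch:

```python
# Sketch: a polling helper for integration tests that wait on eventually
# consistent data stores; names and timings are illustrative.
import time

def wait_until(predicate, timeout: float = 10.0, interval: float = 0.2) -> bool:
    """Poll predicate() until it returns True or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

# usage inside a test, with a hypothetical fetch_from_historian() helper:
# assert wait_until(lambda: fetch_from_historian(topic) is not None)
```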