An EPICS areaDetector plugin that sends areaDetector data, serialised using flatbuffers, to a Kafka broker. The plugin is in a usable state (barring unknown bugs), although several suggested improvements are listed at the end of this document.
Apache Kafka is an open-source platform for handling streaming data using one or more data brokers in order to maximize throughput and reliability. More information on Apache Kafka can be found at the website of that project.
For communicating with the Kafka broker, the C++ version of librdkafka is used. The source code for this library can be downloaded from https://github.com/edenhill/librdkafka.
For serializing the areaDetector (NDArray) data, Google FlatBuffers is used. Serializing data using FlatBuffers is fast with a relatively small memory overhead while being easier to use than C-structs.
To simplify building of this project, the flatbuffers source code has been included in this repository. Read the file flatbuffers_LICENSE.txt for the flatbuffers license.
librdkafka produces statistics messages in JSON and these are parsed using jsoncpp (https://github.com/open-source-parsers/jsoncpp). To simplify building of this project, the jsoncpp source code has been included in this project. The license of this library can be found in the file jsoncpp_LICENSE.txt.
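To give an idea of what such a statistics message looks like, here is a minimal Python sketch that parses a hand-written, heavily trimmed sample. The plugin itself does this in C++ with jsoncpp; Python is used here only for brevity. The field names (`msg_cnt`, `msg_max`, `brokers`, `state`) follow the librdkafka statistics format as far as we know and should be checked against the librdkafka version you have installed. The values and broker names are made up.

```python
import json

# Hand-written, trimmed sample in the shape of a librdkafka statistics
# message. Values and broker names are invented for illustration only.
SAMPLE_STATS = """
{
  "name": "rdkafka#producer-1",
  "type": "producer",
  "msg_cnt": 12,
  "msg_max": 200,
  "brokers": {
    "localhost:9092/0": {"name": "localhost:9092/0", "state": "UP"},
    "localhost:9093/1": {"name": "localhost:9093/1", "state": "DOWN"}
  }
}
"""


def summarise_stats(raw):
    """Return (queued message count, {broker name: state}) from a stats string."""
    stats = json.loads(raw)
    broker_states = {
        name: broker.get("state", "UNKNOWN")
        for name, broker in stats.get("brokers", {}).items()
    }
    return stats.get("msg_cnt", 0), broker_states


queued, brokers = summarise_stats(SAMPLE_STATS)
print("messages waiting in the producer queue:", queued)
for name, state in brokers.items():
    print(name, "is", state)
```

Missing fields fall back to defaults instead of raising, since the set of fields may differ between librdkafka versions.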
The steps shown here worked on the development machine but have not been tested anywhere else.
- Copy the ADPluginKafka directory to your $(EPICS_MODULES_PATH)/areaDetector directory.
- Set the variable `SIMDET` in the file ADPluginKafka/iocs/ADPluginKafkaIOC/configure/RELEASE to point to the location of ADSimDetector in your EPICS installation.
- Modify the file /ADPluginKafka/ADPluginKafkaApp/src/Makefile such that `RDKAFKA` points to the directory where librdkafka is installed (usually something like `/usr/local/`).
- Modify the files Makefile and start_epics in the ADPluginKafka/iocs/ADPluginKafkaIOC/iocBoot/iocADPluginKafka directory to reflect the current platform.
- Change (`cd`) to the ADPluginKafka directory and compile by running `make`.
- Modify the ADPluginKafka/iocs/ADPluginKafkaIOC/iocBoot/iocADPluginKafka/st.cmd file to use the address of your Kafka broker.
- Run `sh start_epics` from that directory.
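Before running `make`, it can save time to check that the directory you intend to use for `RDKAFKA` really contains librdkafka. The following Python sketch is not part of this repository. It assumes that the C++ header is installed as `include/librdkafka/rdkafkacpp.h` below the prefix, which is the usual layout but may differ on your system. The helper name `find_missing` is made up for this example.

```python
import os

# Assumption: the librdkafka C++ header lives at this path relative to
# the installation prefix. Adjust the list if your installation differs.
EXPECTED = ["include/librdkafka/rdkafkacpp.h"]


def find_missing(prefix, relative_paths):
    """Return the entries of relative_paths that do not exist under prefix."""
    return [
        path
        for path in relative_paths
        if not os.path.exists(os.path.join(prefix, path))
    ]


# "/usr/local" is the example prefix mentioned in the steps above.
missing = find_missing("/usr/local", EXPECTED)
if missing:
    print("not found under /usr/local:", ", ".join(missing))
else:
    print("librdkafka C++ header found under /usr/local")
```

If the header is reported as missing, point `RDKAFKA` at a different prefix or install librdkafka first.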
This plugin provides a few extra process variables (PVs) besides the ones inherited from `NDPluginDriver`. The plugin also directly modifies one process variable inherited from `NDPluginDriver`. The new PVs are listed below. Note that the PV names are prefixed by `$(P)$(R)`.
PV | Type | Default value | Description |
---|---|---|---|
SourceName, SourceName_RBV | string | n/a | The name of the data source in the flatbuffers produced by this plugin. Cannot be an empty string. |
ReconnectFlush, ReconnectFlush_RBV | bool (0 or 1) | false | Tells the plugin whether it should attempt to flush the current message buffer before reconnecting. Disabled by default as this is a blocking call. |
ReconnectFlushTime, ReconnectFlushTime_RBV | int | 500 [ms] | The (maximum) amount of time in ms to wait (block) for a flush to finish if ReconnectFlush is set to true. |
KafkaBufferSize, KafkaBufferSize_RBV | int | 500000 [kB] | The maximum Kafka message buffer size in kB. Note that this setting has a higher priority than KafkaMaxQueueSize. Changing this value will trigger a disconnect and re-connect of the Kafka connection. |
KafkaMaxMessageSize, KafkaMaxMessageSize_RBV | int | 10000000 [bytes] | The maximum accepted message size (of individual flatbuffer messages) in bytes. Changing this value will trigger a disconnect and re-connect of the Kafka connection. |
UnsentPackets_RBV | int | n/a | The number of (flatbuffer) messages lost/dropped due to connection issues with the Kafka broker. Note that messages are only dropped (permanently lost) once the message buffer is full. |
ConnectionStatus_RBV | enum | n/a | The current Kafka connection status of the plugin. Can take the values "Connected" (0), "Connecting" (1), "Disconnected" (2), "Error" (3). |
ConnectionMessage_RBV | string | n/a | The current connection status in the form of a string, for easier debugging of connection issues. |
KafkaTopic, KafkaTopic_RBV | string | n/a | The Kafka topic to which the flatbuffer messages are to be transmitted. Cannot be an empty string. |
KafkaBrokerAddress, KafkaBrokerAddress_RBV | string | n/a | The address (and port) of a Kafka broker. Note that if you have more than one broker in your cluster, the address provided here might not be the one that is ultimately used. Changing this value will trigger a disconnect and re-connect of the Kafka connection. |
KafkaStatsIntervalTime, KafkaStatsIntervalTime_RBV | int | 500 [ms] | The interval, in ms, at which the Kafka connection status PVs are updated. Changing this value will trigger a disconnect and re-connect of the Kafka connection. |
KafkaMaxQueueSize, KafkaMaxQueueSize_RBV | int | 200 | The maximum number of messages in the buffer of messages to be sent to Kafka. Note that this setting has a lower priority than KafkaBufferSize. Changing this value will trigger a disconnect and re-connect of the Kafka connection. |
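
As a small illustration of the naming scheme and of the `ConnectionStatus_RBV` values, the following Python sketch builds the full PV names for a settable parameter and decodes a status value. It only manipulates strings and integers and does not talk to an IOC. The prefix values `13SIM1:` and `Kafka1:` are hypothetical stand-ins for `$(P)` and `$(R)`, and the helper `pv_names` is made up for this example.

```python
from enum import IntEnum


class ConnectionStatus(IntEnum):
    """Values of ConnectionStatus_RBV as listed in the table above."""

    CONNECTED = 0
    CONNECTING = 1
    DISCONNECTED = 2
    ERROR = 3


def pv_names(p, r, name):
    """Return (setpoint PV, readback PV) for a settable plugin parameter."""
    base = f"{p}{r}{name}"
    return base, base + "_RBV"


# Hypothetical macro values; use the ones from your own st.cmd.
P = "13SIM1:"
R = "Kafka1:"

setpoint, readback = pv_names(P, R, "KafkaTopic")
print("write the topic to:", setpoint)
print("read it back from: ", readback)

# Decode a raw enum value as it would be read from ConnectionStatus_RBV.
raw_status = 2
print("status:", ConnectionStatus(raw_status).name)
```

Read-only PVs such as `UnsentPackets_RBV` and `ConnectionStatus_RBV` only exist in the `_RBV` form, so only the second name applies to them.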
The repository contains a directory with code for unit tests of the two projects. Note that the build system of the unit tests (specifically the CMake file) will most likely require some modification to work on your system. Because EPICS installations differ, the CMake file has only been tested on the development machine (running Mac OS X).
The unit tests use GTest/GMock (and librdkafka, of course). These dependencies are provided using Conan. To compile and run the unit tests (assuming the CMake code has been suitably modified for your system), run the following commands:

    cd ad-kafka-interface
    mkdir build
    cd build
    conan install ../ --build=outdated
    cmake ..
    make
    ./bin/unit_tests