Software Design

Overview

The backbone of this alarm system is Apache Kafka. Kafka was chosen because it is one of the most popular and trusted event streaming systems (80% of Fortune 100 companies use it), and it has already been demonstrated at SNS as a viable alarm system backbone. Kafka provides a publish/subscribe interface for clients to listen for alarm updates and also acts as an Event Source [1], [2], [3] database for clients to determine the existing state of the system.

Topics

The alarm system state is stored in Kafka topics. Topic schemas are stored in the Schema Registry in AVRO format. Python scripts are provided for managing the alarm system topics.

| Topic | Description | Key Schema | Value Schema | Scripts |
| --- | --- | --- | --- | --- |
| alarms | Set of all possible alarm registration instances and their non-class inherited metadata (descriptions). | String: alarm name | AVRO: Alarm | set-alarm.py, list-alarms.py |
| alarm-actions | Contains class-wide alarm registration information to avoid redundant specification. | String: action name | AVRO: AlarmAction | set-action.py, list-actions.py |
| alarm-locations | Set of areas where alarms can be found. Locations are hierarchical. | String: location name | AVRO: AlarmLocation | set-location.py, list-locations.py |
| alarm-activations | Determines whether an alarm is active (alarming) and provides associated metadata. | String: alarm name | AVRO: AlarmActivationUnion | set-activation.py, list-activations.py |
| alarm-overrides | Set of alarms that have been overridden. | AVRO: AlarmOverrideKey | AVRO: AlarmOverrideUnion | set-override.py, list-overrides.py |
| alarm-systems | Additional classification for alarms that also associates a responsible team. | String: system name | AVRO: AlarmSystem | set-system.py, list-systems.py |
| effective-registrations | Contains effective registrations considering alarm class defaults. Registration fields are formed from the union of an instance and a class specification; for example, the corrective action for a certain class of machine magnets (of which there are hundreds of instances) is typically the same. | String: alarm name | AVRO: EffectiveRegistration | set-effective-registration.py, list-effective-registrations.py |
| effective-notifications | Contains effective state considering overrides. Technically each client could compute the effective activation state itself, but it's a chore, so it's done by the jaws-effective-processor. Timing and synchronization constraints in distributed stream processing apps also mean clients could see different intermediate results without a shared central processor to serialize and order the joining of streams. | String: alarm name | AVRO: EffectiveNotification | set-effective-notification.py, list-effective-notifications.py |
| effective-alarms | Contains both effective registrations and effective notifications. This monolog of all events joined together may be useful for clients that require an absolute ordering of changes in the alarm system or clients that prefer all-in-one notification messages. | String: alarm name | AVRO: EffectiveAlarm | set-effective-alarm.py, list-effective-alarms.py |

AVRO schemas are registered in the Schema Registry using the standard subject naming strategy (TopicNameStrategy): topic-key and topic-value.
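
For example, a client can confirm the naming convention by fetching the latest registered schema for a subject. A minimal sketch, assuming the confluent-kafka Python package and a Schema Registry at localhost:8081:

```python
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({"url": "http://localhost:8081"})

# TopicNameStrategy: subject is "<topic>-key" or "<topic>-value"
latest = client.get_latest_version("alarms-value")
print(latest.schema_id)
print(latest.schema.schema_str)  # the registered AVRO schema as JSON text
```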

All topics have compaction enabled to remove old messages that would be overwritten on replay. Compaction is not very aggressive, however, so candidates for deletion often linger when clients connect; clients must therefore handle the ordered messages on replay such that later messages with the same key overwrite earlier ones. To modify a record, simply produce a new one with the same key.
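
A minimal sketch of a client rebuilding current state on replay, assuming the confluent-kafka Python package (AVRO deserialization and most error handling are elided):

```python
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "state-rebuild-example",
    "enable.auto.commit": False,
})
# Replay the compacted topic from the beginning (single partition assumed)
consumer.assign([TopicPartition("alarms", 0, OFFSET_BEGINNING)])

state = {}
while True:
    msg = consumer.poll(timeout=5.0)
    if msg is None:
        break  # treat a quiet period as "caught up" for this sketch
    if msg.error():
        continue
    if msg.value() is None:
        state.pop(msg.key(), None)      # tombstone: record removed
    else:
        state[msg.key()] = msg.value()  # later record overwrites earlier
consumer.close()
```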

We currently use a whole-entity-per-message event sourcing scheme, meaning all fields for a given record are always present (or implied null). For example, the entire registration state of a given alarm is always stored in a single record (the class field does act as a 'foreign key' reference, but that is resolved by the processor into effective-registrations). We could instead have allowed a more granular scheme in which a message contains only the fields that are changing, which would have at least two benefits: (1) smaller messages for partial field changes, and (2) clients would not need to know the entire state of a record when making partial field changes (otherwise clients must know and re-set even the fields they aren't intending to change).

However, we went with the atomic strategy since it is easier to deal with: clients never merge records, they always replace them. In practice our clients would be writing full replacement records regardless of current state anyway, and when they do need to make partial changes they already have the current state at hand. Further, messages are generally small except for registration ones, which change very infrequently. Performance of the alarm system is a modest concern compared to many other Kafka streaming applications in which scalability and huge data volumes matter more.
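
To illustrate the trade-off: a partial field change under this scheme still means producing the complete record. A sketch assuming the confluent-kafka package, with illustrative field names and JSON in place of AVRO for brevity:

```python
import json
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

# Pretend this full record came from a replay as in the sketch above
# (field names are illustrative, not the actual Alarm schema).
current = {"description": "Old text", "action": "base", "location": "INJ"}

updated = dict(current)              # copy every field, changed or not
updated["description"] = "New text"  # the one field actually changing

# The whole record is re-produced under the same key; consumers replace,
# never merge. (Real messages are AVRO; JSON is used here for brevity.)
producer.produce("alarms", key="channel1", value=json.dumps(updated))
producer.flush()
```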

Tombstones

To unset (remove) a record, write a tombstone record (null/None value). This can be done with the provided scripts using the --unset option. The tombstone approach is used to unregister alarms and to remove overrides. It is also possible to unset active alarming using tombstones, but it turns out an explicit NoActivation record is preferable because it allows efficient Kafka topic compaction. If tombstones are used for activations, the alarm-activations topic grows very large and rewinding and replaying messages takes longer.
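
A minimal sketch of writing a tombstone directly (roughly what the scripts do with --unset), assuming the confluent-kafka package and an illustrative alarm name:

```python
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("alarms", key="channel1", value=None)  # tombstone
producer.flush()
```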

Headers

The alarm system topics are expected to include audit information in Kafka message headers:

| Header | Description |
| --- | --- |
| user | The username of the account that produced the message |
| producer | The application name that produced the message |
| host | The hostname where the message was produced |

Additionally, the built-in timestamp present in all Kafka messages provides basic message timing information. The alarm system uses the default producer-provided timestamps (as opposed to broker-provided ones), so timestamps may not be ordered.

Note: There is no schema for message headers, so content is not easily enforceable. However, the provided topic management scripts include the audit headers listed above.
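
A sketch of producing with the audit headers attached, assuming the confluent-kafka package (the application name is illustrative):

```python
import getpass
import socket
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce(
    "alarm-activations",
    key="channel1",
    value=b"...",  # AVRO-serialized activation, elided in this sketch
    headers=[
        ("user", getpass.getuser().encode()),
        ("producer", b"example-app"),   # illustrative application name
        ("host", socket.gethostname().encode()),
    ],
)
producer.flush()

# On the consumer side, msg.timestamp() returns (timestamp_type, epoch_ms);
# as noted above these are producer-provided, so ordering is not guaranteed.
```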

Customize Alarms

The information registered with an alarm can be customized by modifying the Alarm and related schemas (AlarmAction). For example, producers may be domain-specific.

Overrides can be customized by modifying the AlarmOverrideKey and AlarmOverrideUnion schemas, for example to add or remove override options.

Generally, alarm producers should simply indicate whether an alarm is active. However, not all producers work this way: some are a tangled mess (like EPICS, which indicates priority and type at the time of activation notification, so a single EPICS PV maps to multiple alarms). It is possible to modify the AlarmActivationUnion schema to be anything you want. The schema is currently a union of schemas for flexibility (a sketch follows the list below):

  • Activation: Alarming state for a simple alarm; if a record is present then the alarm is alarming. There are no fields.
  • NoteActivation: Alarming state for an alarm with an extra information string.
  • EPICSActivation: Alarming state for an EPICS alarm with STAT and SEVR fields.
  • ChannelErrorActivation: Alarming state indicating that the Kafka producer client (often a proxy for another message passing system) encountered a communication error on the channel to the pluggable alarm source. For example, for the epics2kafka producer, an EPICS CA channel that is Never Connected or Disconnected is "thrown" via a ChannelErrorActivation message so the error propagates to clients (operators should probably know when a registered alarm cannot be monitored).
  • NoActivation: An explicit no-activation record that can be aggressively compacted in Kafka (unlike a tombstone).
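
For orientation, an AVRO union value schema in the spirit of AlarmActivationUnion might look like the following; the authoritative schemas ship with jaws-libp, and the record fields here are illustrative only:

```python
# A hedged sketch of what the AlarmActivationUnion value schema could look
# like as an AVRO union (a JSON array of record schemas). The authoritative
# schemas ship with jaws-libp; the fields below are illustrative only.
activation_union = [
    {"type": "record", "name": "Activation", "fields": []},
    {"type": "record", "name": "NoteActivation",
     "fields": [{"name": "note", "type": "string"}]},
    {"type": "record", "name": "EPICSActivation",
     "fields": [{"name": "sevr", "type": "string"},
                {"name": "stat", "type": "string"}]},
    {"type": "record", "name": "ChannelErrorActivation",
     "fields": [{"name": "error", "type": "string"}]},
    {"type": "record", "name": "NoActivation", "fields": []},
]
```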

Services

  • Sources
    • anything authorized to produce messages on the alarm-activations topic
  • Middleware
    • Broker: Kafka - distributed messaging system (Docker examples use KRaft mode for coordination to avoid ZooKeeper)
    • Registry: Confluent Schema Registry - message schema lookup
    • Stream Processors:
      • jaws-effective-processor - Compute effective state and output to the effective-registrations, effective-notifications, and effective-alarms topics
      • extension: jaws-registrations2epics - alarm registrations inform epics2kafka what to monitor
  • Apps
    • Admin GUI - Web app for admins to manage registered alarms
    • Operator GUI - Python desktop app for operators to monitor alarms and manage overrides
  • APIs
    • jaws-libp - Python admin Command Line Interface (CLI) and API to set up and manage the alarm system
    • jaws-libj - Java API to JAWS

Note: The admin CLI scripts are used to:

  1. Setup/manage the Kafka topics
  2. Setup/manage AVRO schemas in the registry
  3. Produce and consume alarm system messages

Effective Processor

Alarm Processor Diagram

The alarm processor actually comprises a pipeline of multiple Kafka Streams apps that pass intermediate results, via enrichment of an IntermediateMonolog entity, on intermediate topics. The pipeline looks like:

* = Kafka Streams app

    alarm-actions,
    alarms                -> effective-registrations                                                                   
                   \    /                                                                                                     
                     > *                                                                                                          -> effective-notifications
                   /    \    intermediate       int.             int.        int.          int.       int.          int.        /
  alarm-activations,      -> registration > * > activation > * > latch > * > oneshot > * > mask > * > ondelay > * > offdelay > *               
    alarm-overrides <-                      |                |           |             |          |             |               \
                       \                    |                |           |             |          |             |                 -> effective-alarms
                         ---------------------------------------------------------------------------------------                            

Many of the stream processors must produce messages on the alarm-overrides topic in addition to passing records through to the next processor in the chain. The Kafka Streams filtering feature is used to output to multiple topics at once (alarm-overrides plus the pass-through topic to the next stream processor). There is a cyclic nature to these branches, since adding a record to the alarm-overrides topic ultimately pushes a new record into the monolog processing pipeline. We enrich the monolog record with a ProcessorTransitions field consisting of flags indicating that a processor in the pipeline needs to change the state.

When a Kafka Streams processor in the pipeline detects a condition resulting in an automated override, such as latching an alarm, the processor can either (1) drop the record indicating a state change or (2) forward the record but indicate that a state change is coming. In the second approach the final output topic shows the correct effective state twice: once when the conditions that trigger the transition are first detected, and again once the record added to the alarm-overrides table has propagated through the pipeline. However, that doubles the number of messages. We're still experimenting with the best approach, but at the moment we've opted to introduce some latency by dropping intermediate messages and waiting for the override message to stream back through. This should probably be configurable.
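
To make the "drop and wait" choice concrete, here is an illustrative plain-Python sketch of a latch stage; the actual jaws-effective-processor is a Kafka Streams (Java) app, and the field names below are illustrative, not the real IntermediateMonolog fields:

```python
def latch_stage(monolog, produce_override):
    """One pipeline stage, illustrated for latching.

    monolog: dict with 'alarm_name', 'registration', 'activation', and
    'overrides' entries (names are illustrative).
    produce_override: callback that writes to the alarm-overrides topic.
    """
    latchable = monolog["registration"].get("latchable", False)
    active = monolog["activation"] is not None
    already_latched = "Latched" in monolog["overrides"]

    if latchable and active and not already_latched:
        # Emit the override; the resulting alarm-overrides record re-enters
        # the pipeline and carries the state change forward.
        produce_override(monolog["alarm_name"], "Latched")
        return None  # drop: wait for the override to stream back through
    return monolog   # pass through unchanged to the next stage
```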

The relative order of the latch, oneshot, mask, ondelay, and offdelay processing doesn't matter and could be done in parallel (if it were simple to orchestrate); they all just need to come after activation processing and before effective state processing. They all depend on the enriched monolog available after activation processing, but they have no dependencies on each other, and the effective processing looks at the intermediate results after they all run.

There is also a Kafka Streams app bundled with the processor that does not participate in the pipeline; it manages the shelved alarm expiration timer and produces tombstone messages on the alarm-overrides topic asynchronously.
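
A sketch of the expiration idea in plain Python (the real implementation is the bundled Kafka Streams app; the record layout is illustrative):

```python
import time

def expire_shelved(overrides, produce_tombstone):
    """overrides: mapping of override key -> record with an 'expiration'
    field in epoch seconds (layout is illustrative)."""
    now = time.time()
    for key, record in list(overrides.items()):
        if record.get("expiration") is not None and record["expiration"] <= now:
            produce_tombstone("alarm-overrides", key)  # clears the shelve
```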