Skip to content
Bradley Erickson edited this page Jun 20, 2022 · 8 revisions

Kafka

General Architecture

The below figure gives an overview of the Kafka architecture. Note that in newer versions of Kafka, they no longer use ZooKeeper.

Producer
    |  Send data
Kafka clusters
    Broker 1, Broker 2, Broker 3 - Zookeeper
    |  Read data
Consumers

Brokers

Brokers act as the main server. They handle read and write operations as well as replicating data and data retention. The Brokers have a multitude of configuration options. A full list can be found in the documentation link below. Two of the common properties we want to use are auto.create.topics.enable to allow the creation of topics from non-Brokers and delete.topic.enable to allow for the deletion of topics.

Configuration options: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html

Topics

Topics are logs. Topics are labels for different types of events. They are used when reading/writing to the server to better store events. Each topic is stored on the disc. Each topic can also have an expiration date. Any event that is older than the specified expiration timeframe (last 30 days, 3 years, or never, etc.) will be removed from the topic. Newer Kafka servers, which no longer use ZooKeeper, can handle millions of topics. See Stack Overflow post snippet below.

Update March 2021: With Kafka's new KRaft mode, which entirely removes ZooKeeper from Kafka's architecture, a Kafka cluster can handle millions of topics/partitions. See https://www.confluent.io/blog/kafka-without-zookeeper-a-sneak-peek/ for details. *short for "Kafka Raft Metadata mode"; in Early Access as of Kafka v2.8” https://stackoverflow.com/a/32963227/9115551

We can use various levels of topics. Messages can be sent to multiple topics making hierarchical topics possible. We can use topics for each student, each classroom, each day. We can keep a 30 day topic for a more recent analysis and an overall topic.

Creating topics:

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(
    bootstrap_servers=[server1, server2],
    client_id='test'
)

topics = [
    NewTopic(
        'new_topic',            # topic name
        num_partitions=1,       # number of partitions for the topic
        replication_factor=1,   # replication factor of partitions
        # topic_configs=        # https://kafka.apache.org/documentation.html#topicconfigs
        # The topic configs can be used to set expiration dates of logs and compact them
    )
]

try:
    admin_client.create_topics(topics)
except TopicAlreadyExistsError as e:
    print('Unable to create topic:\n', e)

Listing topics:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=[server1, server2]
)

print(consumer.topics())

Partitions

Partitions are where the events are located within the topics. These can be distributed across systems. Messages with the same key will always go in the same partition via a hash function. If we use a student as the key, all their events will always be located on the same partition.Using a key will also preserve order for the given event. If it does not contain a key, events are added to partitions in a round robin fashion.

There exists various ways to set-up the partition aside from using the default hash and mod methodology. However, if we are using the confluent-kafka Python implementation, custom partitioners are not supporter. See this linked Github issue on the Repository for more information.

Custom partitioners are currently not supported.

Looking up messages by Key

We need to iterate over messages in order to find a given key. The following excerpt details this:

How to get message by key from kafka topic - Stack Overflow

Kafka isn't indexed by key though, it's indexed by offset, and optionally by timestamp. If you need to lookup a key then you'll need to materialize the data to a system that's been designed to lookup by key: relational database, key value store, or some index. You can do this pretty easily with Kafka Connect, or if you'd like to build it into your service you could use the interactive queries feature of Kafka Streams.

Writing to the server

To write to the Kafka server, you need to run a KafkaProducer. Pass the Kafka server location (single string or multiple with list). Specify how to parse the messages (value_serializer). Then whenever we send, we specify the topic and message to send. Each message can be sent to more than one topic.

import json
from kafka import KafkaProducer
from time import sleep

producer = KafkaProducer(
    bootstrap_servers=[server1, server2],  # list of Broker servers
    api_version=(2, 8, 1),  # Kafka version running on Broker
    value_serializer=lambda x: json.dumps(x).encode('utf-8')  # what to do with the data before sending it
)

# send some data to the Kafka server
topic = 'test_topic'
for n in range(100):
    data = {'num': n}
    producer.send(topic, value=data)  # depending on the configuration of the Broker, this will automatically create the topic if its not already present
    sleep(1)

Reading from the server

To read from the kafka server, you need to run a KafkaConsumer. Pass the Kafka topic you wish to subscribe to and the Kafka servers to connect to (singular string or multiple with list). Run a while forever loop to iterate over the messages within that log file. Need to specify how to read the messages (value_deserializer). This will read all current events within a given topic and continue listening for more data.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test_topic',  # topic we want to read from
    bootstrap_servers=[server1, server2],  # list of Broker servers
    api_version=(2, 8, 1),  # Kafka version running on Broker
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),  # how to parse the data
    auto_offset_reset='earliest'  # determine where in the topic to start reading from
)

Securing the server

There are multiple ways to secure the Kafka Broker using either Mutual TLS (mTLS) or some form of Simple Authentication and Security Layer (SASL). A broker can use more than one form of SASL authentication, but cannot use both mTLS and SASL. The three suggested methods are:

  • Mutual TLS (mTLS)
    • rely on SSL/TLS certificates
    • Configuration uses the term SSL for historical purposes
    • 2 methods of doing this
      • Truststore contains one or many certificates, we trust any certificate listed in the truststore - easier to block authentication, just simply remove the certificate from the truststore
      • Truststore contains a Certificate Authority (CA), we trust any certificate signed by the CA - more convenient since we don’t have to change the truststore
    • https://docs.confluent.io/platform/current/kafka/authentication_ssl.html
  • SASL SCRAM - Salted Challenge Response Authentication Mechanism
  • SASL GSSAPI Only for Kerberos, so probably not for us

Set up notes for Mutual TLS

The following code chunks are various commands I used to try and set up mutual TLS. The keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed command fails because no file named cert-signed is found. The guide I followed for these commands are a mix of the official confluent documentation and the following link: https://www.ibm.com/docs/en/cloud-paks/cp-biz-automation/20.0.x?topic=emitter-preparing-ssl-certificates-kafka

keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
keytool -keystore client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA

openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
keytool -keystore client.keystore.jks -alias localhost -certreq -file cert-file

openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:whitegum

keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.keystore.jks -alias localhost -import -file cert-signed

keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert

/home/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning --consumer.config /home/kafka/kafka/config/client-ssl.properties

config/server.properties

listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.keystore.location=/home/kafka/server.keystore.jks
ssl.keystore.password=whitegum
ssl.key.password=whitegum
ssl.truststore.location=/home/kafka/server.truststore.jks
ssl.truststore.password=whitegum
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
ssl.endpoint.identification.algorithm=

config/client-ssl.properties

security.protocol=SSL
ssl.truststore.location=/home/kafka/client.truststore.jks
ssl.truststore.password=whitegum
ssl.keystore.location=/home/kafka/client.keystore.jks
ssl.keystore.password=whitegum
ssl.key.password=whitegum
ssl.enabled.protocols=TLSv1.2
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.endpoint.identification.algorithm=

Deleting data

Kafka can handle topic deletion. There is some work that needs to be done for it to happen though. We need to make sure that the Broker is set up to allow for topic deletion. It does not appear to be possible to remove specific keys from a given topic. We can delete up to a given offset though.

Backing up data

There exists a product for handling the backup and recovery process for the Kafka brokers, called kafka-backup. This is possible through the use of Kafka Connect, a process for connecting Kafka to external sources like Databases or in our case, the local file system. For information can be found in the following Medium article: https://medium.com/@anatolyz/introducing-kafka-backup-9dc0677ea7ee

Possible architecture

  • Topic for each student, partitions are not really used well here
  • Topic for each classroom, keys are specific to each student
  • Topic for overall
  • Topic for last 30 days (more recent analysis)
  • Topics for general system related things (one for each server WO is running on)
  • Topic for removal of data
  • Topic for each of our current main logs

Kafka Streams

Kafka Streams are a standalone system to the Kafka brokers. They are helpers to the consumers and provide filtering, aggregation, grouping, joining, etc. This is accessed through a functional Java API. Depending on our full use cases, this might be a component we wish to explore in the future.

ksqlDB

This is a highly specialized database for consuming events. This is essentially a more abstract version of the Kafka Streams as it does not depend on the Java API. Instead, this framework uses SQL queries to accomplish tasks.

Guides Used

Apache Spark

Apache Spark is a product used for data analysis. Spark is built on Hadoop, a distributed file system management system also by Apache. Spark can be run in a distributed environment. This is to provide better performance and not rely solely on your own machine’s resources. Spark also provides a data storage aspect using Hadoop.

Usage

This might be useful in some of the post-hoc analysis, where we need to pull a mass amount of data and run some of the reducers on them.

We can however read data from Kafka. We can use Kafka as our overall data management system and when we get to the point where we need to process BIG data, we can use Spark and read directly from Kafka topics to reduce the data.

A couple ideas:

  • Language tools could process data with Spark, while reading it from the Kafka topics
  • Spark is used as a reducer that processes data from the Kafka topics and passes the processed data to language tools
Clone this wiki locally