Kafka
The figure below gives an overview of the Kafka architecture. Note that newer versions of Kafka no longer use ZooKeeper.
Producer
  | send data
  v
Kafka cluster: Broker 1, Broker 2, Broker 3 (coordinated by ZooKeeper)
  | read data
  v
Consumers
Brokers act as the main servers. They handle read and write operations as well as data replication and retention. Brokers have a multitude of configuration options; a full list can be found in the documentation link below. Two of the common properties we want to use are auto.create.topics.enable, to allow topics to be created from non-Brokers, and delete.topic.enable, to allow topics to be deleted.
Configuration options: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html
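As a quick sketch, these two settings would go in the Broker's server.properties (the same property also shows up in the CentOS install notes near the bottom of this page); values here are illustrative:
auto.create.topics.enable=true
delete.topic.enable=true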
Topics are logs: labels for different types of events, used when reading from or writing to the server to better organize events. Each topic is stored on disk. Each topic can also have an expiration window; any event older than the specified timeframe (last 30 days, 3 years, never, etc.) will be removed from the topic. Newer Kafka servers, which no longer use ZooKeeper, can handle millions of topics. See the Stack Overflow snippet below.
“Update March 2021: With Kafka's new KRaft mode, which entirely removes ZooKeeper from Kafka's architecture, a Kafka cluster can handle millions of topics/partitions. See https://www.confluent.io/blog/kafka-without-zookeeper-a-sneak-peek/ for details. *short for "Kafka Raft Metadata mode"; in Early Access as of Kafka v2.8” https://stackoverflow.com/a/32963227/9115551
We can use various levels of topics. Messages can be sent to multiple topics, making hierarchical topic layouts possible: a topic for each student, each classroom, or each day, plus a 30-day topic for more recent analysis alongside an overall topic.
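A rough sketch of sending one event to several topic levels at once (the topic names are made up here; server1/server2 are the Broker placeholders used throughout this page):
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=[server1, server2],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

event = {'student': 'student_1', 'action': 'submitted_essay'}
for topic in ('classroom_42', 'last_30_days', 'all_events'):  # hypothetical topic levels
    producer.send(topic, value=event)  # the same event lands in every level of the hierarchy
producer.flush()  # make sure everything is actually sent before moving on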
Creating topics:
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(
    bootstrap_servers=[server1, server2],
    client_id='test'
)

topics = [
    NewTopic(
        'new_topic',  # topic name
        num_partitions=1,  # number of partitions for the topic
        replication_factor=1,  # replication factor of partitions
        # topic_configs=  # https://kafka.apache.org/documentation.html#topicconfigs
        # The topic configs can be used to set expiration dates of logs and compact them
    )
]

try:
    admin_client.create_topics(topics)
except TopicAlreadyExistsError as e:
    print('Unable to create topic:\n', e)
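As a sketch of the commented-out topic_configs argument above, retention and compaction can be set per topic at creation time (the topic name and values here are illustrative):
from kafka.admin import NewTopic

recent_topic = NewTopic(
    'recent_events',  # hypothetical 30-day topic
    num_partitions=1,
    replication_factor=1,
    topic_configs={
        'retention.ms': str(30 * 24 * 60 * 60 * 1000),  # drop events older than ~30 days
        'cleanup.policy': 'delete'  # 'compact' would instead keep only the latest value per key
    }
)
# then admin_client.create_topics([recent_topic]) as above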
Listing topics:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers=[server1, server2]
)
print(consumer.topics())
Partitions are where the events are located within the topics. They can be distributed across systems. Messages with the same key will always go to the same partition via a hash function; if we use a student as the key, all of their events will always be located on the same partition. Using a key also preserves the order of those events. If a message does not contain a key, events are added to partitions in a round-robin fashion.
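A minimal sketch of keyed sends, assuming a made-up student id as the key and a hypothetical classroom_events topic; the producer options mirror the write-up further down the page:
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=[server1, server2],
    key_serializer=lambda k: k.encode('utf-8'),  # keys must be bytes on the wire
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Both events share the key 'student_1', so they hash to the same partition
# and are stored (and later read back) in the order they were sent.
producer.send('classroom_events', key='student_1', value={'action': 'login'})
producer.send('classroom_events', key='student_1', value={'action': 'open_assignment'})
producer.flush()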
There are various ways to set up partitioning aside from the default hash-and-mod approach. However, if we are using the confluent-kafka Python implementation, custom partitioners are not supported; see the linked GitHub issue on the repository for more information:
Custom partitioners are currently not supported.
We need to iterate over messages in order to find a given key. The following excerpt details this:
How to get message by key from kafka topic - Stack Overflow
Kafka isn't indexed by key though, it's indexed by offset, and optionally by timestamp. If you need to lookup a key then you'll need to materialize the data to a system that's been designed to lookup by key: relational database, key value store, or some index. You can do this pretty easily with Kafka Connect, or if you'd like to build it into your service you could use the interactive queries feature of Kafka Streams.
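In line with the excerpt above, the closest the plain Python client gets is scanning the topic and filtering on the key client-side; a sketch (topic, key, and timeout are made up):
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'classroom_events',
    bootstrap_servers=[server1, server2],
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='earliest',  # start from the beginning of the topic
    consumer_timeout_ms=5000  # stop iterating once no new messages arrive for 5 seconds
)

wanted_key = b'student_1'  # keys come back as raw bytes unless a key_deserializer is set
matches = [msg.value for msg in consumer if msg.key == wanted_key]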
To write to the Kafka server, you need to run a KafkaProducer. Pass the Kafka server location (a single string, or several as a list) and specify how to serialize the messages (value_serializer). Then, whenever we send, we specify the topic and the message to send; each message can be sent to more than one topic.
import json
from kafka import KafkaProducer
from time import sleep

producer = KafkaProducer(
    bootstrap_servers=[server1, server2],  # list of Broker servers
    api_version=(2, 8, 1),  # Kafka version running on the Broker
    value_serializer=lambda x: json.dumps(x).encode('utf-8')  # what to do with the data before sending it
)

# send some data to the Kafka server
topic = 'test_topic'
for n in range(100):
    data = {'num': n}
    producer.send(topic, value=data)  # depending on the Broker configuration, this will automatically create the topic if it's not already present
    sleep(1)
To read from the Kafka server, you need to run a KafkaConsumer. Pass the topic you wish to subscribe to and the Kafka servers to connect to (a single string, or several as a list), and specify how to deserialize the messages (value_deserializer). Then run a loop to iterate over the messages in that log; this reads all current events within the topic and keeps listening for more data.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test_topic',  # topic we want to read from
    bootstrap_servers=[server1, server2],  # list of Broker servers
    api_version=(2, 8, 1),  # Kafka version running on the Broker
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),  # how to parse the data
    auto_offset_reset='earliest'  # where in the topic to start reading from
)
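The "read everything and keep listening" behaviour comes from simply iterating over the consumer object defined above; a minimal sketch of that loop:
for message in consumer:  # blocks and waits for new events once the existing ones are read
    print(message.topic, message.partition, message.offset, message.value)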
There are multiple ways to secure the Kafka Broker, using either Mutual TLS (mTLS) or some form of Simple Authentication and Security Layer (SASL). A broker can use more than one form of SASL authentication, but cannot use both mTLS and SASL. The three suggested methods are:
- Mutual TLS (mTLS)
  - Relies on SSL/TLS certificates
  - The configuration uses the term SSL for historical reasons
  - Two ways of doing this:
    - The truststore contains one or many certificates, and we trust any certificate listed in it - easier to revoke access, since we simply remove the certificate from the truststore
    - The truststore contains a Certificate Authority (CA), and we trust any certificate signed by that CA - more convenient, since we don't have to change the truststore
  - See the Python client sketch after this list
  - https://docs.confluent.io/platform/current/kafka/authentication_ssl.html
- SASL SCRAM (Salted Challenge Response Authentication Mechanism)
  - Stores credentials in ZooKeeper, so ZooKeeper should be on a private network
  - Should be used with TLS encryption
  - This might require an enterprise license to use; however, I'm not sure
  - https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html
- SASL GSSAPI - only for Kerberos, so probably not for us
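Sketched below is what a Python consumer over mTLS might look like once the certificates are in place. This is an untested assumption: kafka-python expects PEM files rather than JKS keystores, so the client certificate and key would first have to be exported from client.keystore.jks (the client-cert.pem / client-key.pem names here are hypothetical).
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9093'],  # the SSL listener defined in server.properties below
    security_protocol='SSL',
    ssl_cafile='ca-cert',  # CA certificate created with openssl below
    ssl_certfile='client-cert.pem',  # hypothetical PEM export of the client certificate
    ssl_keyfile='client-key.pem',  # hypothetical PEM export of the client private key
    ssl_check_hostname=False  # mirrors the blank ssl.endpoint.identification.algorithm below
)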
The following code chunks are various commands I used to try to set up mutual TLS. The keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed command fails because no file named cert-signed is found. The guide I followed for these commands is a mix of the official Confluent documentation and the following link:
https://www.ibm.com/docs/en/cloud-paks/cp-biz-automation/20.0.x?topic=emitter-preparing-ssl-certificates-kafka
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
keytool -keystore client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
keytool -keystore client.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:whitegum
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.keystore.jks -alias localhost -import -file cert-signed
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
/home/kafka/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning --consumer.config /home/kafka/kafka/config/client-ssl.properties
config/server.properties
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.keystore.location=/home/kafka/server.keystore.jks
ssl.keystore.password=whitegum
ssl.key.password=whitegum
ssl.truststore.location=/home/kafka/server.truststore.jks
ssl.truststore.password=whitegum
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
ssl.endpoint.identification.algorithm=
config/client-ssl.properties
security.protocol=SSL
ssl.truststore.location=/home/kafka/client.truststore.jks
ssl.truststore.password=whitegum
ssl.keystore.location=/home/kafka/client.keystore.jks
ssl.keystore.password=whitegum
ssl.key.password=whitegum
ssl.enabled.protocols=TLSv1.2
ssl.truststore.type=JKS
ssl.keystore.type=JKS
ssl.endpoint.identification.algorithm=
Kafka can handle topic deletion, but some setup is needed for it to happen: the Broker must be configured to allow topic deletion (delete.topic.enable). It does not appear to be possible to remove specific keys from a given topic, though we can delete everything up to a given offset.
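A minimal sketch of deleting a whole topic with the admin client, assuming delete.topic.enable is set on the Broker (the topic name is illustrative):
from kafka.admin import KafkaAdminClient
from kafka.errors import UnknownTopicOrPartitionError

admin_client = KafkaAdminClient(bootstrap_servers=[server1, server2])

try:
    admin_client.delete_topics(['old_topic'])  # requires delete.topic.enable=true on the Broker
except UnknownTopicOrPartitionError as e:
    print('Unable to delete topic:\n', e)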
There is a product for handling the backup and recovery process for the Kafka Brokers, called kafka-backup. This works through Kafka Connect, a framework for connecting Kafka to external systems such as databases or, in our case, the local file system. More information can be found in the following Medium article: https://medium.com/@anatolyz/introducing-kafka-backup-9dc0677ea7ee
Possible topic layouts:
- Topic for each student - partitions are not really used well here
- Topic for each classroom, keys are specific to each student
- Topic for overall
- Topic for last 30 days (more recent analysis)
- Topics for general system related things (one for each server WO is running on)
- Topic for removal of data
- Topic for each of our current main logs
Kafka Streams is a standalone library that works alongside the Kafka Brokers. It acts as a helper to consumers, providing filtering, aggregation, grouping, joining, etc., and is accessed through a functional Java API. Depending on our full use cases, this might be a component we wish to explore in the future.
ksqlDB is a highly specialized database for consuming events. It is essentially a more abstract version of Kafka Streams, as it does not depend on the Java API; instead, the framework uses SQL queries to accomplish tasks.
Setup and usage guides:
- Confluent Docker quickstart
  - Installs the entire Confluent platform
  - Just used step 1
  - https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#ce-docker-quickstart
- Installing Apache Kafka on CentOS 7
  - Only installs base Kafka, none of the other bells and whistles
  - The download link for the tar used in the guide is not available anymore; use this instead: kafka_2.13-2.8.1.tgz
  - Additionally, we want to enable topic creation from places other than the server: during step 3, also add auto.create.topics.enable = true to server.properties
  - https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-centos-7
- Reading/Writing to Kafka with Python
  - Uses a simple Producer and Consumer to publish and subscribe to messages
  - https://www.javatpoint.com/kafka-in-python
- Confluent Kafka Python overview
Apache Spark is a product used for data analysis. Spark builds on Hadoop, Apache's distributed storage and processing framework, and can be run in a distributed environment to provide better performance rather than relying solely on your own machine's resources. Spark also provides a data storage aspect through Hadoop (HDFS).
This might be useful for some of the post-hoc analysis, where we need to pull a massive amount of data and run some of the reducers over it.
We can, however, read data from Kafka with Spark. We can use Kafka as our overall data management system, and when we get to the point where we need to process big data, use Spark to read directly from the Kafka topics and reduce the data.
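As a rough sketch of that idea (PySpark Structured Streaming with its Kafka source; the package, topic, and server names are placeholders and would need to match our setup):
from pyspark.sql import SparkSession

# Requires the spark-sql-kafka-0-10 package on the Spark classpath
spark = SparkSession.builder.appName('kafka-reducer').getOrCreate()

events = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'server1:9092,server2:9092')
    .option('subscribe', 'test_topic')  # Kafka topic(s) to read from
    .option('startingOffsets', 'earliest')  # read the whole log, not just new events
    .load()
)

# Kafka hands key/value over as binary columns; cast to strings before reducing further
parsed = events.selectExpr('CAST(key AS STRING)', 'CAST(value AS STRING)')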
A couple ideas:
- Language tools could process data with Spark, while reading it from the Kafka topics
- Spark is used as a reducer that processes data from the Kafka topics and passes the processed data to language tools