-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs: Initial commit with the template of the project
See the TODOs to complete the project
- Loading branch information
0 parents
commit 0989db4
Showing
43 changed files
with
2,274 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,233 @@ | ||
# Public Transit Status with Apache Kafka | ||
|
||
In this project, you will construct a streaming event pipeline around Apache Kafka and its ecosystem. Using public data from the [Chicago Transit Authority](https://www.transitchicago.com/data/) we will construct an event pipeline around Kafka that allows us to simulate and display the status of train lines in real time. | ||
|
||
When the project is complete, you will be able to monitor a website to watch trains move from station to station. | ||
|
||
![Final User Interface](images/ui.png) | ||
|
||
|
||
## Prerequisites | ||
|
||
The following are required to complete this project: | ||
|
||
* Docker | ||
* Python 3.7 | ||
* Access to a computer with a minimum of 16gb+ RAM and a 4-core CPU to execute the simulation | ||
|
||
## Description | ||
|
||
The Chicago Transit Authority (CTA) has asked us to develop a dashboard displaying system status for its commuters. We have decided to use Kafka and ecosystem tools like REST Proxy and Kafka Connect to accomplish this task. | ||
|
||
Our architecture will look like so: | ||
|
||
![Project Architecture](images/diagram.png) | ||
|
||
### Step 1: Create Kafka Producers | ||
The first step in our plan is to configure the train stations to emit some of the events that we need. The CTA has placed a sensor on each side of every train station that can be programmed to take an action whenever a train arrives at the station. | ||
|
||
To accomplish this, you must complete the following tasks: | ||
|
||
1. Complete the code in `producers/models/producer.py` | ||
1. Define a `value` schema for the arrival event in `producers/models/schemas/arrival_value.json` with the following attributes | ||
* `station_id` | ||
* `train_id` | ||
* `direction` | ||
* `line` | ||
* `train_status` | ||
* `prev_station_id` | ||
* `prev_direction` | ||
1. Complete the code in `producers/models/station.py` so that: | ||
* A topic is created for each station in Kafka to track the arrival events | ||
* The station emits an `arrival` event to Kafka whenever the `Station.run()` function is called. | ||
* Ensure that events emitted to kafka are paired with the Avro `key` and `value` schemas | ||
1. Define a `value` schema for the turnstile event in `producers/models/schemas/turnstile_value.json` with the following attributes | ||
* `station_id` | ||
* `station_name` | ||
* `line` | ||
1. Complete the code in `producers/models/turnstile.py` so that: | ||
* A topic is created for each turnstile for each station in Kafka to track the turnstile events | ||
* The station emits a `turnstile` event to Kafka whenever the `Turnstile.run()` function is called. | ||
* Ensure that events emitted to kafka are paired with the Avro `key` and `value` schemas | ||
|
||
### Step 2: Configure Kafka REST Proxy Producer | ||
Our partners at the CTA have asked that we also send weather readings into Kafka from their weather hardware. Unfortunately, this hardware is old and we cannot use the Python Client Library due to hardware restrictions. Instead, we are going to use HTTP REST to send the data to Kafka from the hardware using Kafka's REST Proxy. | ||
|
||
To accomplish this, you must complete the following tasks: | ||
|
||
1. Define a `value` schema for the weather event in `producers/models/schemas/weather_value.json` with the following attributes | ||
* `temperature` | ||
* `status` | ||
1. Complete the code in `producers/models/weather.py` so that: | ||
* A topic is created for weather events | ||
* The weather model emits `weather` event to Kafka REST Proxy whenever the `Weather.run()` function is called. | ||
* **NOTE**: When sending HTTP requests to Kafka REST Proxy, be careful to include the correct `Content-Type`. Pay close attention to the [examples in the documentation](https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name)) for more information. | ||
* Ensure that events emitted to REST Proxy are paired with the Avro `key` and `value` schemas | ||
|
||
### Step 3: Configure Kafka Connect | ||
Finally, we need to extract station information from our PostgreSQL database into Kafka. We've decided to use the [Kafka JDBC Source Connector](https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/index.html). | ||
|
||
To accomplish this, you must complete the following tasks: | ||
|
||
1. Complete the code and configuration in `producers/connectors.py` | ||
* Please refer to the [Kafka Connect JDBC Source Connector Configuration Options](https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html) for documentation on the options you must complete. | ||
* You can run this file directly to test your connector, rather than running the entire simulation. | ||
* Make sure to use the [Landoop Kafka Connect UI](http://localhost:8084) and [Landoop Kafka Topics UI](http://localhost:8085) to check the status and output of the Connector. | ||
* To delete a misconfigured connector: `CURL -X DELETE localhost:8083/connectors/stations` | ||
|
||
### Step 4: Configure the Faust Stream Processor | ||
We will leverage Faust Stream Processing to transform the raw Stations table that we ingested from Kafka Connect. The raw format from the database has more data than we need, and the line color information is not conveniently configured. To remediate this, we're going to ingest data from our Kafka Connect topic, and transform the data. | ||
|
||
To accomplish this, you must complete the following tasks: | ||
|
||
1. Complete the code and configuration in `consumers/faust_stream.py | ||
|
||
#### Watch Out! | ||
|
||
You must run this Faust processing application with the following command: | ||
|
||
`faust -A faust_stream worker -l info` | ||
|
||
### Step 5: Configure the KSQL Table | ||
Next, we will use KSQL to aggregate turnstile data for each of our stations. Recall that when we produced turnstile data, we simply emitted an event, not a count. What would make this data more useful would be to summarize it by station so that downstream applications always have an up-to-date count | ||
|
||
To accomplish this, you must complete the following tasks: | ||
|
||
1. Complete the queries in `consumers/ksql.py` | ||
|
||
#### Tips | ||
|
||
* The KSQL CLI is the best place to build your queries. Try `ksql` in your workspace to enter the CLI. | ||
* You can run this file on its own simply by running `python ksql.py` | ||
* Made a mistake in table creation? `DROP TABLE <your_table>`. If the CLI asks you to terminate a running query, you can `TERMINATE <query_name>` | ||
|
||
|
||
### Step 6: Create Kafka Consumers | ||
With all of the data in Kafka, our final task is to consume the data in the web server that is going to serve the transit status pages to our commuters. | ||
|
||
To accomplish this, you must complete the following tasks: | ||
|
||
1. Complete the code in `consumers/consumer.py` | ||
1. Complete the code in `consumers/models/line.py` | ||
1. Complete the code in `consumers/models/weather.py` | ||
1. Complete the code in `consumers/models/station.py` | ||
|
||
### Documentation | ||
In addition to the course content you have already reviewed, you may find the following examples and documentation helpful in completing this assignment: | ||
|
||
* [Confluent Python Client Documentation](https://docs.confluent.io/current/clients/confluent-kafka-python/#) | ||
* [Confluent Python Client Usage and Examples](https://github.com/confluentinc/confluent-kafka-python#usage) | ||
* [REST Proxy API Reference](https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name)) | ||
* [Kafka Connect JDBC Source Connector Configuration Options](https://docs.confluent.io/current/connect/kafka-connect-jdbc/source-connector/source_config_options.html) | ||
|
||
## Directory Layout | ||
The project consists of two main directories, `producers` and `consumers`. | ||
|
||
The following directory layout indicates the files that the student is responsible for modifying by adding a `*` indicator. Instructions for what is required are present as comments in each file. | ||
|
||
``` | ||
* - Indicates that the student must complete the code in this file | ||
├── consumers | ||
│ ├── consumer.py * | ||
│ ├── faust_stream.py * | ||
│ ├── ksql.py * | ||
│ ├── models | ||
│ │ ├── lines.py | ||
│ │ ├── line.py * | ||
│ │ ├── station.py * | ||
│ │ └── weather.py * | ||
│ ├── requirements.txt | ||
│ ├── server.py | ||
│ ├── topic_check.py | ||
│ └── templates | ||
│ └── status.html | ||
└── producers | ||
├── connector.py * | ||
├── models | ||
│ ├── line.py | ||
│ ├── producer.py * | ||
│ ├── schemas | ||
│ │ ├── arrival_key.json | ||
│ │ ├── arrival_value.json * | ||
│ │ ├── turnstile_key.json | ||
│ │ ├── turnstile_value.json * | ||
│ │ ├── weather_key.json | ||
│ │ └── weather_value.json * | ||
│ ├── station.py * | ||
│ ├── train.py | ||
│ ├── turnstile.py * | ||
│ ├── turnstile_hardware.py | ||
│ └── weather.py * | ||
├── requirements.txt | ||
└── simulation.py | ||
``` | ||
|
||
## Running and Testing | ||
|
||
To run the simulation, you must first start up the Kafka ecosystem on their machine utilizing Docker Compose. | ||
|
||
```%> docker-compose up``` | ||
|
||
Docker compose will take a 3-5 minutes to start, depending on your hardware. Please be patient and wait for the docker-compose logs to slow down or stop before beginning the simulation. | ||
|
||
Once docker-compose is ready, the following services will be available: | ||
|
||
| Service | Host URL | Docker URL | Username | Password | | ||
| --- | --- | --- | --- | --- | | ||
| Public Transit Status | [http://localhost:8888](http://localhost:8888) | n/a | || | ||
| Landoop Kafka Connect UI | [http://localhost:8084](http://localhost:8084) | http://connect-ui:8084 | | ||
| Landoop Kafka Topics UI | [http://localhost:8085](http://localhost:8085) | http://topics-ui:8085 | | ||
| Landoop Schema Registry UI | [http://localhost:8086](http://localhost:8086) | http://schema-registry-ui:8086 | | ||
| Kafka | PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 | PLAINTEXT://kafka0:9092,PLAINTEXT://kafka1:9093,PLAINTEXT://kafka2:9094 | | ||
| REST Proxy | [http://localhost:8082](http://localhost:8082/) | http://rest-proxy:8082/ | | ||
| Schema Registry | [http://localhost:8081](http://localhost:8081/ ) | http://schema-registry:8081/ | | ||
| Kafka Connect | [http://localhost:8083](http://localhost:8083) | http://kafka-connect:8083 | | ||
| KSQL | [http://localhost:8088](http://localhost:8088) | http://ksql:8088 | | ||
| PostgreSQL | `jdbc:postgresql://localhost:5432/cta` | `jdbc:postgresql://postgres:5432/cta` | `cta_admin` | `chicago` | | ||
|
||
Note that to access these services from your own machine, you will always use the `Host URL` column. | ||
|
||
When configuring services that run within Docker Compose, like **Kafka Connect you must use the Docker URL**. When you configure the JDBC Source Kafka Connector, for example, you will want to use the value from the `Docker URL` column. | ||
|
||
### Running the Simulation | ||
|
||
There are two pieces to the simulation, the `producer` and `consumer`. As you develop each piece of the code, it is recommended that you only run one piece of the project at a time. | ||
|
||
However, when you are ready to verify the end-to-end system prior to submission, it is critical that you open a terminal window for each piece and run them at the same time. **If you do not run both the producer and consumer at the same time you will not be able to successfully complete the project**. | ||
|
||
#### To run the `producer`: | ||
|
||
1. `cd producers` | ||
2. `virtualenv venv` | ||
3. `. venv/bin/activate` | ||
4. `pip install -r requirements.txt` | ||
5. `python simulation.py` | ||
|
||
Once the simulation is running, you may hit `Ctrl+C` at any time to exit. | ||
|
||
#### To run the Faust Stream Processing Application: | ||
1. `cd consumers` | ||
2. `virtualenv venv` | ||
3. `. venv/bin/activate` | ||
4. `pip install -r requirements.txt` | ||
5. `faust -A faust_stream worker -l info` | ||
|
||
|
||
#### To run the KSQL Creation Script: | ||
1. `cd consumers` | ||
2. `virtualenv venv` | ||
3. `. venv/bin/activate` | ||
4. `pip install -r requirements.txt` | ||
5. `python ksql.py` | ||
|
||
#### To run the `consumer`: | ||
|
||
** NOTE **: Do not run the consumer until you have reached Step 6! | ||
1. `cd consumers` | ||
2. `virtualenv venv` | ||
3. `. venv/bin/activate` | ||
4. `pip install -r requirements.txt` | ||
5. `python server.py` | ||
|
||
Once the server is running, you may hit `Ctrl+C` at any time to exit. |
Empty file.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
"""Defines core consumer functionality""" | ||
import logging | ||
|
||
import confluent_kafka | ||
from confluent_kafka import Consumer | ||
from confluent_kafka.avro import AvroConsumer | ||
from confluent_kafka.avro.serializer import SerializerError | ||
from tornado import gen | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class KafkaConsumer: | ||
"""Defines the base kafka consumer class""" | ||
|
||
def __init__( | ||
self, | ||
topic_name_pattern, | ||
message_handler, | ||
is_avro=True, | ||
offset_earliest=False, | ||
sleep_secs=1.0, | ||
consume_timeout=0.1, | ||
): | ||
"""Creates a consumer object for asynchronous use""" | ||
self.topic_name_pattern = topic_name_pattern | ||
self.message_handler = message_handler | ||
self.sleep_secs = sleep_secs | ||
self.consume_timeout = consume_timeout | ||
self.offset_earliest = offset_earliest | ||
|
||
# | ||
# | ||
# TODO: Configure the broker properties below. Make sure to reference the project README | ||
# and use the Host URL for Kafka and Schema Registry! | ||
# | ||
# | ||
self.broker_properties = { | ||
# | ||
# TODO | ||
# | ||
} | ||
|
||
# TODO: Create the Consumer, using the appropriate type. | ||
if is_avro is True: | ||
self.broker_properties["schema.registry.url"] = "http://localhost:8081" | ||
#self.consumer = AvroConsumer(...) | ||
else: | ||
#self.consumer = Consumer(...) | ||
pass | ||
|
||
# | ||
# | ||
# TODO: Configure the AvroConsumer and subscribe to the topics. Make sure to think about | ||
# how the `on_assign` callback should be invoked. | ||
# | ||
# | ||
# self.consumer.subscribe( TODO ) | ||
|
||
def on_assign(self, consumer, partitions): | ||
"""Callback for when topic assignment takes place""" | ||
# TODO: If the topic is configured to use `offset_earliest` set the partition offset to | ||
# the beginning or earliest | ||
logger.info("on_assign is incomplete - skipping") | ||
for partition in partitions: | ||
pass | ||
# | ||
# | ||
# TODO | ||
# | ||
# | ||
|
||
logger.info("partitions assigned for %s", self.topic_name_pattern) | ||
consumer.assign(partitions) | ||
|
||
async def consume(self): | ||
"""Asynchronously consumes data from kafka topic""" | ||
while True: | ||
num_results = 1 | ||
while num_results > 0: | ||
num_results = self._consume() | ||
await gen.sleep(self.sleep_secs) | ||
|
||
def _consume(self): | ||
"""Polls for a message. Returns 1 if a message was received, 0 otherwise""" | ||
# | ||
# | ||
# TODO: Poll Kafka for messages. Make sure to handle any errors or exceptions. | ||
# Additionally, make sure you return 1 when a message is processed, and 0 when no message | ||
# is retrieved. | ||
# | ||
# | ||
logger.info("_consume is incomplete - skipping") | ||
return 0 | ||
|
||
|
||
def close(self): | ||
"""Cleans up any open kafka consumers""" | ||
# | ||
# | ||
# TODO: Cleanup the kafka consumer | ||
# | ||
# |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
"""Defines trends calculations for stations""" | ||
import logging | ||
|
||
import faust | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# Faust will ingest records from Kafka in this format | ||
class Station(faust.Record): | ||
stop_id: int | ||
direction_id: str | ||
stop_name: str | ||
station_name: str | ||
station_descriptive_name: str | ||
station_id: int | ||
order: int | ||
red: bool | ||
blue: bool | ||
green: bool | ||
|
||
|
||
# Faust will produce records to Kafka in this format | ||
class TransformedStation(faust.Record): | ||
station_id: int | ||
station_name: str | ||
order: int | ||
line: str | ||
|
||
|
||
# TODO: Define a Faust Stream that ingests data from the Kafka Connect stations topic and | ||
# places it into a new topic with only the necessary information. | ||
app = faust.App("stations-stream", broker="kafka://localhost:9092", store="memory://") | ||
# TODO: Define the input Kafka Topic. Hint: What topic did Kafka Connect output to? | ||
# topic = app.topic("TODO", value_type=Station) | ||
# TODO: Define the output Kafka Topic | ||
# out_topic = app.topic("TODO", partitions=1) | ||
# TODO: Define a Faust Table | ||
#table = app.Table( | ||
# # "TODO", | ||
# # default=TODO, | ||
# partitions=1, | ||
# changelog_topic=out_topic, | ||
#) | ||
|
||
|
||
# | ||
# | ||
# TODO: Using Faust, transform input `Station` records into `TransformedStation` records. Note that | ||
# "line" is the color of the station. So if the `Station` record has the field `red` set to true, | ||
# then you would set the `line` of the `TransformedStation` record to the string `"red"` | ||
# | ||
# | ||
|
||
|
||
if __name__ == "__main__": | ||
app.main() |
Oops, something went wrong.