Skip to content

Commit a85ed1c

Browse files
committed
release commit
0 parents  commit a85ed1c

File tree

9 files changed

+212
-0
lines changed

9 files changed

+212
-0
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
.DS_Store
2+
3+
myenv

README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Kafka-Postgres Learning Project
2+
3+
Этот проект создан для изучения работы с Apache Kafka и PostgreSQL с использованием Python.
4+
5+
## Описание
6+
7+
Проект включает три основных компонента:
8+
9+
1. **Producer**: Генерирует данные заказов и отправляет их в Kafka.
10+
2. **Consumer**: Читает данные заказов из Kafka и сохраняет их в PostgreSQL.
11+
3. **Analyzer**: Анализирует данные заказов, хранящиеся в PostgreSQL.
12+
13+
## Установка
14+
15+
### Bash
16+
17+
Запустите скрипт `setup.sh`, который автоматически создаст виртуальное окружение, установит необходимые зависимости из `requirements.txt`, настроит докер-контейнеры для Kafka, Zookeeper и PostgreSQL, а также проверит их состояние перед созданием таблицы в базе данных PostgreSQL.
18+
19+
Если Python установлен не в /usr/bin/python3, то измените файл `setup.sh`, заменив /usr/bin/python3 на путь к вашему python3.
20+
21+
```sh
22+
./setup.sh
23+
```
24+
25+
### Важно!
26+
27+
Для наглядности рекомендуется запускать `producer.py`, `consumer.py` и `analyzer.py` в отдельных терминалах.
28+
29+
```sh
30+
source myenv/bin/activate && python producer.py
31+
```
32+
33+
```sh
34+
source myenv/bin/activate && python consumer.py
35+
```
36+
37+
```sh
38+
source myenv/bin/activate && python analyzer.py
39+
```

analyzer.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import psycopg2
2+
3+
conn = psycopg2.connect(
4+
dbname='orders_db',
5+
user='user',
6+
password='password',
7+
host='localhost',
8+
port=5433
9+
)
10+
cursor = conn.cursor()
11+
12+
cursor.execute("SELECT COUNT(*), SUM(amount) FROM orders")
13+
count, total_amount = cursor.fetchone()
14+
print(f"Total Orders: {count}, Total Amount: {total_amount}")

check.txt

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
Python 3.9.6
2+
virtualenv -p /usr/bin/python3 myenv
3+
source myenv/bin/activate
4+
5+
6+
docker-compose up -d
7+
8+
docker exec -it kafka_postgres_learning-postgres-1 psql -U user -d orders_db
9+
10+
CREATE TABLE orders (
11+
order_id SERIAL PRIMARY KEY,
12+
user_id INT NOT NULL,
13+
amount DECIMAL NOT NULL,
14+
timestamp BIGINT NOT NULL
15+
);
16+
17+
source myenv/bin/activate & python producer.py
18+
python consumer.py
19+
python analize.py
20+
21+
chmod +x setup.sh
22+
23+
24+
25+
docker-compose down
26+
docker-compose ps
27+
28+
DROP TABLE orders;
29+
30+
docker images -a -q
31+
docker images
32+
docker rmi $(docker images -a -q)

consumer.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
from kafka import KafkaConsumer
2+
import json
3+
import psycopg2
4+
5+
consumer = KafkaConsumer(
6+
'orders',
7+
bootstrap_servers='localhost:9092',
8+
auto_offset_reset='earliest',
9+
enable_auto_commit=True,
10+
group_id='order-group',
11+
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
12+
)
13+
14+
conn = psycopg2.connect(
15+
dbname='orders_db',
16+
user='user',
17+
password='password',
18+
host='localhost',
19+
port=5433
20+
)
21+
cursor = conn.cursor()
22+
23+
def save_order(order):
24+
cursor.execute(
25+
"INSERT INTO orders (user_id, amount, timestamp) VALUES (%s, %s, %s)",
26+
(order['user_id'], order['amount'], order['timestamp'])
27+
)
28+
conn.commit()
29+
30+
for message in consumer:
31+
order = message.value
32+
save_order(order)
33+
print(f"Consumed: {order}")

docker-compose.yml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
version: '3.8'
2+
services:
3+
zookeeper:
4+
image: bitnami/zookeeper:latest
5+
environment:
6+
- ALLOW_ANONYMOUS_LOGIN=yes
7+
ports:
8+
- "2181:2181"
9+
10+
kafka:
11+
image: bitnami/kafka:latest
12+
environment:
13+
- KAFKA_BROKER_ID=1
14+
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
15+
- KAFKA_LISTENERS=PLAINTEXT://:9092
16+
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
17+
- ALLOW_PLAINTEXT_LISTENER=yes
18+
ports:
19+
- "9092:9092"
20+
21+
postgres:
22+
image: postgres:latest
23+
environment:
24+
POSTGRES_USER: user
25+
POSTGRES_PASSWORD: password
26+
POSTGRES_DB: orders_db
27+
ports:
28+
- "5433:5432"

producer.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from kafka import KafkaProducer
2+
import json
3+
import time
4+
import random
5+
6+
producer = KafkaProducer(
7+
bootstrap_servers='localhost:9092',
8+
value_serializer=lambda v: json.dumps(v).encode('utf-8')
9+
)
10+
11+
def generate_order():
12+
order = {
13+
'user_id': random.randint(1, 100),
14+
'amount': round(random.uniform(10.0, 1000.0), 2),
15+
'timestamp': int(time.time())
16+
}
17+
return order
18+
19+
while True:
20+
order = generate_order()
21+
producer.send('orders', order)
22+
print(f"Produced: {order}")
23+
time.sleep(1)

requirements.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
kafka-python==2.0.2
2+
psycopg2-binary==2.9.9
3+
six==1.16.0
4+
SQLAlchemy==2.0.30
5+
typing_extensions==4.12.2

setup.sh

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#!/bin/bash
2+
3+
set -e
4+
5+
virtualenv -p /usr/bin/python3 myenv
6+
source myenv/bin/activate
7+
8+
pip install -r requirements.txt
9+
10+
docker-compose up -d
11+
12+
echo "Waiting for containers to start..."
13+
containers=(kafka_postgres_learning-postgres-1 kafka_postgres_learning-zookeeper-1 kafka_postgres_learning-kafka-1)
14+
15+
for container in "${containers[@]}"; do
16+
while [ "$(docker inspect -f '{{.State.Running}}' $container)" != "true" ]; do
17+
echo "Waiting for container $container..."
18+
sleep 5
19+
done
20+
done
21+
22+
sleep 3
23+
24+
docker exec -i kafka_postgres_learning-postgres-1 psql -U user -d orders_db << EOF
25+
CREATE TABLE IF NOT EXISTS orders (
26+
order_id SERIAL PRIMARY KEY,
27+
user_id INT NOT NULL,
28+
amount DECIMAL NOT NULL,
29+
timestamp BIGINT NOT NULL
30+
);
31+
EOF
32+
33+
# python producer.py &
34+
# python consumer.py &
35+
# python analyzer.py &

0 commit comments

Comments
 (0)