Skip to content

Commit d5e2fa0

Browse files
committed
now working with images(webcam streaming)
1 parent a85ed1c commit d5e2fa0

File tree

8 files changed

+169
-34
lines changed

8 files changed

+169
-34
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
.DS_Store
22

3-
myenv
3+
myenv
4+
processed_frames
5+
videos

README.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66

77
Проект включает три основных компонента:
88

9-
1. **Producer**: Генерирует данные заказов и отправляет их в Kafka.
10-
2. **Consumer**: Читает данные заказов из Kafka и сохраняет их в PostgreSQL.
11-
3. **Analyzer**: Анализирует данные заказов, хранящиеся в PostgreSQL.
9+
1. **Producer**: Захватывает кадры с веб-камеры и отправляет их в Kafka.
10+
2. **Consumer**: Читает кадры из Kafka, обрабатывает их (преобразует в оттенки серого) и сохраняет в PostgreSQL.
11+
3. **Analyzer**: Анализирует кадры, хранящиеся в PostgreSQL, и может создавать видео из этих кадров.
1212

1313
## Установка
1414

@@ -36,4 +36,5 @@ source myenv/bin/activate && python consumer.py
3636

3737
```sh
3838
source myenv/bin/activate && python analyzer.py
39-
```
39+
```
40+

analyzer.py

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
11
import psycopg2
2+
import base64
3+
from PIL import Image
4+
from io import BytesIO
5+
import cv2
6+
import numpy as np
7+
import os
8+
from datetime import datetime
29

310
conn = psycopg2.connect(
411
dbname='orders_db',
@@ -9,6 +16,96 @@
916
)
1017
cursor = conn.cursor()
1118

12-
cursor.execute("SELECT COUNT(*), SUM(amount) FROM orders")
13-
count, total_amount = cursor.fetchone()
14-
print(f"Total Orders: {count}, Total Amount: {total_amount}")
19+
def list_frames():
20+
cursor.execute("SELECT frame_id, frame_data, timestamp FROM frames")
21+
rows = cursor.fetchall()
22+
23+
for row in rows:
24+
new_row = []
25+
for value in row:
26+
if isinstance(value, str) and len(value) > 10:
27+
new_row.append(value[-10:] + "...")
28+
else:
29+
new_row.append(value)
30+
print(new_row)
31+
32+
print("The number of frames: ", cursor.rowcount)
33+
34+
def display_frame(frame_id):
35+
cursor.execute("SELECT frame_data FROM frames WHERE frame_id = %s", (frame_id,))
36+
row = cursor.fetchone()
37+
38+
if row is not None:
39+
frame_data = row[0]
40+
frame_bytes = base64.b64decode(frame_data)
41+
42+
nparr = np.frombuffer(frame_bytes, np.uint8)
43+
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
44+
45+
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
46+
img_pil = Image.fromarray(img_rgb)
47+
48+
img_pil.show()
49+
else:
50+
print("No frame found with the given frame_id")
51+
52+
def create_video_from_frames(output_path, fps=1):
53+
cursor.execute("SELECT frame_data FROM frames ORDER BY timestamp ASC")
54+
rows = cursor.fetchall()
55+
56+
if not rows:
57+
print("No frames found in the database.")
58+
return
59+
60+
frame_list = []
61+
for row in rows:
62+
frame_data = row[0]
63+
frame_bytes = base64.b64decode(frame_data)
64+
65+
nparr = np.frombuffer(frame_bytes, np.uint8)
66+
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
67+
68+
frame_list.append(img)
69+
70+
height, width = frame_list[0].shape[:2]
71+
# добавить когда-то проверку на размеры фреймов. если вдруг разные разрешения кадров будут
72+
# print(frame_list[1].shape)
73+
74+
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
75+
76+
for frame in frame_list:
77+
out.write(frame)
78+
79+
out.release()
80+
print(f"Video created at {output_path}")
81+
82+
def main():
83+
while True:
84+
print("Options: ")
85+
print("1. List frames")
86+
print("2. Display a frame")
87+
print("3. Create video from frames")
88+
print("4. Exit")
89+
90+
choice = input("Enter your choice: ")
91+
92+
if choice == '1':
93+
list_frames()
94+
elif choice == '2':
95+
frame_id = input("Enter frame_id: ")
96+
display_frame(frame_id)
97+
elif choice == '3':
98+
if not os.path.isdir('videos'):
99+
os.makedirs('videos')
100+
101+
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
102+
video_path = os.path.join(os.getcwd(), 'videos', f'output_video_{timestamp}.mp4')
103+
104+
create_video_from_frames(video_path)
105+
elif choice == '4':
106+
break
107+
else:
108+
print("Invalid choice. Try again.")
109+
110+
if __name__ == '__main__':
111+
main()

check.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ docker-compose up -d
77

88
docker exec -it kafka_postgres_learning-postgres-1 psql -U user -d orders_db
99

10+
docker exec -it kafka_postgres_learning-kafka-1 /bin/bash
11+
12+
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic webcam-frames --from-beginning
13+
14+
15+
1016
CREATE TABLE orders (
1117
order_id SERIAL PRIMARY KEY,
1218
user_id INT NOT NULL,
@@ -26,6 +32,9 @@ docker-compose down
2632
docker-compose ps
2733

2834
DROP TABLE orders;
35+
DROP TABLE frames;
36+
TRUNCATE frames;
37+
2938

3039
docker images -a -q
3140
docker images

consumer.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
from kafka import KafkaConsumer
2+
import cv2
3+
import base64
24
import json
5+
import numpy as np
36
import psycopg2
7+
from io import BytesIO
48

59
consumer = KafkaConsumer(
6-
'orders',
10+
'frames',
711
bootstrap_servers='localhost:9092',
812
auto_offset_reset='earliest',
913
enable_auto_commit=True,
10-
group_id='order-group',
14+
group_id='frame-group',
1115
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
1216
)
1317

@@ -20,14 +24,25 @@
2024
)
2125
cursor = conn.cursor()
2226

23-
def save_order(order):
27+
def save_frame(frame_data, timestamp):
2428
cursor.execute(
25-
"INSERT INTO orders (user_id, amount, timestamp) VALUES (%s, %s, %s)",
26-
(order['user_id'], order['amount'], order['timestamp'])
29+
"INSERT INTO frames (frame_data, timestamp) VALUES (%s, %s)",
30+
(frame_data, timestamp)
2731
)
2832
conn.commit()
2933

3034
for message in consumer:
31-
order = message.value
32-
save_order(order)
33-
print(f"Consumed: {order}")
35+
frame_str = message.value['frame']
36+
frame_bytes = base64.b64decode(frame_str)
37+
38+
nparr = np.frombuffer(frame_bytes, np.uint8)
39+
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
40+
41+
gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
42+
43+
_, buffer = cv2.imencode('.jpg', gray_img)
44+
frame_data = base64.b64encode(buffer).decode('utf-8')
45+
timestamp = message.value['timestamp']
46+
47+
save_frame(frame_data, timestamp)
48+
print(f"Consumed and saved frame with timestamp: {timestamp}")

producer.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,32 @@
11
from kafka import KafkaProducer
2-
import json
2+
import cv2
3+
import base64
34
import time
4-
import random
5+
import json
56

67
producer = KafkaProducer(
78
bootstrap_servers='localhost:9092',
89
value_serializer=lambda v: json.dumps(v).encode('utf-8')
910
)
1011

11-
def generate_order():
12-
order = {
13-
'user_id': random.randint(1, 100),
14-
'amount': round(random.uniform(10.0, 1000.0), 2),
15-
'timestamp': int(time.time())
16-
}
17-
return order
12+
cap = cv2.VideoCapture(0)
1813

19-
while True:
20-
order = generate_order()
21-
producer.send('orders', order)
22-
print(f"Produced: {order}")
14+
if not cap.isOpened():
15+
print("Error: Could not open video stream.")
16+
exit()
17+
18+
while cap.isOpened():
19+
ret, frame = cap.read()
20+
if not ret:
21+
print("Error: Could not read frame.")
22+
break
23+
24+
_, buffer = cv2.imencode('.jpg', frame)
25+
frame_bytes = base64.b64encode(buffer).decode('utf-8')
26+
message = {'frame': frame_bytes, 'timestamp': int(time.time())}
27+
28+
producer.send('frames', message)
29+
print(f"Produced: {message}")
2330
time.sleep(1)
31+
32+
cap.release()

requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
kafka-python==2.0.2
2+
numpy==1.26.4
3+
opencv-python==4.10.0.82
4+
pillow==10.3.0
25
psycopg2-binary==2.9.9
36
six==1.16.0
47
SQLAlchemy==2.0.30

setup.sh

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,13 @@ done
2222
sleep 3
2323

2424
docker exec -i kafka_postgres_learning-postgres-1 psql -U user -d orders_db << EOF
25-
CREATE TABLE IF NOT EXISTS orders (
26-
order_id SERIAL PRIMARY KEY,
27-
user_id INT NOT NULL,
28-
amount DECIMAL NOT NULL,
25+
CREATE TABLE IF NOT EXISTS frames (
26+
frame_id SERIAL PRIMARY KEY,
27+
frame_data TEXT NOT NULL,
2928
timestamp BIGINT NOT NULL
3029
);
3130
EOF
3231

3332
# python producer.py &
3433
# python consumer.py &
35-
# python analyzer.py &
34+
# python analyzer.py &

0 commit comments

Comments
 (0)