Skip to content

Commit ac087e4

Browse files
committed
kafka-producer-consumer basics
1 parent c8fd40b commit ac087e4

File tree

14 files changed

+218
-128
lines changed

14 files changed

+218
-128
lines changed

.gitignore

Lines changed: 1 addition & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1,129 +1,2 @@
1-
# Byte-compiled / optimized / DLL files
2-
__pycache__/
3-
*.py[cod]
4-
*$py.class
1+
.idea
52

6-
# C extensions
7-
*.so
8-
9-
# Distribution / packaging
10-
.Python
11-
build/
12-
develop-eggs/
13-
dist/
14-
downloads/
15-
eggs/
16-
.eggs/
17-
lib/
18-
lib64/
19-
parts/
20-
sdist/
21-
var/
22-
wheels/
23-
pip-wheel-metadata/
24-
share/python-wheels/
25-
*.egg-info/
26-
.installed.cfg
27-
*.egg
28-
MANIFEST
29-
30-
# PyInstaller
31-
# Usually these files are written by a python script from a template
32-
# before PyInstaller builds the exe, so as to inject date/other infos into it.
33-
*.manifest
34-
*.spec
35-
36-
# Installer logs
37-
pip-log.txt
38-
pip-delete-this-directory.txt
39-
40-
# Unit test / coverage reports
41-
htmlcov/
42-
.tox/
43-
.nox/
44-
.coverage
45-
.coverage.*
46-
.cache
47-
nosetests.xml
48-
coverage.xml
49-
*.cover
50-
*.py,cover
51-
.hypothesis/
52-
.pytest_cache/
53-
54-
# Translations
55-
*.mo
56-
*.pot
57-
58-
# Django stuff:
59-
*.log
60-
local_settings.py
61-
db.sqlite3
62-
db.sqlite3-journal
63-
64-
# Flask stuff:
65-
instance/
66-
.webassets-cache
67-
68-
# Scrapy stuff:
69-
.scrapy
70-
71-
# Sphinx documentation
72-
docs/_build/
73-
74-
# PyBuilder
75-
target/
76-
77-
# Jupyter Notebook
78-
.ipynb_checkpoints
79-
80-
# IPython
81-
profile_default/
82-
ipython_config.py
83-
84-
# pyenv
85-
.python-version
86-
87-
# pipenv
88-
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
89-
# However, in case of collaboration, if having platform-specific dependencies or dependencies
90-
# having no cross-platform support, pipenv may install dependencies that don't work, or not
91-
# install all needed dependencies.
92-
#Pipfile.lock
93-
94-
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
95-
__pypackages__/
96-
97-
# Celery stuff
98-
celerybeat-schedule
99-
celerybeat.pid
100-
101-
# SageMath parsed files
102-
*.sage.py
103-
104-
# Environments
105-
.env
106-
.venv
107-
env/
108-
venv/
109-
ENV/
110-
env.bak/
111-
venv.bak/
112-
113-
# Spyder project settings
114-
.spyderproject
115-
.spyproject
116-
117-
# Rope project settings
118-
.ropeproject
119-
120-
# mkdocs documentation
121-
/site
122-
123-
# mypy
124-
.mypy_cache/
125-
.dmypy.json
126-
dmypy.json
127-
128-
# Pyre type checker
129-
.pyre/

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,21 @@
11
# python-elasticsearch-with-kafka
22
Python Restful service with elasticsearch and kafka integration
3+
4+
5+
1) Start Zookeeper node instance
6+
linux > bin/zookeeper-server-start.sh config/zookeeper.properties
7+
windows > bin\windows\zookeeper-server-start.bat config\zookeeper.properties
8+
9+
2) Start Kafka server
10+
linux > bin/kafka-server-start.sh config/server.properties
11+
windows > bin\windows\kafka-server-start.bat config\server.properties
12+
13+
3) Once kafka service is up create the kafka topic
14+
> bin/kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka-message-topic
15+
16+
4) consuming test message
17+
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-message-topic --from-beginning
18+
19+
4) produce test message
20+
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka-message-topic
21+
> Hello Listeners

quick-data/app/.gitignore

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
# Byte-compiled / optimized / DLL files
2+
__pycache__/
3+
*.py[cod]
4+
*$py.class
5+
6+
# C extensions
7+
*.so
8+
9+
# Distribution / packaging
10+
.Python
11+
build/
12+
develop-eggs/
13+
dist/
14+
downloads/
15+
eggs/
16+
.eggs/
17+
lib/
18+
lib64/
19+
parts/
20+
sdist/
21+
var/
22+
wheels/
23+
pip-wheel-metadata/
24+
share/python-wheels/
25+
*.egg-info/
26+
.installed.cfg
27+
*.egg
28+
MANIFEST
29+
30+
# PyInstaller
31+
# Usually these files are written by a python script from a template
32+
# before PyInstaller builds the exe, so as to inject date/other infos into it.
33+
*.manifest
34+
*.spec
35+
36+
# Installer logs
37+
pip-log.txt
38+
pip-delete-this-directory.txt
39+
40+
# Unit test / coverage reports
41+
htmlcov/
42+
.tox/
43+
.nox/
44+
.coverage
45+
.coverage.*
46+
.cache
47+
nosetests.xml
48+
coverage.xml
49+
*.cover
50+
*.py,cover
51+
.hypothesis/
52+
.pytest_cache/
53+
54+
# Translations
55+
*.mo
56+
*.pot
57+
58+
# Django stuff:
59+
*.log
60+
local_settings.py
61+
db.sqlite3
62+
db.sqlite3-journal
63+
64+
# Flask stuff:
65+
instance/
66+
.webassets-cache
67+
68+
# Scrapy stuff:
69+
.scrapy
70+
71+
# Sphinx documentation
72+
docs/_build/
73+
74+
# PyBuilder
75+
target/
76+
77+
# Jupyter Notebook
78+
.ipynb_checkpoints
79+
80+
# IPython
81+
profile_default/
82+
ipython_config.py
83+
84+
# pyenv
85+
.python-version
86+
87+
# pipenv
88+
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
89+
# However, in case of collaboration, if having platform-specific dependencies or dependencies
90+
# having no cross-platform support, pipenv may install dependencies that don't work, or not
91+
# install all needed dependencies.
92+
#Pipfile.lock
93+
94+
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
95+
__pypackages__/
96+
97+
# Celery stuff
98+
celerybeat-schedule
99+
celerybeat.pid
100+
101+
# SageMath parsed files
102+
*.sage.py
103+
104+
# Environments
105+
.env
106+
.venv
107+
env/
108+
venv/
109+
ENV/
110+
env.bak/
111+
venv.bak/
112+
*.iml
113+
114+
# Spyder project settings
115+
.spyderproject
116+
.spyproject
117+
118+
# Rope project settings
119+
.ropeproject
120+
121+
# mkdocs documentation
122+
/site
123+
124+
# mypy
125+
.mypy_cache/
126+
.dmypy.json
127+
dmypy.json
128+
129+
# Pyre type checker
130+
.pyre/

quick-data/app/requirements.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
requests
2+
starlette
3+
pydantic
4+
fastapi
5+
uvicorn
6+
kafka-python

quick-data/app/src/__init__.py

Whitespace-only changes.

quick-data/app/src/config/__init__.py

Whitespace-only changes.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
2+
3+
bootstrap_servers = ['localhost:9092']
4+
topic_name = 'python-kafka-topic'
5+
group_id = 'python-kafka-group'

quick-data/app/src/main.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
3+
def init_app():
4+
print('Hello World!')
5+
6+
7+
8+
if __name__ == "__main__":
9+
init_app()

quick-data/app/src/model/__init__.py

Whitespace-only changes.

quick-data/app/src/model/data_model.py

Whitespace-only changes.

quick-data/app/src/service/__init__.py

Whitespace-only changes.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from kafka import KafkaConsumer
2+
from kafka.errors import KafkaError
3+
import json
4+
5+
bootstrap_servers = ['localhost:9092']
6+
topic_name = 'python-kafka-topic'
7+
group_id = 'python-kafka-group'
8+
9+
consumer_str = KafkaConsumer(topic_name, group_id=group_id, bootstrap_servers=bootstrap_servers)
10+
consumer_json = KafkaConsumer(topic_name, bootstrap_servers=bootstrap_servers,
11+
value_deserializer=lambda m: json.loads(m.decode('utf-8')))
12+
13+
consumer = KafkaConsumer(topic_name, bootstrap_servers= bootstrap_servers, auto_offset_reset='earliest',
14+
enable_auto_commit=True, group_id=group_id,
15+
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
16+
17+
18+
def consume_str_message():
19+
for msg in consumer_str:
20+
print("Topic Name=%s,Message=%s" % (msg.topic, msg.value))
21+
22+
23+
def consume_json_message():
24+
for msg in consumer_json:
25+
print("Topic Name=%s,Message=%s" % (msg.topic, msg.value))
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from kafka import KafkaProducer
2+
from kafka.errors import KafkaError
3+
import json
4+
5+
bootstrap_servers = ['localhost:9092']
6+
topic_name = 'python-kafka-topic'
7+
8+
producer_str = KafkaProducer(bootstrap_servers=bootstrap_servers)
9+
producer_json = KafkaProducer(bootstrap_servers=bootstrap_servers,
10+
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
11+
12+
13+
def publish_message():
14+
future = producer_str.send(topic_name, b'Hello from kafka...')
15+
try:
16+
record_metadata = future.get(timeout=10)
17+
except KafkaError as err:
18+
print(f"An exception happened {err}")
19+
20+
21+
def publish_json():
22+
json_data = {'name': 'python-kafka-client', 'website': 'smallintro.github.io'}
23+
producer_json.send(topic_name, json_data)

quick-data/app/test/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)