Skip to content

Commit 33dcb42

Browse files
committed
Rest api added to store and fetch data from elasticsearch.
Kafka sends a message when there is an exception
1 parent 0d2ae36 commit 33dcb42

25 files changed

+359
-144
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ Alternatively we can download the tar.gz package from the Download page and run
4545
## 5. Start Elasticsearch server
4646
Elasticsearch is a distributed, real-time search and analytics platform.
4747
Elasticsearch can store data in json format, and hence can be used as NoSQL database.
48-
> elasticsearch-7.11.1/bin/elasticsearch
48+
> bin/elasticsearch.bat
4949
5050
- index: An index is equivalent to database in relational database
5151
- mapping: A mapping is equivalent to schema in relational database
@@ -55,7 +55,7 @@ Elasticsearch can store data in json format, and hence can be used as NoSQL data
5555
> http://localhost:9200
5656

5757
6.2 Index APIs
58-
6.2.1 Create an index with name quick_data_index
58+
6.2.1 Create an index with name articles
5959
> PUT http://localhost:9200/articles
6060

6161
6.2.2 Query an index with name articles
File renamed without changes.

article-search/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
## article-search service
2+
3+
- This is a demo application to save and search article information using Elasticsearch.
4+
- Kafka is used to send an error message as an alarm when an exception occurs.
5+
- The article-search service is made RESTful using FastAPI and uvicorn.
6+
- Read the [README](https://github.com/smallintro/python-elasticsearch-with-kafka/README) file to know how to set up the environment to run and test this application.
7+
- Run the main.py file to start the service.
8+
- Access the rest service at [127.0.0.1:8080/docs](http://127.0.0.1:8080/docs)
9+
File renamed without changes.

quick-data/app/requirements.txt renamed to article-search/app/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ starlette
33
pydantic
44
fastapi
55
uvicorn
6-
kafka-python
76
urllib3
87
certifi
8+
kafka-python
99
elasticsearch
File renamed without changes.

article-search/app/src/article_api.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from fastapi import FastAPI
from starlette import status

import service.app_service as app_service
from model.data_model import ArticleInfo, AppResponse
from config.es_config import es_obj

# REST layer for the article-search service. Every endpoint delegates to
# app_service and wraps the (status, payload) pair in an AppResponse.
app_v1 = FastAPI()


@app_v1.post("/v1/article/save", status_code=status.HTTP_201_CREATED, response_model=AppResponse)
def save_article_info(article: ArticleInfo):
    """Persist one article document and report the indexing outcome."""
    print(f"save_article_info: {article}")
    msg, data = app_service.save_article_info(article)
    return AppResponse(status=msg, data=data)


@app_v1.get("/v1/article/{article_id}", status_code=status.HTTP_200_OK, response_model=AppResponse)
def get_article_by_id(article_id: str):
    """Fetch a single article by its Elasticsearch document id."""
    print(f"get_article_by_id: {article_id}")
    msg, data = app_service.get_article_by_id(article_id)
    return AppResponse(status=msg, data=data)


@app_v1.post("/v1/article/find", status_code=status.HTTP_200_OK, response_model=AppResponse)
def get_article_by_condition(article: ArticleInfo):
    """Search articles matching the fields set on the request body."""
    # fix: original printed "get_all_article_info" — the wrong function name.
    print("get_article_by_condition")
    msg, data = app_service.get_article_by_condition(article)
    return AppResponse(status=msg, data={'articles': data})


@app_v1.get("/v1/article/", status_code=status.HTTP_200_OK, response_model=AppResponse)
def get_all_articles():
    """List every article in the index."""
    print("get_all_articles")
    msg, data = app_service.get_all_articles()
    return AppResponse(status=msg, data={'articles': data})


@app_v1.delete("/v1/article/{article_id}", status_code=status.HTTP_200_OK, response_model=AppResponse)
def del_article_by_id(article_id: str):
    """Delete a single article by its Elasticsearch document id."""
    print(f"del_article_by_id: {article_id}")
    msg, data = app_service.del_article_by_id(article_id)
    return AppResponse(status=msg, data={'result': data})


# This gets called once the app is shutting down.
@app_v1.on_event("shutdown")
async def app_shutdown():
    print('closing elasticsearch connection')
    # fix: es_obj is an accessor *function* (es_config uses es_obj().indices...),
    # so the original `es_obj.close()` raised AttributeError on the function
    # object instead of closing the client.
    es_obj().close()
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
from elasticsearch import Elasticsearch
2-
import json
3-
import logging
42
import requests
53

64
_es_obj = None
5+
default_index = 'articles'
76

87

98
def init_elasticsearch():
@@ -13,6 +12,7 @@ def init_elasticsearch():
1312
print('Elasticsearch Connected')
1413
res = requests.get('http://localhost:9200')
1514
print(res.content)
15+
create_index()
1616
return True
1717
else:
1818
print('Elasticsearch not connected')
@@ -26,9 +26,8 @@ def es_obj():
2626
return _es_obj
2727

2828

29-
def create_index(index_name):
29+
def create_index(index_name=default_index):
3030
result = False
31-
# index settings
3231
settings = {
3332
"settings": {
3433
"number_of_shards": 1,
@@ -50,50 +49,11 @@ def create_index(index_name):
5049
# Ignore 400 means to ignore "Index Already Exist" error.
5150
es_obj().indices.create(index=index_name, ignore=400, body=settings)
5251
print(f'Created Index {index_name}')
52+
else:
53+
es_obj().indices.refresh(index=index_name)
5354
result = True
5455
except Exception as ex:
5556
print('Error while creating index: %s' % str(ex))
5657
finally:
5758
return result
5859

59-
60-
def add_doc_to_index(index_name, document_data):
61-
is_stored = True
62-
try:
63-
response = es_obj().index(index=index_name, doc_type='_doc', body=json.dumps(document_data))
64-
print(response)
65-
except Exception as ex:
66-
print('Error in indexing data: %s' % str(ex))
67-
is_stored = False
68-
finally:
69-
return is_stored
70-
71-
72-
def get_doc_from_index(index_name):
73-
response = None
74-
# https://www.elastic.co/guide/en/elasticsearch/reference/current/full-text-queries.html
75-
search_object = {'query': {'match': {'website': 'smallintro.github.io'}}}
76-
search_data = json.dumps(search_object)
77-
try:
78-
response = es_obj().search(index=index_name, body=search_data)
79-
print(response)
80-
except Exception as ex:
81-
print('Error in indexing data: %s' % str(ex))
82-
finally:
83-
return response
84-
85-
86-
if __name__ == "__main__":
87-
logging.basicConfig(level=logging.ERROR)
88-
_index_name = 'articles'
89-
article_data = {
90-
"author": "Sushil",
91-
"title": "Small intro to elasticsearch python API",
92-
"website": "smallintro.github.io",
93-
"publish_date": "2021-11-14",
94-
"has_video": True,
95-
}
96-
if init_elasticsearch:
97-
create_index(_index_name)
98-
add_doc_to_index(_index_name, article_data)
99-
get_doc_from_index(_index_name)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from kafka import KafkaProducer
from kafka import KafkaConsumer
import json

# Connection settings shared by every producer/consumer in this module.
bootstrap_servers = ['localhost:9092']
default_topic = 'python-kafka-topic'
group_id = 'kafka-group-id'

# Lazily-created singletons: plain-bytes and JSON-aware clients.
_text_producer = None
_text_consumer = None
_producer = None
_consumer = None


def init_kafka_producer():
    """Create both producer singletons: raw-bytes and JSON-serializing."""
    global _text_producer, _producer
    _text_producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    _producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    print(f"init_kafka_producer finished")


def init_kafka_consumer(topic_name=default_topic):
    """Create both consumer singletons subscribed to *topic_name*."""
    global _text_consumer, _consumer
    _text_consumer = KafkaConsumer(
        topic_name, group_id=group_id, bootstrap_servers=bootstrap_servers)
    _consumer = KafkaConsumer(
        topic_name,
        group_id=group_id,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    )
    print(f"init_kafka_consumer finished")


def producer():
    """Return the JSON producer, initializing it on first use."""
    if _producer is None:
        init_kafka_producer()
    return _producer


def text_producer():
    """Return the raw-bytes producer, initializing it on first use."""
    if _text_producer is None:
        init_kafka_producer()
    return _text_producer


def consumer():
    """Return the JSON consumer, initializing it on first use."""
    if _consumer is None:
        init_kafka_consumer()
    return _consumer


def text_consumer():
    """Return the raw-bytes consumer, initializing it on first use."""
    if _text_consumer is None:
        init_kafka_consumer()
    return _text_consumer

article-search/app/src/main.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
"""
2+
Created on 14-Nov-2021
3+
@author: Sushil Prasad
4+
"""
5+
import logging
6+
import uvicorn
7+
import threading
8+
from service.kafka_consumer import consume_message
9+
10+
11+
def init_app():
12+
logging.basicConfig(level=logging.ERROR)
13+
try:
14+
threading.Thread(target=consume_message).start()
15+
print('Hello World!')
16+
except Exception as ex:
17+
print(str(ex))
18+
19+
20+
if __name__ == "__main__":
21+
init_app()
22+
# uvicorn article_api:app --port 8080 --reload
23+
uvicorn.run("article_api:app_v1", host="127.0.0.1", port=8080, log_level="info")
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from typing import Optional

from pydantic import BaseModel


class ArticleInfo(BaseModel):
    """Schema of an article document; all fields optional so the same model
    can serve as a partial search filter."""
    # fix: `str = None` is an implicit optional — rejected by pydantic v2 and
    # deprecated in v1; Optional[str] keeps identical runtime behavior.
    author: Optional[str] = None
    title: Optional[str] = None
    website: Optional[str] = None
    # NOTE(review): looks like an ISO "YYYY-MM-DD" string — confirm with callers.
    publish_date: Optional[str] = None
    has_video: bool = False


class AppResponse(BaseModel):
    """Standard API envelope: a status string plus a payload dict."""
    status: str
    data: dict
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import service.elasticsearch_service as es
import service.kafka_producer as kfk
from model.data_model import ArticleInfo


def _hits_to_dict(response):
    """Map an Elasticsearch search response to {doc_id: source_document}."""
    return {hit['_id']: hit['_source'] for hit in response['hits']['hits']}


def _report_failure(ex):
    """Publish the error to Kafka as an alarm and build the failure result.

    fix: the original returned ("failed", str(ex)); AppResponse.data is
    declared as dict, so endpoints that pass the payload straight through
    (save, get-by-id) raised a pydantic validation error on every failure.
    """
    # send a failure message to generate alarm
    kfk.publish_message(str(ex))
    return "failed", {'error': str(ex)}


def save_article_info(article: ArticleInfo):
    """Index one article; return ("success"|"failed", payload dict)."""
    try:
        response = es.add_doc_to_index(article)
        if response.get('result') == 'created':
            return "success", {response['result']: response['_id']}
        else:
            return "failed", {response['result']: None}
    except Exception as ex:
        return _report_failure(ex)


def get_article_by_id(article_id):
    """Look up one article by document id; return its hits keyed by id."""
    try:
        response = es.get_article_by_id(article_id)
        return "success", _hits_to_dict(response)
    except Exception as ex:
        return _report_failure(ex)


def get_all_articles():
    """Return every article in the index keyed by document id."""
    try:
        response = es.get_all_articles()
        return "success", _hits_to_dict(response)
    except Exception as ex:
        return _report_failure(ex)


def del_article_by_id(article_id):
    """Delete one article by document id; return the ES result string."""
    try:
        response = es.delete_article_by_id(article_id)
        return "success", response['result']
    except Exception as ex:
        return _report_failure(ex)


def get_article_by_condition(article: ArticleInfo):
    """Search articles by author (the only condition currently supported)."""
    try:
        response = es.get_article_by_author(article.author)
        return "success", _hits_to_dict(response)
    except Exception as ex:
        return _report_failure(ex)

0 commit comments

Comments
 (0)