Skip to content

Commit 9b62aff

Browse files
committed
New data process api; adding indicators
1 parent b216afb commit 9b62aff

File tree

8 files changed

+181
-4
lines changed

8 files changed

+181
-4
lines changed

atx-data-processor.postman_collection.json

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@
239239
"header": [],
240240
"body": {
241241
"mode": "raw",
242-
"raw": "{\n\t\"localCsvUrl\": \"~/Downloads/28-02-2020-TO-13-03-2020HDFCALLN.csv\",\n\t\"useWorkers\": true\n}",
242+
"raw": "{\n\t\"localCsvUrl\": \"~/Downloads/29-03-2018-TO-27-03-2020HDFCALL.csv\",\n\t\"useWorkers\": true\n}",
243243
"options": {
244244
"raw": {
245245
"language": "json"
@@ -260,6 +260,46 @@
260260
},
261261
"response": []
262262
},
263+
{
264+
"name": "post nse/process",
265+
"event": [
266+
{
267+
"listen": "prerequest",
268+
"script": {
269+
"id": "9c74e120-8216-44d4-80d3-b9ef99a6a87a",
270+
"exec": [
271+
""
272+
],
273+
"type": "text/javascript"
274+
}
275+
}
276+
],
277+
"request": {
278+
"method": "POST",
279+
"header": [],
280+
"body": {
281+
"mode": "raw",
282+
"raw": "{\n\t\"symbol\": \"HDFC\"\n}",
283+
"options": {
284+
"raw": {
285+
"language": "json"
286+
}
287+
}
288+
},
289+
"url": {
290+
"raw": "{{atx-data-processor-url}}/{{atx-data-processor-api}}/nse/process",
291+
"host": [
292+
"{{atx-data-processor-url}}"
293+
],
294+
"path": [
295+
"{{atx-data-processor-api}}",
296+
"nse",
297+
"process"
298+
]
299+
}
300+
},
301+
"response": []
302+
},
263303
{
264304
"name": "job/all/failed/remove",
265305
"event": [
@@ -526,13 +566,13 @@
526566
],
527567
"variable": [
528568
{
529-
"id": "a2ccee22-174b-4dac-9cb9-e2c9c28a5d76",
569+
"id": "b5a8fa65-5a22-44b0-ac95-8b2413146ec3",
530570
"key": "atx-data-processor-api",
531571
"value": "",
532572
"type": "string"
533573
},
534574
{
535-
"id": "d8562c3a-528c-4549-b9e2-4c9218b2dbb8",
575+
"id": "be297e39-4431-41d2-bf60-32633fa82fed",
536576
"key": "atx-data-processor-url",
537577
"value": "",
538578
"type": "string"

src/lib/__init__.py

Whitespace-only changes.

src/lib/technical_indicators.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""
2+
Indicators as shown by Peter Bakker at:
3+
https://www.quantopian.com/posts/technical-analysis-indicators-without-talib-code
4+
"""
5+
6+
# Import Built-Ins
7+
import logging
8+
9+
# Import Third-Party
10+
import pandas as pd
11+
import numpy as np
12+
13+
# Init Logging Facilities
14+
log = logging.getLogger(__name__)
15+
16+
17+
def moving_average(df, p, n):
    """Append an n-period simple moving average of column *p* to *df*.

    The new column is named ``MA_<n>`` and holds NaN for the first ``n - 1``
    rows (not enough history for a full window).

    :param df: pandas.DataFrame containing a numeric column named *p*
    :param p: String, name of the price column to average
    :param n: Integer, window length in rows
    :return: pandas.DataFrame with the added ``MA_<n>`` column
    """
    sma = df[p].rolling(n, min_periods=n).mean()
    return df.join(sma.rename("MA_" + str(n)))
28+
29+
30+
def exponential_moving_average(df, p, n):
    """Append an exponentially weighted moving average of column *p* to *df*.

    The new column is named ``EMA_<n>``; the first ``n - 1`` rows are NaN
    because ``min_periods=n`` requires a full span of observations.

    :param df: pandas.DataFrame containing a numeric column named *p*
    :param p: String, name of the price column to smooth
    :param n: Integer, span of the exponential window
    :return: pandas.DataFrame with the added ``EMA_<n>`` column
    """
    smoothed = df[p].ewm(span=n, min_periods=n).mean()
    return df.join(smoothed.rename("EMA_" + str(n)))
41+
42+
43+
def momentum(df, p, n):
    """Append the n-period momentum (price change) of column *p* to *df*.

    Momentum here is the simple difference ``p[t] - p[t-n]``; the first *n*
    rows of the new ``Momentum_<n>`` column are NaN.

    :param df: pandas.DataFrame containing a numeric column named *p*
    :param p: String, name of the price column to difference
    :param n: Integer, lag in rows
    :return: pandas.DataFrame with the added ``Momentum_<n>`` column
    """
    delta = df[p].diff(n)
    return df.join(delta.rename("Momentum_" + str(n)))

src/model/nse_processed_model.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from sqlalchemy import Column, Float, TIMESTAMP, VARCHAR
2+
from sqlalchemy.ext.declarative import declarative_base
3+
4+
from src.model.nse_model import NseDailyDataModel
5+
6+
Base = declarative_base()
7+
8+
9+
class NseDailyDataProcessedModel(NseDailyDataModel):
    """Model for Processed NSE Daily Data.

    Extends the raw daily-data model with computed indicator columns
    (currently only ``ma``) and maps them to a separate table.
    """

    __tablename__ = "nse_data_daily_processed"

    # Moving-average indicator value computed by the processing pipeline.
    ma = Column(Float)

    def __repr__(self):
        # Include ``ma`` so processed rows are distinguishable from raw
        # NseDailyDataModel rows when debugging; the original repr omitted
        # the only column this subclass adds.
        return (
            "<NseDailyDataProcessedModel(timestamp='%s', open='%s', close='%s',"
            " high='%s', low='%s', volume='%s', ma='%s',"
            " symbol='%s', createdTime='%s', updatedTime='%s')>"
            % (
                self.timestamp,
                self.open,
                self.close,
                self.high,
                self.low,
                self.volume,
                self.ma,
                self.symbol,
                self.created_time,
                self.updated_time,
            )
        )

src/repository/nse_repository.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import pandas as pd
2+
13
from datetime import datetime
24

35
from src.app import server
@@ -6,6 +8,7 @@
68
from src.repository.repository_response import generate_response
79

810
postgres = server.get_postgres()
11+
engine = postgres.get_engine()
912
session = postgres.get_session()
1013
log = Logger()
1114

@@ -75,3 +78,15 @@ def delete_one(data_id):
7578
return generate_response(200, "delete", True, repr(nse_data_to_be_deleted))
7679
else:
7780
return generate_response(200, "delete", False, None)
81+
82+
83+
def find_dataframe_by_symbol(symbol):
    """Load all ``nse_data_daily`` rows for one symbol into a DataFrame.

    :param symbol: String stock symbol used to filter the table
    :return: pandas.DataFrame with one row per matching daily record
    """
    # The symbol is bound as a query parameter (never interpolated) to avoid
    # SQL injection.
    query = "select * from nse_data_daily where nse_data_daily.symbol = %(symbol)s"
    return pd.read_sql_query(query, con=engine, params={"symbol": symbol})
89+
90+
91+
def write_dataframe_in_sql(df, table_name="book_details"):
    """Append *df* to a table via the shared Postgres engine.

    :param df: pandas.DataFrame to persist
    :param table_name: String destination table; defaults to the original
        hard-coded value so existing callers are unaffected.
    """
    # NOTE(review): the original hard-coded "book_details", which looks like a
    # copy-paste from an unrelated example in an NSE-data repository. Callers
    # persisting processed NSE data should pass the proper table explicitly
    # (presumably "nse_data_daily_processed" -- confirm against the model).
    df.to_sql(table_name, con=engine, if_exists="append", chunksize=1000)

src/routes/nse/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
NsePriceVolumeDeliverableData,
77
NseDataCsvParseRequest,
88
NseDataDeleteRequest,
9+
ProcessNseDataRequest,
910
)
1011
from src.services.nse_service import (
1112
create_nse_data_from_nse_pvd_data,
1213
create_nse_data_from_csv,
1314
delete_nse_data,
1415
upsert_nse_data_from_nse_pvd_data,
16+
process_nse_data,
1517
)
1618
from src.util import parse_request_using_schema
1719

@@ -46,3 +48,12 @@ def set_nse_pvd_data_csv():
4648
nse_data_csv_req = parse_request_using_schema(request, NseDataCsvParseRequest())
4749
service_resp = create_nse_data_from_csv(nse_data_csv_req)
4850
return make_response(jsonify(service_resp), 200)
51+
52+
53+
@api.route("/process", methods=["POST"])
def process_nse_pvd_data():
    """POST /nse/process -- validate the request and queue indicator
    processing for one symbol, echoing the service's status code."""
    parsed_request = parse_request_using_schema(request, ProcessNseDataRequest())
    service_resp = process_nse_data(parsed_request)
    return make_response(jsonify(service_resp), service_resp["status"])

src/routes/nse/schema.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,10 @@ class NseDataCsvParseRequest(Schema):
5757

5858
class Meta:
5959
strict = True
60+
61+
62+
class ProcessNseDataRequest(Schema):
    """Request schema for POST /nse/process: a single mandatory symbol."""

    # Stock symbol whose daily data should be processed.
    symbol = fields.Str(required=True)

    class Meta:
        # Matches the other request schemas in this file, which all set
        # strict mode.
        strict = True

src/services/nse_service.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import src.repository.nse_repository as nse_repo
44
from src.common import Logger
5+
from src.lib.technical_indicators import moving_average, exponential_moving_average
56
from src.routes.nse import NsePriceVolumeDeliverableData
67
from src.routes.nse.transformer import get_nse_daily_data_model_from_nse_pvd_data
78
from src.services.job_service import enqueue_job, create_batch_job
@@ -77,4 +78,23 @@ def parse_one_row_of_nse_data_csv(index, row):
7778
nse_data_dict["is_valid_data"] = False
7879

7980
nse_data = nse_pvd_schema.dump(nse_data_dict)
80-
upsert_nse_data_from_nse_pvd_data(nse_data)
81+
return upsert_nse_data_from_nse_pvd_data(nse_data)
82+
83+
84+
def process_nse_data(process_nse_pvd_data_request):
    """Queue indicator processing for one symbol's daily data.

    :param process_nse_pvd_data_request: dict-like with a "symbol" key
    :return: dict with a "success" flag and an HTTP "status" for the route
    """
    symbol = process_nse_pvd_data_request["symbol"]
    df = nse_repo.find_dataframe_by_symbol(symbol)
    # Work runs asynchronously on the "high" priority queue; this response
    # only acknowledges that the job was enqueued, not that it succeeded.
    # (df,) replaces the original's unidiomatic tuple([df]).
    enqueue_job(process_and_insert_dataframe_in_database, "high", (df,))
    return {"success": True, "status": 200}
89+
90+
91+
def process_and_insert_dataframe_in_database(df):
    """Compute technical indicators on *df* (queue-worker entry point).

    :param df: pandas.DataFrame of raw daily rows for one symbol; must have
        a "close" column
    :return: Integer 1, kept as the original job-success marker
    """
    # Plain string: the original used an f-string with no placeholders.
    log.info("process_and_insert_dataframe_in_database")
    # TODO: schema
    # TODO: add data in schema
    df = moving_average(df, "close", 20)
    df = exponential_moving_average(df, "close", 20)
    # Indicator columns are NaN for the first window; zero-fill so later
    # persistence does not have to special-case nulls.
    df = df.fillna(0)
    # Route the debug dump through the logger instead of print() so it obeys
    # the configured log level and destination.
    log.debug("processed dataframe:\n%s", df)
    # TODO: save to database
    return 1

0 commit comments

Comments
 (0)