This repository has been archived by the owner on Jul 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutil.py
234 lines (186 loc) · 10.2 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
#!/usr/bin/python3
from __future__ import annotations
from typing import Dict
from functools import reduce
from itertools import chain
from os.path import dirname, exists
from os import mkdir
from time import localtime, time
from re import compile as reg_compile
try:
from model.post import PostOfficeGraph
from matplotlib import pyplot as plt
from matplotlib.ticker import MultipleLocator, FormatStrFormatter
except ImportError as e:
print('[!]Module Unavailable : {}'.format(str(e)))
exit(1)
'''
Takes a Dict[str, int], which is generated by function(s) defined below
( categorizes company dataset, for a certain State in India, using various parameters )
& a targetPath on local file system ( an image of `*.png` form ),
where to store this generated PIE chart.
'''
def plotCategorizedCompanyDataForACertainState(dataSet: Dict[str, int], targetPath: str, title: str) -> bool:
    '''
        Draws a PIE chart from `dataSet` ( label -> count ) and exports it
        as an image at `targetPath`.

        Slices are ordered by descending count & each legend entry carries
        the share of that label as a percentage of the total count.

        dataSet    : mapping from category label to number of companies
        targetPath : where the exported image is written; missing parent
                     directories ( nested ones too ) are created first
        title      : text placed on top of the chart

        Returns True when the image was written, False if anything failed
        ( failures are intentionally reported through the return value ).
    '''
    # local import, module header only brings `mkdir` into scope
    from os import makedirs

    try:
        targetDir = dirname(targetPath)
        # a bare file name has an empty dirname, nothing to create then
        if targetDir:
            makedirs(targetDir, exist_ok=True)
        font = {
            'family': 'serif',
            'color': '#264040',
            'weight': 'normal',
            'size': 12
        }
        # total # of companies we're considering here
        total = sum(dataSet.values())
        # labels ordered by their count, biggest slice first
        _tmpLabels = sorted(dataSet, key=dataSet.get, reverse=True)
        # legend text includes a percentage field too
        labels = ['{} ( {:.4f} % )'.format(
            i, dataSet[i]*100/total) for i in _tmpLabels]
        # this is the actual data to be plotted
        data = [dataSet[i] for i in _tmpLabels]
        # figure on which pie chart to be drawn ( 24x12 inch at 100 dpi )
        figure = plt.figure(figsize=(24, 12), dpi=100)
        try:
            patches, _ = plt.pie(data)
            plt.legend(patches, labels, loc='best', fontsize='medium')
            plt.title(title, fontdict=font)
            plt.axis('equal')
            plt.tight_layout()
            plt.savefig(targetPath, bbox_inches='tight',
                        pad_inches=.5)  # exporting plotted PIE chart
        finally:
            # always release the figure, even when plotting / saving failed
            plt.close(figure)
        return True
    except Exception:
        return False
'''
Takes a list of all companies present in one State ( instances of model.corporateStat.Company )
as argument & returns a Dict[str, int] holding count of all companies of a
certain state, categorized as per their STATUS, which is to be used for plotting a PIE chart.
'''
def categorizeAsPerCompanyStatus(dataSet) -> Dict[str, int]:
    '''
        Counts how many elements of `dataSet` share each `status` value.

        dataSet : any iterable ( consumed once ) of objects exposing
                  a `status` attribute

        Returns a dict mapping every distinct status to its number of
        occurrences. Keys are ordered with the most recently discovered
        status first, i.e. reverse of first appearance.
    '''
    counts: Dict[str, int] = {}
    # single pass, updating the counter in place instead of
    # rebuilding the whole dictionary for every element
    for company in dataSet:
        counts[company.status] = counts.get(company.status, 0) + 1
    # callers may rely on key order ( newest key first ), so flip it
    return dict(reversed(list(counts.items())))
'''
Takes a list of all companies present in one State ( instances of model.corporateStat.Company )
as argument & returns a Dict[str, int] holding count of all companies of a
certain state, categorized as per their CLASS, which is to be used for plotting a PIE chart.
'''
def categorizeAsPerCompanyClass(dataSet) -> Dict[str, int]:
    '''
        Counts how many elements of `dataSet` share each `companyClass` value.

        dataSet : any iterable ( consumed once ) of objects exposing
                  a `companyClass` attribute

        Returns a dict mapping every distinct class to its number of
        occurrences. Keys are ordered with the most recently discovered
        class first, i.e. reverse of first appearance.
    '''
    counts: Dict[str, int] = {}
    # single pass, updating the counter in place instead of
    # rebuilding the whole dictionary for every element
    for company in dataSet:
        counts[company.companyClass] = counts.get(company.companyClass, 0) + 1
    # callers may rely on key order ( newest key first ), so flip it
    return dict(reversed(list(counts.items())))
'''
Takes a list of all companies present in one State ( instances of model.corporateStat.Company )
as argument & returns a Dict[str, int] holding count of all companies of a
certain state, categorized as per their CATEGORY, which is to be used for plotting a PIE chart.
'''
def categorizeAsPerCompanyCategory(dataSet) -> Dict[str, int]:
    '''
        Counts how many elements of `dataSet` share each `category` value.

        dataSet : any iterable ( consumed once ) of objects exposing
                  a `category` attribute

        Returns a dict mapping every distinct category to its number of
        occurrences. Keys are ordered with the most recently discovered
        category first, i.e. reverse of first appearance.
    '''
    counts: Dict[str, int] = {}
    # single pass, updating the counter in place instead of
    # rebuilding the whole dictionary for every element
    for company in dataSet:
        counts[company.category] = counts.get(company.category, 0) + 1
    # callers may rely on key order ( newest key first ), so flip it
    return dict(reversed(list(counts.items())))
'''
Takes a list of all companies present in one State ( instances of model.corporateStat.Company )
as argument & returns a Dict[str, int] holding count of all companies of a
certain state, categorized as per their SUB_CATEGORY, which is to be used for plotting a PIE chart.
'''
def categorizeAsPerCompanySubCategory(dataSet) -> Dict[str, int]:
    '''
        Counts how many elements of `dataSet` share each `subCategory` value.

        dataSet : any iterable ( consumed once ) of objects exposing
                  a `subCategory` attribute

        Returns a dict mapping every distinct sub-category to its number of
        occurrences. Keys are ordered with the most recently discovered
        sub-category first, i.e. reverse of first appearance.
    '''
    counts: Dict[str, int] = {}
    # single pass, updating the counter in place instead of
    # rebuilding the whole dictionary for every element
    for company in dataSet:
        counts[company.subCategory] = counts.get(company.subCategory, 0) + 1
    # callers may rely on key order ( newest key first ), so flip it
    return dict(reversed(list(counts.items())))
'''
Takes a list of all companies present in one State ( instances of model.corporateStat.Company )
as argument & returns a Dict[str, int] holding count of all companies of a
certain state, categorized as per their PRINCIPAL_BUSINESS_ACTIVITY, which is to be used for plotting a PIE chart.
'''
def categorizeAsPerCompanyPrincipalBusinessActivity(dataSet) -> Dict[str, int]:
    '''
        Counts how many elements of `dataSet` share each
        `principalBusinessActivity` value.

        dataSet : any iterable ( consumed once ) of objects exposing
                  a `principalBusinessActivity` attribute

        Returns a dict mapping every distinct activity to its number of
        occurrences. Keys are ordered with the most recently discovered
        activity first, i.e. reverse of first appearance.
    '''
    counts: Dict[str, int] = {}
    # single pass, updating the counter in place instead of
    # rebuilding the whole dictionary for every element
    for company in dataSet:
        activity = company.principalBusinessActivity
        counts[activity] = counts.get(activity, 0) + 1
    # callers may rely on key order ( newest key first ), so flip it
    return dict(reversed(list(counts.items())))
'''
Plots a graph of year of registration vs. #-of companies registered
in that certain year, while using dataset obtained from function defined just below it.
'''
def plotCompanyRegistrationDateWiseCategorizedData(dataSet: Dict[int, int], targetPath: str, title: str) -> bool:
    '''
        Plots year of registration ( x-axis ) against #-of companies
        registered in that year ( y-axis ) as a line graph, using the
        `ggplot` style, and exports it as an image at `targetPath`.

        dataSet    : mapping from year to number of companies registered
                     in that year; years missing inside the plotted range
                     are drawn as 0, years beyond the current local year
                     are ignored
        targetPath : where the exported image is written; missing parent
                     directories ( nested ones too ) are created first
        title      : text placed on top of the graph

        Returns True when the image was written, False if anything failed
        ( an empty `dataSet` included; failures are intentionally reported
        through the return value ).
    '''
    # local import, module header only brings `mkdir` into scope
    from os import makedirs

    try:
        targetDir = dirname(targetPath)
        # a bare file name has an empty dirname, nothing to create then
        if targetDir:
            makedirs(targetDir, exist_ok=True)
        # style `ggplot` is in use
        with plt.style.context('ggplot'):
            font = {
                'family': 'serif',
                'color': '#264040',
                'weight': 'normal',
                'size': 12
            }
            # a range from `first when a company was registered` to
            # `latest year, not after current one, for which we have data`;
            # improper ( future ) years are dropped so that axis stays sane
            upperBound = localtime(time()).tm_year + 1
            firstYear = min(dataSet)
            lastYear = max(year for year in dataSet if year < upperBound)
            x = range(firstYear, lastYear + 1)
            y = [dataSet.get(i, 0) for i in x]
            figure = plt.figure(figsize=(24, 12), dpi=100)
            try:
                xAxis = plt.gca().xaxis
                # major x-tick every 10 years, labelled with plain year
                xAxis.set_major_locator(MultipleLocator(10))
                xAxis.set_major_formatter(FormatStrFormatter('%d'))
                # minor x-tick every 1 year
                xAxis.set_minor_locator(MultipleLocator(1))
                plt.plot(x, y, 'r-', lw=1.5)
                plt.xlabel('Year', fontdict=font, labelpad=16)
                plt.ylabel('# of Companies Registered',
                           fontdict=font, labelpad=16)
                plt.title(title, fontdict=font)
                plt.tight_layout()
                plt.savefig(targetPath, bbox_inches='tight', pad_inches=.5)
            finally:
                # always release the figure, even when plotting / saving failed
                plt.close(figure)
        return True
    except Exception:
        return False
'''
Filters out those companies which have `dateOfRegistration` field None
& classifies remaining ones using year of registration
So finally we get a Dict[int, int], holding a mapping between
year of registration & #-of companies registered in that year,
which is going to be used by above function for plotting a graph.
This function is used in both case of processing individual states
& companies from all states across India
( actually we just chain them before invoking this function )
'''
def categorizeAsPerCompanyDateOfRegistration(dataSet) -> Dict[int, int]:
    '''
        Counts elements of `dataSet` per year of registration.

        dataSet : any iterable ( consumed once ) of objects exposing a
                  `dateOfRegistration` attribute, which is either None or
                  an object having a `year` attribute

        Elements whose `dateOfRegistration` is None are skipped.

        Returns a dict mapping every distinct year to its number of
        occurrences. Keys are ordered with the most recently discovered
        year first, i.e. reverse of first appearance.
    '''
    counts: Dict[int, int] = {}
    # single pass, updating the counter in place instead of
    # rebuilding the whole dictionary for every element
    for company in dataSet:
        registeredOn = company.dateOfRegistration
        if registeredOn is None:
            continue
        counts[registeredOn.year] = counts.get(registeredOn.year, 0) + 1
    # callers may rely on key order ( newest key first ), so flip it
    return dict(reversed(list(counts.items())))
'''
Takes an iterable of model.corporateStat.Company & classifies their
count using `Pincode of their Registered Address` ( extracted from Address field )
Finally a Dict[str, int], holding count of companies registered in different PinCode(s)
is returned
'''
def classifyCompaniesUsingPinCodeOfRegisteredAddress(dataStream: chain) -> Dict[str, int]:
    '''
        Counts elements of `dataStream` per PinCode found in their
        `registeredOfficeAddress`.

        dataStream : any iterable ( consumed once ) of objects exposing a
                     `registeredOfficeAddress` attribute

        PinCode is taken to be the first run of six digits found in the
        address. Elements having no address ( None / empty ) or an address
        without such a run are skipped, rather than aborting whole run.

        Returns a dict mapping each PinCode ( as str ) to the number of
        elements whose address carried it, in order of first appearance.
    '''
    reg = reg_compile(r'(\d{6})')  # pincode extraction regular expression
    counts: Dict[str, int] = {}
    for company in dataStream:
        address = company.registeredOfficeAddress
        # a missing address can't be searched, regex engine would raise
        if not address:
            continue
        matchObj = reg.search(address)
        if matchObj:
            pinCode = matchObj.group()
            counts[pinCode] = counts.get(pinCode, 0) + 1
    return counts
'''
Converts a `Companies registered under a PinCode record` to
`Companies registered under each District of a certain State
( or may be for whole country ) based record`
'''
def pincodeToDistrictNameMapper(pincodes: Dict[str, int], poGraph: PostOfficeGraph) -> Dict[str, Dict[str, int]]:
    '''
        Converts a `count of companies per PinCode` record into a
        `count of companies per District, grouped by State` record.

        pincodes : mapping from PinCode to number of companies registered
                   under it
        poGraph  : object whose `findPostOfficeUsingPin( pin )` returns a
                   falsy value for an unknown PinCode, or else an object
                   exposing `stateName` & `districtName`

        PinCodes which can't be resolved by `poGraph` are ignored.

        Returns { stateName: { districtName: count } }, where count is the
        sum over all PinCodes resolved to that district.
    '''
    districtWise: Dict[str, Dict[str, int]] = {}
    for pinCode, companyCount in pincodes.items():
        postOffice = poGraph.findPostOfficeUsingPin(pinCode)
        if not postOffice:
            continue
        # fetch ( or create ) the per-state dictionary and accumulate into
        # it directly; `dict.update()` returns None, so its result must
        # never be stored as the state's value
        districts = districtWise.setdefault(postOffice.stateName, {})
        districts[postOffice.districtName] = districts.get(
            postOffice.districtName, 0) + companyCount
    return districtWise
if __name__ == '__main__':
    # this file only provides helpers, running it directly does nothing useful
    print('[!]This module is expected to be used as a backend handler')
    # `exit` is injected by `site` for interactive sessions & may be absent
    # ( e.g. `python -S` ); raising SystemExit has the same effect, always
    raise SystemExit(0)