-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
226 lines (181 loc) · 7.39 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import json
import logging
from io import StringIO

import boto3
import botocore
import pandas as pd
import wikipedia
from botocore.exceptions import ClientError
from pythonjsonlogger import jsonlogger

# S3 target configuration
REGION = "us-east-2"
BUCKET = "faangsentiment"

# Structured JSON logging (picked up by CloudWatch when run in Lambda).
# NOTE: duplicate `import boto3` removed; imports regrouped stdlib/third-party.
LOG = logging.getLogger()
LOG.setLevel(logging.DEBUG)
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
LOG.addHandler(logHandler)
### SQS Utils###
def sqs_queue_resource(queue_name):
    """Return a boto3 SQS Queue resource for *queue_name* in the module REGION.

    Usage example:
        In [2]: queue = sqs_queue_resource("dev-job-24910")
        In [4]: queue.attributes
        Out[4]:
        {'ApproximateNumberOfMessages': '0',
         'ApproximateNumberOfMessagesDelayed': '0',
         'ApproximateNumberOfMessagesNotVisible': '0',
         'CreatedTimestamp': '1476240132',
         'DelaySeconds': '0',
         'LastModifiedTimestamp': '1476240132',
         'MaximumMessageSize': '262144',
         'MessageRetentionPeriod': '345600',
         'QueueArn': 'arn:aws:sqs:us-west-2:414930948375:dev-job-24910',
         'ReceiveMessageWaitTimeSeconds': '0',
         'VisibilityTimeout': '120'}
    """
    resource = boto3.resource('sqs', region_name=REGION)
    # Lazy %-args: message rendered only if the INFO level is enabled.
    LOG.info("Creating SQS resource conn with qname: [%s] in region: [%s]",
             queue_name, REGION)
    return resource.get_queue_by_name(QueueName=queue_name)
def sqs_connection():
    """Create and return an SQS client bound to the module-level REGION."""
    client = boto3.client("sqs", region_name=REGION)
    LOG.info("Creating SQS connection in Region: [%s]", REGION)
    return client
def sqs_approximate_count(queue_name):
    """Return the approximate total of visible plus in-flight messages.

    Sums 'ApproximateNumberOfMessages' and
    'ApproximateNumberOfMessagesNotVisible' from the queue's attributes.
    """
    attrs = sqs_queue_resource(queue_name).attributes
    total = (int(attrs['ApproximateNumberOfMessages'])
             + int(attrs['ApproximateNumberOfMessagesNotVisible']))
    LOG.info("""'ApproximateNumberOfMessages' and 'ApproximateNumberOfMessagesNotVisible' = *** [%s] *** for QUEUE NAME: [%s]""",
             total, queue_name)
    return total
def delete_sqs_msg(queue_name, receipt_handle):
    """Delete one message from *queue_name* identified by *receipt_handle*.

    Returns the delete_message response dict, or None when the SQS call
    raised a ClientError (the error is logged, not re-raised).
    """
    client = sqs_connection()
    try:
        url = client.get_queue_url(QueueName=queue_name)["QueueUrl"]
        LOG.info("Deleting msg with ReceiptHandle %s", receipt_handle)
        response = client.delete_message(QueueUrl=url,
                                         ReceiptHandle=receipt_handle)
    except botocore.exceptions.ClientError as error:
        LOG.exception("FAILURE TO DELETE SQS MSG: Queue Name [%s] with error: [%s]",
                      queue_name, error)
        return None
    LOG.info("Response from delete from queue: %s", response)
    return response
### Wikipedia ###
def names_to_wikipedia(names):
    """Fetch a one-sentence Wikipedia summary for each name.

    Returns a DataFrame with columns 'names' and 'wikipedia_snippit'.
    """
    snippets = [
        wikipedia.summary(name, sentences=1, auto_suggest=False, redirect=True)
        for name in names
    ]
    return pd.DataFrame({'names': names, 'wikipedia_snippit': snippets})
### Comprehend ###
def create_sentiment(row):
    """Uses AWS Comprehend to Create Sentiments on a DataFrame"""
    LOG.info(f"Sentiment Detection Processing {row}")
    client = boto3.client(service_name='comprehend')
    result = client.detect_sentiment(Text=row, LanguageCode='en')
    LOG.debug(f"Found Sentiment: {result}")
    # Comprehend returns e.g. POSITIVE/NEGATIVE/NEUTRAL/MIXED here.
    return result['Sentiment']
def apply_sentiment(df, column="wikipedia_snippit"):
    """Add a 'Sentiment' column by running create_sentiment over *column*."""
    df['Sentiment'] = df[column].map(create_sentiment)
    return df
def create_keyphrases(row):
    """Uses AWS Comprehend to Detect Key Phrases on a DataFrame"""
    LOG.info(f"Key Phrases Detection Processing {row}")
    client = boto3.client(service_name='comprehend')
    result = client.detect_key_phrases(Text=row, LanguageCode='en')
    LOG.debug(f"Found Key Phrases: {result}")
    # Keep only the phrase text; Comprehend also returns scores/offsets.
    texts = [phrase['Text'] for phrase in result['KeyPhrases']]
    LOG.debug(f"Found Text in Key Phrases: {texts}")
    return texts
def apply_keyphrases(df, column="wikipedia_snippit"):
    """Add a 'KeyPhrases' column by running create_keyphrases over *column*."""
    df['KeyPhrases'] = df[column].map(create_keyphrases)
    return df
def create_entities(row):
    """Uses AWS Comprehend to Detect Entities on a DataFrame"""
    LOG.info(f"Entities Detection Processing {row}")
    client = boto3.client(service_name='comprehend')
    result = client.detect_entities(Text=row, LanguageCode='en')
    LOG.debug(f"Found Entities: {result}")
    # Keep only the entity text; Comprehend also returns types/scores.
    texts = [entity['Text'] for entity in result['Entities']]
    LOG.debug(f"Found Text in Entities: {texts}")
    return texts
def apply_entities(df, column="wikipedia_snippit"):
    """Add an 'Entities' column by running create_entities over *column*."""
    df['Entities'] = df[column].map(create_entities)
    return df
def apply_all_analysis(df):
    """Run sentiment, key-phrase, and entity analysis on *df* (mutates df)."""
    for stage in (apply_sentiment, apply_keyphrases, apply_entities):
        stage(df)
    return df
### S3 ###
def write_s3(df, bucket, name):
    """Write *df* as CSV to s3://<bucket>/<name>_sentiment.csv.

    S3 failures are logged (with traceback) rather than raised, so a
    failed upload does not crash the Lambda invocation.
    """
    LOG.info(f"writing to s3 with name: {name}")
    try:
        csv_buffer = StringIO()
        df.to_csv(csv_buffer)
        s3_resource = boto3.resource('s3')
        filename = f"{name}_sentiment.csv"
        res = s3_resource.Object(bucket, filename).\
            put(Body=csv_buffer.getvalue())
        LOG.info(f"result of write to bucket: {bucket} with:\n {res}")
    except ClientError as e:
        # Fix: failures were previously logged at INFO with no traceback,
        # making upload errors easy to miss. Log at ERROR with exc_info.
        LOG.exception(f"Unexpected error {e}")
def lambda_handler(event, context):
    """Entry Point for Lambda.

    Consumes an SQS-triggered event: extracts each record's company name,
    deletes each processed message from its queue, builds a DataFrame of
    Wikipedia snippets, runs Comprehend analysis, and writes the result
    CSV to S3.
    """
    LOG.info(f"SURVEYJOB LAMBDA, event {event}, context {context}")
    names = []  # company names captured from the queue
    # Process Queue.
    # Fix: previously only the FIRST record's receiptHandle was deleted even
    # though every record in the batch was processed; delete per record.
    for record in event['Records']:
        body = json.loads(record['body'])
        company_name = body['name']
        names.append(company_name)
        extra_logging = {"body": body, "company_name": company_name}
        #'eventSourceARN': 'arn:aws:sqs:us-east-1:561744971673:producer'
        event_source_arn = record['eventSourceARN']
        receipt_handle = record['receiptHandle']  # sqs message
        LOG.info(f"SQS CONSUMER LAMBDA, splitting sqs arn with value: {event_source_arn}", extra=extra_logging)
        qname = event_source_arn.split(":")[-1]
        extra_logging["queue"] = qname
        LOG.info(f"Attemping Deleting SQS receiptHandle {receipt_handle} with queue_name {qname}", extra=extra_logging)
        res = delete_sqs_msg(queue_name=qname, receipt_handle=receipt_handle)
        LOG.info(f"Deleted SQS receipt_handle {receipt_handle} with res {res}", extra=extra_logging)
    # Make Pandas dataframe with wikipedia snippts
    LOG.info(f"Creating dataframe with values: {names}")
    df = names_to_wikipedia(names)
    # Perform Comprehend Analysis
    df = apply_all_analysis(df)
    LOG.info(f"Comprehend analysis from FANG companies: {df.to_dict()}")
    # Write result to S3.
    # Fix: the raw list was interpolated into the S3 key (producing e.g.
    # "['Netflix']_sentiment.csv"); join the names into a clean key instead.
    write_s3(df=df, bucket=BUCKET, name="_".join(names))