Skip to content

Commit

Permalink
Temp for sleeping
Browse files Browse the repository at this point in the history
  • Loading branch information
aneeshusa committed Sep 13, 2014
1 parent c09c1d5 commit 9fdff93
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 30 deletions.
14 changes: 14 additions & 0 deletions extractors/BaseExtractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class Extractor(object):
    """
    Base class for email extractors.

    Subclass this class to make new extractors that extract new types of
    information. Each extractor should be responsible for extracting
    different pieces of information (there is no stable conflict resolution
    mechanism as of yet).
    """

    def extract(self, raw_email):
        """
        Examine email content and metadata to extract information.

        Argument: a dict with email metadata and raw email content.
        Returns: a new dict with only the extracted data.
        Raises: NotImplementedError unless overridden by a subclass.
        """
        # The base class defines the interface only; subclasses do the work.
        raise NotImplementedError
3 changes: 3 additions & 0 deletions extractors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from . import *

# Registry of extractors. pipeline.process_email iterates over this list and
# calls .extract(raw_email) on every entry; nothing is registered here yet.
extractors = []
20 changes: 0 additions & 20 deletions helpers.py

This file was deleted.

4 changes: 2 additions & 2 deletions install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ venv_create() {
exit 0
else
echo "Creating venv"
virtualenv --no-site-packages venv
virtualenv2 --no-site-packages venv
echo "Venv successfully created"
fi
}
Expand All @@ -29,4 +29,4 @@ done

venv_create

pip install -r requirements.txt
pip2 install -r requirements.txt
14 changes: 14 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import time

from sqs_helpers import get_notifications, process_notifications
from pipeline import process_notification

# Number passed to get_notifications() on every poll.
MAX_NOTIFICATION_BATCH_SIZE=1

# Script entry point: loop forever, fetching notifications and handing any
# non-empty batch to process_notifications() with process_notification as the
# per-item callback.
if __name__ == '__main__':
while True:
newest_notifications = get_notifications(MAX_NOTIFICATION_BATCH_SIZE)
if len(newest_notifications) > 0:
process_notifications(newest_notifications, process_notification)
# NOTE(review): indentation was lost in this view, so it is unclear whether
# this sleep belongs to the `if` or to the `while` body -- confirm against the
# repository. sqs_helpers.get_notifications also passes wait_time_seconds=20.
time.sleep(20)

26 changes: 26 additions & 0 deletions mongo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
from contextlib import contextmanager

from bson.objectid import ObjectId
from pymongo import MongoClient

# Connection settings come from the MONGO_URI environment variable. Read it
# once and fail with a clear message instead of an AttributeError on None.
_mongo_uri = os.environ.get('MONGO_URI')
if not _mongo_uri:
    raise RuntimeError('MONGO_URI environment variable is not set')

client = MongoClient(_mongo_uri)
# The database name is the last path segment of the URI; drop any trailing
# "?option=value" part so it does not end up in the database name.
db = client[_mongo_uri.split('/')[-1].split('?')[0]]

# Collections used by this module.
users = db.users
raw_data = db.raw_data
processed_data = db.processed_data

@contextmanager
def get_raw_email(object_id):
    """
    Context manager that hands out one raw email for processing.

    Looks up the raw_data document whose _id is `object_id`.

    Yields None when no such document exists, or when processed_data already
    holds a document whose 'email_id' equals the raw email's 'id'.
    Otherwise yields the raw email dict (with Mongo's '_id' key removed) and,
    only if the with-body finishes without raising, removes the raw document
    from raw_data. An exception raised in the with-body propagates to the
    caller and leaves the raw document in place.
    """
    raw_id = ObjectId(object_id)
    raw_email = raw_data.find_one({'_id': raw_id})

    # Nothing to process: the document is missing or was already processed.
    if raw_email is None or processed_data.find_one({'email_id': raw_email['id']}):
        yield None
        return

    del raw_email['_id']
    yield raw_email
    # Reached only when the with-body completed normally.
    raw_data.remove({'_id': raw_id})

def store_processed_data(processed_data_dict):
    """
    Insert one dict of extracted data into the processed_data collection.

    Argument: the dict to store.
    """
    processed_data.insert(processed_data_dict)


# pipeline.py imports this function under the name `store_processed_email`;
# keep both names available so that import resolves.
store_processed_email = store_processed_data
Empty file removed monitor.py
Empty file.
17 changes: 17 additions & 0 deletions pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from mongo import get_raw_email, store_processed_email
from extractors import extractors

def process_email(raw_email, extractor_list=None):
    """
    Run every extractor over one raw email and merge their results.

    Arguments:
        raw_email: dict for one email; must contain an 'id' key.
        extractor_list: optional iterable of objects with an
            extract(raw_email) method returning a dict. Defaults to the
            module-level `extractors` registry.
    Returns: a dict holding 'email_id' plus every key produced by the
        extractors. Later extractors overwrite earlier ones on key clashes.
    """
    if extractor_list is None:
        extractor_list = extractors

    results = {'email_id': raw_email['id']}
    for extractor in extractor_list:
        results.update(extractor.extract(raw_email))
    # Hand the merged data back so the caller can store it.
    return results

def process_notification(notification):
    """
    Process the raw email that a single notification points at.

    Reads notification['object_id'], fetches the matching raw email through
    get_raw_email, runs process_email on it and stores the outcome with
    store_processed_email.

    Returns: whatever process_email returned, or None when get_raw_email
        yielded None.
    """
    with get_raw_email(notification['object_id']) as raw_email:
        if raw_email is not None:
            result = process_email(raw_email)
            store_processed_email(result)
            return result
    return None
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
argparse==1.2.1
boto==2.32.1
pymongo==2.7.2
wsgiref==0.1.2
7 changes: 0 additions & 7 deletions remote.py → sqs_conn.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
from pymongo import MongoClient
import boto, os
from boto.sqs.jsonmessage import JSONMessage

client = MongoClient(os.environ.get('MONGO_URI'))
db = client[os.environ.get('MONGO_URI').split('/')[-1]]
users = db.users
raw_data = db.raw_data
processed_data = db.processed_data

# Open an SQS connection to the us-east-1 region.
sqs_conn = boto.sqs.connect_to_region("us-east-1")
# Look up the queue named by the SQS_QUEUE environment variable.
# NOTE(review): presumably get_queue returns None for an unset or unknown
# queue name, which would make the next line fail -- confirm.
importer_queue = sqs_conn.get_queue(os.environ.get('SQS_QUEUE'))
# Use JSONMessage as the message class for this queue.
importer_queue.set_message_class(JSONMessage)
20 changes: 20 additions & 0 deletions sqs_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from sqs_conn import *

def send_to_queue(dict):
    """
    Write the given mapping to the importer queue as a JSONMessage.

    Argument: the mapping whose items become the message content.
    """
    message = JSONMessage()
    message.update(dict)
    importer_queue.write(message)

def get_notifications(num):
    """
    Fetch up to `num` messages from the importer queue.

    Argument: the maximum number of messages to request.
    Returns: the messages returned by the queue (may be empty).

    The request passes wait_time_seconds=20 to the queue.
    """
    return importer_queue.get_messages(num_messages=num, wait_time_seconds=20)

def delete_notification(notification):
    """
    Delete a single message from the importer queue.

    Argument: the message object previously received from the queue.
    """
    importer_queue.delete_message(notification)

def delete_notifications(list_of_notifications):
    """
    Delete several messages from the importer queue.

    Argument: an iterable of message objects previously received from the
        queue. An empty iterable results in no request at all.
    """
    # SQS batch requests accept at most 10 entries, so send in chunks.
    batch_limit = 10
    pending = list(list_of_notifications)
    for start in range(0, len(pending), batch_limit):
        importer_queue.delete_message_batch(pending[start:start + batch_limit])

def process_notifications(list_of_notifications, func):
    """
    Apply `func` to each notification, deleting each one afterwards.

    Arguments:
        list_of_notifications: iterable of notifications to handle, in order.
        func: callable invoked with one notification at a time.

    A notification is deleted only after `func` returned for it; an exception
    from `func` stops the loop and leaves that notification undeleted.
    """
    for item in list_of_notifications:
        func(item)
        delete_notification(item)

0 comments on commit 9fdff93

Please sign in to comment.