Skip to content
This repository has been archived by the owner on Mar 16, 2020. It is now read-only.

Commit

Permalink
Implemented Celery MQ (using Redis) on user details lookups.
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbe committed Dec 12, 2011
1 parent f3077e2 commit 1d872f0
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 97 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ Running tests

Run this:

python bin/_run_tests.py --logging=error
$ python bin/_run_tests.py --logging=error


Running celeryd
---------------

Run celeryd like this:

$ celeryd --loglevel=INFO
8 changes: 7 additions & 1 deletion bin/run_shell.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#!/usr/bin/env python

import code, re
import here
try:
import here
except ImportError:
import sys
import os.path as op
sys.path.insert(0, op.abspath(op.join(op.dirname(__file__), '..')))
import here

if __name__ == '__main__':

Expand Down
15 changes: 15 additions & 0 deletions celeryconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import here
# http://docs.celeryproject.org/en/latest/tutorials/otherqueues.html#redis
BROKER_TRANSPORT = "redis"

import settings
BROKER_HOST = settings.REDIS_HOST
BROKER_PORT = settings.REDIS_PORT
BROKER_VHOST = "0" # Maps to database number.

CELERY_IGNORE_RESULT = True

CELERY_IMPORTS = ("tasks", )

import os
CELERY_ALWAYS_EAGER = bool(os.environ.get('ALWAYS_EAGER', False))
134 changes: 48 additions & 86 deletions handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from tornado.escape import json_decode, json_encode
from pymongo.objectid import InvalidId, ObjectId
import utils

import tasks
from models import User, Tweeter


Expand Down Expand Up @@ -61,50 +61,10 @@ def save_following(self, source_username, dest_username, result):
def save_tweeter_user(self, user):
user_id = user['id']
tweeter = self.db.Tweeter.find_one({'user_id': user_id})
_save = False
if not tweeter:
tweeter = self.db.Tweeter()
tweeter['user_id'] = user_id
_save = True

if tweeter['name'] != user['name']:
tweeter['name'] = user['name']
_save = True

if tweeter['username'] != user['screen_name']:
tweeter['username'] = user['screen_name']
_save = True

if tweeter['followers'] != user['followers_count']:
tweeter['followers'] = user['followers_count']
_save = True

if tweeter['following'] != user['friends_count']:
tweeter['following'] = user['friends_count']
_save = True

def parse_status_date(dstr):
dstr = re.sub('\+\d{1,4}', '', dstr)
return datetime.datetime.strptime(
dstr,
'%a %b %d %H:%M:%S %Y'
)
last_tweet_date = None
if 'status' in user:
last_tweet_date = user['status']['created_at']
last_tweet_date = parse_status_date(last_tweet_date)
if tweeter['last_tweet_date'] != last_tweet_date:
tweeter['last_tweet_date'] = last_tweet_date
_save = True

ratio_before = tweeter['ratio']
ratio = tweeter.set_ratio()
if ratio != ratio_before:
_save = True

if _save:
tweeter.save()

Tweeter.update_tweeter(tweeter, user)
return tweeter

def assert_tweeter_user(self, user):
Expand Down Expand Up @@ -520,53 +480,51 @@ def _fetch_info(self, options, username=None):
if username is None:
username = options['username']

key = 'info:%s' % username
value = self.redis.get(key)
def age(d):
return (datetime.datetime.utcnow() - d).seconds

if value is None:
user = self.db.User.find_one({'username': options['this_username']})
access_token = user['access_token']
tweeter = self.db.Tweeter.find_one({'username': username})
current_user = self.get_current_user()
if not tweeter:
access_token = current_user['access_token']
result = yield tornado.gen.Task(self.twitter_request,
"/users/show",
screen_name=username,
access_token=access_token)
if result:
self.save_tweeter_user(result)
else:
result = json_decode(value)
self.assert_tweeter_user(result)
key = None

if result is None:
tweeter = self.save_tweeter_user(result)
elif age(tweeter['modify_date']) > 3600:
tasks.refresh_user_info.delay(
username, current_user['access_token'])

if not tweeter:
options['error'] = "Unable to look up info for %s" % username
self._render(options)
return
if isinstance(result, basestring):
result = json_decode(result)
if key:
self.redis.setex(key, json_encode(result), 60 * 60)

if 'info' not in options:
options['info'] = {options['username']: result}
options['info'] = {options['username']: tweeter}
self._fetch_info(options, username=options['this_username'])
else:
options['info'][options['this_username']] = result
options['info'][options['this_username']] = tweeter
self._render(options)

def _render(self, options):
if 'error' not in options:
if options['follows']:
page_title = '%s follows me'
else:
page_title = '%s is too cool for me'
self._set_ratio(options, 'username')
self._set_ratio(options, 'this_username')
options['page_title'] = page_title % options['username']
options['perm_url'] = self.get_following_perm_url(
options['username'], options['this_username'])
self.render('following.html', **options)
else:
if 'error' in options:
options['page_title'] = 'Error :('
self.render('following_error.html', **options)
return

if options['follows']:
page_title = '%s follows me'
else:
page_title = '%s is too cool for me'
options['page_title'] = page_title % options['username']
options['perm_url'] = self.get_following_perm_url(
options['username'],
options['this_username']
)
self.render('following.html', **options)

def _set_ratio(self, options, key):
value = options[key]
Expand Down Expand Up @@ -680,11 +638,16 @@ class FollowingComparedtoHandler(FollowingHandler):
@tornado.gen.engine
def get(self, username, compared_to):
options = {'compared_to': compared_to}
tweeter = self.db.Tweeter.find_by_username(self.db, username)
compared_tweeter = self.db.Tweeter.find_by_username(self.db, compared_to)
tweeter = self.db.Tweeter.find_one({'username': username})
compared_tweeter = self.db.Tweeter.find_one({'username': compared_to})

def age(d):
return (datetime.datetime.utcnow() - d).seconds


current_user = self.get_current_user()
if current_user:

# if we don't have tweeter info on any of them, fetch it
if not tweeter:
# fetch it
Expand All @@ -693,12 +656,19 @@ def get(self, username, compared_to):
screen_name=username,
access_token=current_user['access_token'])
tweeter = self.save_tweeter_user(result)
elif age(tweeter['modify_date']) > 3600:
tasks.refresh_user_info.delay(
username, current_user['access_token'])

if not compared_tweeter:
result = yield tornado.gen.Task(self.twitter_request,
"/users/show",
screen_name=compared_to,
access_token=current_user['access_token'])
compared_tweeter = self.save_tweeter_user(result)
elif age(compared_tweeter['modify_date']) > 3600:
tasks.refresh_user_info.delay(
compared_to, current_user['access_token'])

elif not tweeter or not compared_tweeter:
options = {
Expand All @@ -717,8 +687,8 @@ def get(self, username, compared_to):
value = self.redis.get(key)
if value is None:
following = (self.db.Following
.find_one({'user': tweeter['_id'],
'follows': compared_tweeter['_id']}))
.find_one({'user': tweeter['_id'],
'follows': compared_tweeter['_id']}))
if following:
options['follows'] = following['following']
else:
Expand All @@ -735,19 +705,11 @@ def get(self, username, compared_to):
(username, compared_to))

options['info'] = {
username: {
'followers_count': tweeter['followers'],
'friends_count': tweeter['following'],
},
compared_to: {
'followers_count': compared_tweeter['followers'],
'friends_count': compared_tweeter['following'],
}
username: tweeter,
compared_to: compared_tweeter
}
options['username'] = username
options['this_username'] = compared_to
self._set_ratio(options, 'username')
self._set_ratio(options, 'this_username')
options['compared_to'] = compared_to
options['perm_url'] = self.get_following_perm_url(
options['username'], options['this_username'])
Expand Down
35 changes: 35 additions & 0 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import datetime
from pymongo.objectid import ObjectId
from mongolite import Connection, Document



connection = Connection()

class BaseDocument(Document):
Expand Down Expand Up @@ -57,6 +60,38 @@ def find_by_username(db, username):
tweeter = db.Tweeter.find_one({'username': re.compile(re.escape(username), re.I)})
return tweeter

@staticmethod
def update_tweeter(tweeter, user):
if tweeter['name'] != user['name']:
tweeter['name'] = user['name']

if tweeter['username'] != user['screen_name']:
tweeter['username'] = user['screen_name']

if tweeter['followers'] != user['followers_count']:
tweeter['followers'] = user['followers_count']

if tweeter['following'] != user['friends_count']:
tweeter['following'] = user['friends_count']

def parse_status_date(dstr):
dstr = re.sub('\+\d{1,4}', '', dstr)
return datetime.datetime.strptime(
dstr,
'%a %b %d %H:%M:%S %Y'
)
last_tweet_date = None
if 'status' in user:
last_tweet_date = user['status']['created_at']
last_tweet_date = parse_status_date(last_tweet_date)
if tweeter['last_tweet_date'] != last_tweet_date:
tweeter['last_tweet_date'] = last_tweet_date

ratio_before = tweeter['ratio']
tweeter.set_ratio()
tweeter.save()



@connection.register
class Following(BaseDocument):
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ redis
tornado
mongolite
mock
tornado-utils
Celery
57 changes: 57 additions & 0 deletions tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
import tornado.escape
import tornado.auth
import tornado.ioloop
from celery.task import task
from celery import conf
import settings
from models import Tweeter, connection



@task
def refresh_user_info(*args, **kwargs):
try:
_refresh_user_info(*args, **kwargs)
except:
logging.error("_refresh_user_info() failed", exc_info=True)
if conf.ALWAYS_EAGER:
raise

def _refresh_user_info(username, access_token):
#from time import sleep; sleep(5)
uu = UserUpdate()
def cb(r, *args, **kwargs):
try:
uu.callback(username, r)
finally:
if not conf.ALWAYS_EAGER:
tornado.ioloop.IOLoop.instance().stop()
uu.twitter_request("/users/show", cb, access_token=access_token,
screen_name=username)
if not conf.ALWAYS_EAGER:
tornado.ioloop.IOLoop.instance().start()


class UserUpdate(tornado.auth.TwitterMixin):
def __init__(self):
self.settings = dict(
twitter_consumer_key=settings.TWITTER_CONSUMER_KEY,
twitter_consumer_secret=settings.TWITTER_CONSUMER_SECRET,
)

@property
def db(self):
return connection[settings.DATABASE_NAME]

def require_setting(self, key, error):
assert key in self.settings, "%s (%s)" % (error, key)

def async_callback(self, func, callback):
return callback

def callback(self, username, response):
result = tornado.escape.json_decode(response.body)
tweeter = self.db.Tweeter.find_one({'user_id': result['id']})
assert tweeter['username'].lower() == username.lower()
Tweeter.update_tweeter(tweeter, result)
Loading

0 comments on commit 1d872f0

Please sign in to comment.