From 1d872f05e1961c5d2cd9ee93836a056d436780fe Mon Sep 17 00:00:00 2001 From: Peter Bengtsson Date: Wed, 2 Nov 2011 15:56:08 -0700 Subject: [PATCH] Implemented Celery MQ (using Redis) on user details lookups. --- README.md | 10 ++- bin/run_shell.py | 8 ++- celeryconfig.py | 15 +++++ handlers.py | 134 ++++++++++++++------------------------- models.py | 35 ++++++++++ requirements.txt | 2 + tasks.py | 57 +++++++++++++++++ templates/following.html | 16 ++--- tests/base.py | 7 ++ tests/test_handlers.py | 88 ++++++++++++++++++++++++- 10 files changed, 275 insertions(+), 97 deletions(-) create mode 100644 celeryconfig.py create mode 100644 tasks.py diff --git a/README.md b/README.md index 7628123..2ddab67 100644 --- a/README.md +++ b/README.md @@ -11,4 +11,12 @@ Running tests Run this: - python bin/_run_tests.py --logging=error + $ python bin/_run_tests.py --logging=error + + +Running celeryd +--------------- + +Run celeryd like this: + + $ celeryd --loglevel=INFO diff --git a/bin/run_shell.py b/bin/run_shell.py index 80e726f..280da57 100755 --- a/bin/run_shell.py +++ b/bin/run_shell.py @@ -1,7 +1,13 @@ #!/usr/bin/env python import code, re -import here +try: + import here +except ImportError: + import sys + import os.path as op + sys.path.insert(0, op.abspath(op.join(op.dirname(__file__), '..'))) + import here if __name__ == '__main__': diff --git a/celeryconfig.py b/celeryconfig.py new file mode 100644 index 0000000..54399c3 --- /dev/null +++ b/celeryconfig.py @@ -0,0 +1,15 @@ +import here +# http://docs.celeryproject.org/en/latest/tutorials/otherqueues.html#redis +BROKER_TRANSPORT = "redis" + +import settings +BROKER_HOST = settings.REDIS_HOST +BROKER_PORT = settings.REDIS_PORT +BROKER_VHOST = "0" # Maps to database number. + +CELERY_IGNORE_RESULT = True + +CELERY_IMPORTS = ("tasks", ) + +import os +CELERY_ALWAYS_EAGER = bool(os.environ.get('ALWAYS_EAGER', False)) diff --git a/handlers.py b/handlers.py index ba7aed0..23aee61 100644 --- a/handlers.py +++ b/handlers.py @@ -14,7 +14,7 @@ from tornado.escape import json_decode, json_encode from pymongo.objectid import InvalidId, ObjectId import utils - +import tasks from models import User, Tweeter @@ -61,50 +61,10 @@ def save_following(self, source_username, dest_username, result): def save_tweeter_user(self, user): user_id = user['id'] tweeter = self.db.Tweeter.find_one({'user_id': user_id}) - _save = False if not tweeter: tweeter = self.db.Tweeter() tweeter['user_id'] = user_id - _save = True - - if tweeter['name'] != user['name']: - tweeter['name'] = user['name'] - _save = True - - if tweeter['username'] != user['screen_name']: - tweeter['username'] = user['screen_name'] - _save = True - - if tweeter['followers'] != user['followers_count']: - tweeter['followers'] = user['followers_count'] - _save = True - - if tweeter['following'] != user['friends_count']: - tweeter['following'] = user['friends_count'] - _save = True - - def parse_status_date(dstr): - dstr = re.sub('\+\d{1,4}', '', dstr) - return datetime.datetime.strptime( - dstr, - '%a %b %d %H:%M:%S %Y' - ) - last_tweet_date = None - if 'status' in user: - last_tweet_date = user['status']['created_at'] - last_tweet_date = parse_status_date(last_tweet_date) - if tweeter['last_tweet_date'] != last_tweet_date: - tweeter['last_tweet_date'] = last_tweet_date - _save = True - - ratio_before = tweeter['ratio'] - ratio = tweeter.set_ratio() - if ratio != ratio_before: - _save = True - - if _save: - tweeter.save() - + Tweeter.update_tweeter(tweeter, user) return tweeter def assert_tweeter_user(self, user): @@ -520,53 +480,51 @@ def _fetch_info(self, options, username=None): if username is None: username = options['username'] - key = 'info:%s' % username - value = self.redis.get(key) + def age(d): + return (datetime.datetime.utcnow() - d).seconds - if value is None: - user = self.db.User.find_one({'username': options['this_username']}) - access_token = user['access_token'] + tweeter = self.db.Tweeter.find_one({'username': username}) + current_user = self.get_current_user() + if not tweeter: + access_token = current_user['access_token'] result = yield tornado.gen.Task(self.twitter_request, "/users/show", screen_name=username, access_token=access_token) - if result: - self.save_tweeter_user(result) - else: - result = json_decode(value) - self.assert_tweeter_user(result) - key = None - if result is None: + tweeter = self.save_tweeter_user(result) + elif age(tweeter['modify_date']) > 3600: + tasks.refresh_user_info.delay( + username, current_user['access_token']) + + if not tweeter: options['error'] = "Unable to look up info for %s" % username self._render(options) return - if isinstance(result, basestring): - result = json_decode(result) - if key: - self.redis.setex(key, json_encode(result), 60 * 60) + if 'info' not in options: - options['info'] = {options['username']: result} + options['info'] = {options['username']: tweeter} self._fetch_info(options, username=options['this_username']) else: - options['info'][options['this_username']] = result + options['info'][options['this_username']] = tweeter self._render(options) def _render(self, options): - if 'error' not in options: - if options['follows']: - page_title = '%s follows me' - else: - page_title = '%s is too cool for me' - self._set_ratio(options, 'username') - self._set_ratio(options, 'this_username') - options['page_title'] = page_title % options['username'] - options['perm_url'] = self.get_following_perm_url( - options['username'], options['this_username']) - self.render('following.html', **options) - else: + if 'error' in options: options['page_title'] = 'Error :(' self.render('following_error.html', **options) + return + + if options['follows']: + page_title = '%s follows me' + else: + page_title = '%s is too cool for me' + options['page_title'] = page_title % options['username'] + options['perm_url'] = self.get_following_perm_url( + options['username'], + options['this_username'] + ) + self.render('following.html', **options) def _set_ratio(self, options, key): value = options[key] @@ -680,11 +638,16 @@ class FollowingComparedtoHandler(FollowingHandler): @tornado.gen.engine def get(self, username, compared_to): options = {'compared_to': compared_to} - tweeter = self.db.Tweeter.find_by_username(self.db, username) - compared_tweeter = self.db.Tweeter.find_by_username(self.db, compared_to) + tweeter = self.db.Tweeter.find_one({'username': username}) + compared_tweeter = self.db.Tweeter.find_one({'username': compared_to}) + + def age(d): + return (datetime.datetime.utcnow() - d).seconds + current_user = self.get_current_user() if current_user: + # if we don't have tweeter info on any of them, fetch it if not tweeter: # fetch it @@ -693,12 +656,19 @@ def get(self, username, compared_to): screen_name=username, access_token=current_user['access_token']) tweeter = self.save_tweeter_user(result) + elif age(tweeter['modify_date']) > 3600: + tasks.refresh_user_info.delay( + username, current_user['access_token']) + if not compared_tweeter: result = yield tornado.gen.Task(self.twitter_request, "/users/show", screen_name=compared_to, access_token=current_user['access_token']) compared_tweeter = self.save_tweeter_user(result) + elif age(compared_tweeter['modify_date']) > 3600: + tasks.refresh_user_info.delay( + compared_to, current_user['access_token']) elif not tweeter or not compared_tweeter: options = { @@ -717,8 +687,8 @@ def get(self, username, compared_to): value = self.redis.get(key) if value is None: following = (self.db.Following - .find_one({'user': tweeter['_id'], - 'follows': compared_tweeter['_id']})) + .find_one({'user': tweeter['_id'], + 'follows': compared_tweeter['_id']})) if following: options['follows'] = following['following'] else: @@ -735,19 +705,11 @@ def get(self, username, compared_to): (username, compared_to)) options['info'] = { - username: { - 'followers_count': tweeter['followers'], - 'friends_count': tweeter['following'], - }, - compared_to: { - 'followers_count': compared_tweeter['followers'], - 'friends_count': compared_tweeter['following'], - } + username: tweeter, + compared_to: compared_tweeter } options['username'] = username options['this_username'] = compared_to - self._set_ratio(options, 'username') - self._set_ratio(options, 'this_username') options['compared_to'] = compared_to options['perm_url'] = self.get_following_perm_url( options['username'], options['this_username']) diff --git a/models.py b/models.py index 5574e2d..c3acf53 100644 --- a/models.py +++ b/models.py @@ -2,6 +2,9 @@ import datetime from pymongo.objectid import ObjectId from mongolite import Connection, Document + + + connection = Connection() class BaseDocument(Document): @@ -57,6 +60,38 @@ def find_by_username(db, username): tweeter = db.Tweeter.find_one({'username': re.compile(re.escape(username), re.I)}) return tweeter + @staticmethod + def update_tweeter(tweeter, user): + if tweeter['name'] != user['name']: + tweeter['name'] = user['name'] + + if tweeter['username'] != user['screen_name']: + tweeter['username'] = user['screen_name'] + + if tweeter['followers'] != user['followers_count']: + tweeter['followers'] = user['followers_count'] + + if tweeter['following'] != user['friends_count']: + tweeter['following'] = user['friends_count'] + + def parse_status_date(dstr): + dstr = re.sub('\+\d{1,4}', '', dstr) + return datetime.datetime.strptime( + dstr, + '%a %b %d %H:%M:%S %Y' + ) + last_tweet_date = None + if 'status' in user: + last_tweet_date = user['status']['created_at'] + last_tweet_date = parse_status_date(last_tweet_date) + if tweeter['last_tweet_date'] != last_tweet_date: + tweeter['last_tweet_date'] = last_tweet_date + + ratio_before = tweeter['ratio'] + tweeter.set_ratio() + tweeter.save() + + @connection.register class Following(BaseDocument): diff --git a/requirements.txt b/requirements.txt index 657331f..6e20bd5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ redis tornado mongolite mock +tornado-utils +Celery diff --git a/tasks.py b/tasks.py new file mode 100644 index 0000000..294826a --- /dev/null +++ b/tasks.py @@ -0,0 +1,57 @@ +import logging +import tornado.escape +import tornado.auth +import tornado.ioloop +from celery.task import task +from celery import conf +import settings +from models import Tweeter, connection + + + +@task +def refresh_user_info(*args, **kwargs): + try: + _refresh_user_info(*args, **kwargs) + except: + logging.error("_refresh_user_info() failed", exc_info=True) + if conf.ALWAYS_EAGER: + raise + +def _refresh_user_info(username, access_token): + #from time import sleep; sleep(5) + uu = UserUpdate() + def cb(r, *args, **kwargs): + try: + uu.callback(username, r) + finally: + if not conf.ALWAYS_EAGER: + tornado.ioloop.IOLoop.instance().stop() + uu.twitter_request("/users/show", cb, access_token=access_token, + screen_name=username) + if not conf.ALWAYS_EAGER: + tornado.ioloop.IOLoop.instance().start() + + +class UserUpdate(tornado.auth.TwitterMixin): + def __init__(self): + self.settings = dict( + twitter_consumer_key=settings.TWITTER_CONSUMER_KEY, + twitter_consumer_secret=settings.TWITTER_CONSUMER_SECRET, + ) + + @property + def db(self): + return connection[settings.DATABASE_NAME] + + def require_setting(self, key, error): + assert key in self.settings, "%s (%s)" % (error, key) + + def async_callback(self, func, callback): + return callback + + def callback(self, username, response): + result = tornado.escape.json_decode(response.body) + tweeter = self.db.Tweeter.find_one({'user_id': result['id']}) + assert tweeter['username'].lower() == username.lower() + Tweeter.update_tweeter(tweeter, result) diff --git a/templates/following.html b/templates/following.html index c2bb1d4..97ec431 100644 --- a/templates/following.html +++ b/templates/following.html @@ -71,11 +71,11 @@ data.addColumn('number', 'following'); data.addRows(2); data.setValue(0, 0, USERNAME); - data.setValue(0, 1, {{ info[username]['followers_count'] }}); - data.setValue(0, 2, {{ info[username]['friends_count'] }}); + data.setValue(0, 1, {{ info[username]['followers'] }}); + data.setValue(0, 2, {{ info[username]['following'] }}); data.setValue(1, 0, COMPARED_TO ? COMPARED_TO : 'you'); - data.setValue(1, 1, {{ info[this_username]['followers_count'] }}); - data.setValue(1, 2, {{ info[this_username]['friends_count'] }}); + data.setValue(1, 1, {{ info[this_username]['followers'] }}); + data.setValue(1, 2, {{ info[this_username]['following'] }}); var chart = new google.visualization.ColumnChart(document.getElementById('chart_div')); chart.draw(data, {width: 600, height: 400, title: 'Coolness in terms of following and followers', @@ -124,13 +124,13 @@ followers: - {{ info[username]['followers_count'] }} - {{ info[this_username]['followers_count'] }} + {{ info[username]['followers'] }} + {{ info[this_username]['followers'] }} following: - {{ info[username]['friends_count'] }} - {{ info[this_username]['friends_count'] }} + {{ info[username]['following'] }} + {{ info[this_username]['following'] }} ratio: diff --git a/tests/base.py b/tests/base.py index 9518297..3c497ab 100644 --- a/tests/base.py +++ b/tests/base.py @@ -8,11 +8,16 @@ import hashlib import unittest + from tornado.testing import LogTrapTestCase, AsyncHTTPTestCase +os.environ['ALWAYS_EAGER'] = 'true' +import celery +import settings import app from tornado_utils.http_test_client import TestClient, HTTPClientMixin + class DatabaseTestCaseMixin(object): _once = False @@ -51,6 +56,8 @@ def setUp(self): 'tornado_utils.send_mail.backends.locmem.EmailBackend' self._app.settings['email_exceptions'] = False self.client = TestClient(self) + celery.conf.ALWAYS_EAGER = True + settings.DATABASE_NAME = 'test' def tearDown(self): super(BaseHTTPTestCase, self).tearDown() diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 5578ad2..c076bf8 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -2,6 +2,7 @@ import os import json from urllib import urlencode +import tornado.escape from .base import BaseHTTPTestCase from handlers import (TwitterAuthHandler, FollowsHandler, FollowingHandler, EveryoneIFollowJSONHandler) @@ -809,7 +810,6 @@ def test_suggest_tweet(self): struct = json.loads(response.body) self.assertTrue(len(struct['text']) <= 140) - #self.assertTrue(struct['text'].endswith('#toocool')) self.assertTrue('@Mr_Billy_Nomates' in struct['text']) peterbe = self.db.Tweeter.find_one({'username': 'peterbe'}) @@ -841,6 +841,92 @@ def test_default_page_not_found(self): self.assertEqual(response.code, 404) self.assertTrue('restart your computer' in response.body) + def test_following_compared_refresh(self): + url = self.reverse_url('following_compared', 'obama', 'kimk') + self._login() + + FollowingHandler.twitter_request = \ + make_mock_twitter_request({ + "/friendships/show": {u'relationship': { + u'target': {u'followed_by': False, + u'following': False, + u'screen_name': u'obama'}}}, + "/users/show?screen_name=obama": {u'followers_count': 41700, + u'following': False, + u'friends_count': 1300, + u'name': u'Barak Obama', + u'screen_name': u'obama', + 'id': 9876543210, + }, + "/users/show?screen_name=kimk": { + u'followers_count': 40117, + u'following': False, + u'friends_count': 200, + u'name': u'Kim Kardashian', + u'screen_name': u'kimk', + 'id': 123456789, + } + }) + + response = self.client.get(url) + self.assertEqual(response.code, 200) + + obama = self.db.Tweeter.find_one({'username': 'obama'}) + self.assertEqual(obama['ratio'], 41700.0 / 1300) + self.assertTrue('%.1f' % (41700.0 / 1300) in response.body) + + kimk = self.db.Tweeter.find_one({'username': 'kimk'}) + self.assertEqual(kimk['ratio'], 40117.0 / 200) + self.assertTrue('%.1f' % (40117.0 / 200) in response.body) + + # change the stats + import tasks + def mock_twitter_request(self, url, callback, access_token, screen_name): + results = { + 'obama': { + u'followers_count': 40700, + u'following': False, + u'friends_count': 1333, + u'name': u'Barak Obama', + u'screen_name': u'obama', + 'id': 9876543210, + }, + 'kimk': { + u'followers_count': 41117, + u'following': False, + u'friends_count': 222, + u'name': u'Kim Kardashian', + u'screen_name': u'kimk', + 'id': 123456789, + } + } + class R(object): + def __init__(self, result): + self.body = tornado.escape.json_encode(result) + callback(R(results[screen_name])) + + tasks.UserUpdate.twitter_request = mock_twitter_request + + # now, pretend time passes + obama['modify_date'] -= datetime.timedelta(seconds=60 * 60 + 1) + obama.save(update_modify_date=False) + kimk['modify_date'] -= datetime.timedelta(seconds=60 * 60 + 1) + kimk.save(update_modify_date=False) + + # second time it's going to use the saved data + response = self.client.get(url) + self.assertEqual(response.code, 200) + # the old numbers will still be there + self.assertTrue('%.1f' % (41700.0 / 1300) in response.body) + self.assertTrue('%.1f' % (40117.0 / 200) in response.body) + + # but the actual numbers will be updated! + obama = self.db.Tweeter.find_one({'username': 'obama'}) + self.assertEqual(obama['ratio'], 40700.0 / 1333) # new + + kimk = self.db.Tweeter.find_one({'username': 'kimk'}) + self.assertEqual(kimk['ratio'], 41117.0 / 222) # new + def make_twitter_get_authenticated_user_callback(struct): def twitter_get_authenticated_user(self, callback, **kw): callback(struct)