Skip to content

Commit

Permalink
add celery and delivery tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron morton authored and aaron morton committed Jul 27, 2012
1 parent b264ee0 commit 044b294
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 50 deletions.
9 changes: 9 additions & 0 deletions cassandra-schema.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,12 @@ AND
comparator = UTF8Type
;


CREATE COLUMN FAMILY
TweetDelivery
WITH
key_validation_class = IntegerType
AND
comparator = UTF8Type
;

15 changes: 15 additions & 0 deletions celeryconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Use redis as the message broker.
BROKER_URL = "redis://localhost:6379/0"

# This is the task status stored.
# Want to change to cassandra later

CELERY_RESULT_BACKEND = "redis"
CELERY_REDIS_HOST = "localhost"
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0

CELERYD_LOG_FORMAT="%(asctime)s - %(processName)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s"

# This tells the celery worker server to look in tasks.py for tasks
CELERY_IMPORTS = ("wdcnz.tasks", )
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
tornado>=2.2.1
pycassa>=1.6.0
Mako>=0.7.0
Mako>=0.7.0
celery-with-redis>=2.4.1
1 change: 1 addition & 0 deletions wdcnz/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(self):
(r"/users/([^/]+)/followers/?", controllers.UserFollowers),

(r"/login", controllers.Login),
(r"/logout", controllers.Logout),
(r"/signup", controllers.Signup),

]
Expand Down
157 changes: 144 additions & 13 deletions wdcnz/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import tornado.web
from tornado import escape

from wdcnz import tasks

# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#

Expand Down Expand Up @@ -51,6 +53,10 @@ def _user_cookie(self, user):


def render(self, template_name, **kwargs):

if "user" not in kwargs:
kwargs["user"] = self.current_user

template = self.template_lookup.get_template(template_name)
self.write(template.render(**kwargs))
return
Expand Down Expand Up @@ -100,33 +106,39 @@ def post(self):

timestamp, tweet_id = self.next_tweet_id()
tweet_body = self.get_argument("tweet_body")
this_user = self.current_user["user_name"]

user_tweets_cf = self.column_family("UserTweets")
user_timeline_cf = self.column_family("UserTimeline")

with pycassa.batch.Mutator(self.application.cass_pool) as batch:
batch.write_consistency_level = cass_types.ConsistencyLevel.QUORUM

# Store the tweet in the users list.
row_key = self.current_user["user_name"]
# Store the tweet against the user
row_key = this_user
columns = {
(tweet_id, "tweet_id") : str(tweet_id),
(tweet_id, "body"): tweet_body,
(tweet_id, "user_name") : self.current_user["user_name"],
(tweet_id, "timestamp") : timestamp
}
batch.insert(self.column_family("UserTweets"), row_key, columns)
batch.insert(user_tweets_cf, row_key, columns)

# Put the tweet into the users timeline
row_key = self.current_user["user_name"]
tweet_data = {
tweet_json = escape.json_encode({
"tweet_id" : tweet_id,
"body" : tweet_body,
"user_name" : self.current_user["user_name"],
"user_name" : this_user,
"timestamp" : timestamp
}
})
columns = {
tweet_id : escape.json_encode(tweet_data)
tweet_id : tweet_json
}
batch.insert(self.column_family("UserTimeline"), row_key, columns)

batch.insert(user_timeline_cf, row_key, columns)

#
tasks.deliver_tweet.delay(this_user, tweet_id, tweet_json)
self.redirect("/")
return

Expand Down Expand Up @@ -161,7 +173,8 @@ def get(self, user_name):
tweets.append(this_tweet)
this_tweet = {}
this_tweet[tweet_property] = col_value
tweets.append(this_tweet)
if this_tweet:
tweets.append(this_tweet)

# Step 3 - get the followers
# OrderedFollowers
Expand Down Expand Up @@ -192,8 +205,58 @@ def get(self, user_name):
followers = users_cols.values()
else:
followers = []
self.render("pages/user.mako", tweets=tweets, user=user,
followers=followers)

# Step 3 - get who the user is following
# OrderedFollowing
# row_key is the user_name
# column_name is the (timestamp, user_name)

cf = self.column_family("OrderedFollowing")
row_key = user_name

try:
following_cols = cf.get(row_key, column_count=20)
except (pycassa.NotFoundException):
following_cols = {}

# We have the columns
# {(timestamp, user_name) : None}
following_names = [
key[1]
for key in following_cols.keys()
]

# Pivot to get the user data
if following_names:
cf = self.column_family("User")
users_cols = cf.multiget(following_names)

#have {row_key : {col_name : col_value}}
following = users_cols.values()
else:
following = []

# Step 4 - check if the current user is following this one.
# AllFollowers CF
# row key is user_name
# column_name is follower user_name

cf = self.column_family("AllFollowers")
row_key = user_name
columns = [
self.current_user["user_name"]
]

try:
is_following = True if cf.get(row_key, columns) else False
except (pycassa.NotFoundException):
is_following = False

is_current_user = user_name == self.current_user["user_name"]

self.render("pages/user.mako", tweets=tweets,
followers=followers, following=following,
is_following=is_following, is_current_user=is_current_user)
return

class UserFollowers(ControllerBase):
Expand All @@ -219,7 +282,8 @@ def post(self, user_to_follow):
else:
# no double dipping.
self.redirect("/users/%(user_to_follow)s" % vars())

return

# Step 2 - lets get following
with pycassa.batch.Mutator(self.application.cass_pool) as batch:
batch.write_consistency_level = cass_types.ConsistencyLevel.QUORUM
Expand Down Expand Up @@ -254,6 +318,64 @@ def post(self, user_to_follow):

self.redirect("/users/%(user_to_follow)s" % vars())
return

class UserNotFollowers(ControllerBase):

@tornado.web.authenticated
def post(self, user_to_not_follow):
"""Updates the logged in user to not follow ``user_to_not_follow``."""

raise RuntimeError("Not implemented")
# this_user = self.current_user["user_name"]
#
# # Step 1 - check if we follow this user
# cf = self.column_family("AllFollowers")
# row_key = user_to_not_follow
# columns = [
# this_user
# ]
#
# try:
# existing = all_followers_cf.get(row_key, columns=columns)
# except (pycassa.NotFoundException):
# # not following the user.
# self.redirect("/users/%(user_to_not_follow)s" % vars())
# return
#
# # Step 2 - stop following the user
# with pycassa.batch.Mutator(self.application.cass_pool) as batch:
# batch.write_consistency_level = cass_types.ConsistencyLevel.QUORUM
#
# now = int(time.time() * 10**6)
#
# # TODO: just use columns.
# # OrderedFollowers CF stores who is following a user ordered by
# # when they started.
# row_key = user_to_follow
# columns = {
# (now, this_user) : ""
# }
# batch.insert(self.column_family("OrderedFollowers"), row_key,
# columns)
#
# # OrderedFollowing CF stores who a user is following ordered by
# # when they started
# row_key = this_user
# columns = {
# (now, user_to_follow) : ""
# }
# batch.insert(self.column_family("OrderedFollowing"), row_key,
# columns)
#
# # AllFollowers CF stores who is following a user without order
# row_key = user_to_follow
# columns = {
# this_user : ""
# }
# batch.insert(self.column_family("AllFollowers"), row_key, columns)
#
# self.redirect("/users/%(user_to_follow)s" % vars())
# return

class Login(ControllerBase):

Expand Down Expand Up @@ -316,3 +438,12 @@ def post(self):

self.redirect("/")
return

class Logout(ControllerBase):

def get(self):

self.clear_cookie("user")
self.redirect("/")
return

1 change: 0 additions & 1 deletion wdcnz/library.py

This file was deleted.

48 changes: 48 additions & 0 deletions wdcnz/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Just put all the smarts in here"""

import logging

import celery

import pycassa
from pycassa.cassandra import ttypes as cass_types


@celery.task()
def deliver_tweet(from_user, tweet_id, tweet_json):
"""Delivers the tweet with ``tweet_id`` to ``from_user``'s followers.
"""

pool = get_cass_pool()
followers_cf = column_family(pool, "OrderedFollowers")
user_timeline_cf = column_family(pool, "UserTimeline")

with pycassa.batch.Mutator(pool, queue_size=50) as batch:
batch.write_consistency_level = cass_types.ConsistencyLevel.QUORUM

columns = {
tweet_id : tweet_json
}

followers_row_key = from_user
for col_name, _ in followers_cf.get(followers_row_key).iteritems():
# col name is (timestamp, user_name)
_, follower_user_name = col_name

# Insert tweet into UserTimeline CF
row_key = follower_user_name
batch.insert(user_timeline_cf, row_key, columns)

return


# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Helpers

def get_cass_pool():
return pycassa.ConnectionPool("wdcnz", ["localhost:9160"])

def column_family(cass_pool, name):
return pycassa.ColumnFamily(cass_pool, name,
read_consistency_level=cass_types.ConsistencyLevel.QUORUM,
write_consistency_level=cass_types.ConsistencyLevel.QUORUM)
34 changes: 20 additions & 14 deletions wdcnz/templates/base.mako
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,23 @@
<div class="navbar navbar-fixed-top">
<div class="navbar-inner">
<div class="container">
<a class="btn btn-navbar" data-toggle="collapse" data-target=".nav-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</a>
<a class="brand" href="#">WDCNZ</a>
<div class="nav-collapse">
<ul class="nav">
<li class="active"><a href="#">Home</a></li>
<li><a href="#about">About</a></li>
<li><a href="#contact">Contact</a></li>
</ul>
</div><!--/.nav-collapse -->

<a class="brand" href="/">WDCNZ</a>

<ul class="nav">
<li class="active"><a href="/">Home</a></li>
</ul>

<ul class="nav pull-right">
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">${user["user_name"]} <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/logout">Logout</a></li>
</ul>
</li>

<li class="divider-vertical"></li>

<a class="btn" data-toggle="modal" href="#modal_new_tweet"><i class="icon-pencil"></i></a>
</ul>
</div>
Expand Down Expand Up @@ -80,7 +83,10 @@
<!-- Placed at the end of the document so the pages load faster -->
<script src="/static/js/jquery.js"></script>
<script src="/static/js/bootstrap.min.js"></script>


<%block name="script_inline">
</%block>

<div class="modal hide" id="modal_new_tweet">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal">x</button>
Expand Down
Loading

0 comments on commit 044b294

Please sign in to comment.