Skip to content
This repository has been archived by the owner on Mar 16, 2020. It is now read-only.

Commit

Permalink
socketapp that uses Redis PubSub to send messages with tornadio2 to a…
Browse files Browse the repository at this point in the history
… socket.
  • Loading branch information
peterbe committed Dec 16, 2011
1 parent c333670 commit 7c2fb4d
Show file tree
Hide file tree
Showing 9 changed files with 3,923 additions and 21 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
[submodule "vendor/src/tornado"]
path = vendor/src/tornado
url = git://github.com/facebook/tornado.git
[submodule "vendor/src/tornadio2"]
path = vendor/src/tornadio2
url = git://github.com/MrJoes/tornadio2.git
13 changes: 11 additions & 2 deletions handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,21 @@ def increment_lookup_count(self, username, usernames, jsonp=False):
key = 'lookups:json'
if not isinstance(usernames, int):
usernames = len(usernames)
self.redis.incr(key)

self.redis.publish('lookups', tornado.escape.json_encode({
key: int(self.redis.get(key))
}))
key = 'lookups:username:%s' % username
assert username
self.redis.incr(key)

key = 'lookups:usernames'
self.redis.incr(key, usernames)

self.redis.publish('lookups', tornado.escape.json_encode({
key: int(self.redis.get(key))

}))

@tornado.web.asynchronous
@tornado.gen.engine
def get(self):
Expand Down Expand Up @@ -359,6 +365,9 @@ def increment_authentication_count(self, username):
self.redis.incr(key)
key = 'auths:total'
self.redis.incr(key)
self.redis.publish('lookups', tornado.escape.json_encode({
key: int(self.redis.get(key))
}))

@tornado.web.asynchronous
def get(self):
Expand Down
104 changes: 104 additions & 0 deletions socketapp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#!/usr/bin/env python
import os
import re
import here
import logging
from pprint import pprint
import tornado.escape
import redis.client
import tornado.options
from tornado.options import define, options
import settings

from tornadio2 import SocketConnection, TornadioRouter, SocketServer, event

define("debug", default=False, help="run in debug mode", type=bool)
define("port", default=8888, help="run on the given port (default 8888)", type=int)


class LookupsConnection(SocketConnection):
connected_clients = set()
def on_open(self, request):
#print "OPEN"
for each in self.connected_clients:
each.send({'message': "Someone connected!"})
self.connected_clients.add(self)

def on_message(self, message):
logging.debug("RECEIVED: %r" % message)
for client in self.connected_clients:
if client != self:
#print "CLIENT", repr(client)
#print "\t", client.is_closed
if client.is_closed:
print "DBG consider deleting", repr(client)
else:
client.send({'message': message.upper()})

def on_close(self):
logging.debug("Closing client")
if self in self.connected_clients:
logging.debug("Removing %r" % self)
self.connected_clients.remove(self)


def redis_listener():
r = redis.Redis(settings.REDIS_HOST, settings.REDIS_PORT)
ps = r.pubsub()
ps.subscribe(['lookups'])
for message in ps.listen():
try:
data = tornado.escape.json_decode(message['data'])
except ValueError:
data = message['data']

#print "****MESSAGE"
#pprint(data)
#print "\t send this to", len(LookupsConnection.connected_clients), 'clients'
to_send = {}
for key, value in data.items():
new_key = {
'lookups:json': 'lookups_json',
'lookups:jsonp': 'lookups_jsonp',
'auths:total': 'auths',
'lookups:usernames': 'lookups_usernames'
}.get(key)
if new_key is None:
print "Skipping", repr(key)
continue
#print new_key, repr(value)
to_send[new_key] = value

for client in LookupsConnection.connected_clients:
client.send(to_send)



def main():
import threading
t = threading.Thread(target=redis_listener)
t.setDaemon(True)
t.start()

LookupsServer = TornadioRouter(LookupsConnection)
# Fill your routes here
routes = []
# Extend list of routes with Tornadio2 URLs
routes.extend(LookupsServer.urls)

tornado.options.parse_command_line()
if options.debug:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.INFO)

application = tornado.web.Application(routes,
socket_io_port=options.port)
try:
SocketServer(application)
except KeyboardInterrupt:
pass


if __name__ == "__main__":
main()
68 changes: 49 additions & 19 deletions static/js/lookups.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,53 +66,68 @@ function _set_up_charts(numbers) {
height:300,
lineWidth:3
};
if (chart_jsons === null) {
if (chart_jsons === null && numbers.lookups_json) {
var p = new cloneObject(options);
p.title = 'Twitter requests by JSON';
chart_jsons = new Chart('chart-jsons', p);
}
if (chart_jsonps === null) {
if (chart_jsonps === null && numbers.lookups_jsonp) {
var p = new cloneObject(options);
p.title = 'Twitter requests by JSONP';
chart_jsonps = new Chart('chart-jsonps', p);
}
if (chart_usernames === null) {
if (chart_usernames === null && numbers.lookups_usernames) {
var p = new cloneObject(options);
p.title = 'Total number of usernames looked up';
p.series = [{color: 'green'}];
chart_usernames = new Chart('chart-usernames', p);
}
if (chart_auths === null) {
if (chart_auths === null && numbers.auths) {
var p = new cloneObject(options);
p.title = 'Authentications';
p.series = [{color: 'red'}];
chart_auths = new Chart('chart-auths', p);
}

chart_jsons.add_value(numbers.lookups_json);
chart_jsonps.add_value(numbers.lookups_jsonp);
chart_usernames.add_value(numbers.lookups_usernames);
chart_auths.add_value(numbers.auths);
if (numbers.lookups_json)
chart_jsons.add_value(numbers.lookups_json);
if (numbers.lookups_jsonp)
chart_jsonps.add_value(numbers.lookups_jsonp);
if (numbers.lookups_usernames)
chart_usernames.add_value(numbers.lookups_usernames);
if (numbers.auths)
chart_auths.add_value(numbers.auths);

}

function update() {
function incr_number(key, num) {
var before = $(key).text();
if (before !== '' + tsep(num)) {
// there's a change!
$(key).fadeTo(200, 0.1, function() {
$(this).text(tsep(num)).fadeTo(300, 1.0);
});
}
function incr_number(key, num) {
var before = $(key).text();
if (before !== '' + tsep(num)) {
// there's a change!
$(key).fadeTo(200, 0.1, function() {
$(this).text(tsep(num)).fadeTo(300, 1.0);
});
}
}

$.getJSON(JSON_URL, function(response) {
function process_response(response) {
if (response.lookups_json && response.lookups_jsonp)
incr_number('#lookups-total', response.lookups_json + response.lookups_jsonp);
if (response.lookups_json)
incr_number('#lookups-json', response.lookups_json);
if (response.lookups_jsonp)
incr_number('#lookups-jsonp', response.lookups_jsonp);
if (response.lookups_usernames)
incr_number('#lookups-usernames', response.lookups_usernames);
if (response.auths)
incr_number('#auths', response.auths);
_set_up_charts(response);
}

/*
function update() {
$.getJSON(JSON_URL, function(response) {
process_response(response);
var change = !compareAssociativeArrays(response, previous);
previous = response;
Expand All @@ -128,7 +143,22 @@ function update() {
setTimeout(update, Math.ceil(t * 1000));
});
}
*/

window.WEB_SOCKET_DEBUG = true;
function setupSocket() {
var socket = new io.connect('http://' + window.location.host, {
port: 8888
});

socket.on('connect', function() {
socket.on('message', function(msg) {
process_response(msg);
});

});
}

$(function() {
setTimeout(update, 5 * 1000);
setupSocket();
});
Loading

0 comments on commit 7c2fb4d

Please sign in to comment.