Skip to content

Commit

Permalink
Merge pull request #11 from djmitche/udp-network
Browse files Browse the repository at this point in the history
Use more realistic UDP-based networking
  • Loading branch information
MichaelDiBernardo committed Dec 2, 2013
2 parents ac9a4e4 + 7c3f540 commit 5669b44
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 113 deletions.
8 changes: 5 additions & 3 deletions cluster/README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ Requirements: Python
This directory holds a simple implementation of a replicated state machine
that can also feed clients information on cluster membership.

* aa_statemachine.py -- illustrates the state machine client API
* bb_network.py -- introduces a network of communicating processors
* cc_replicated.py -- a replicated state machine with consensus and a fixed membership
* statemachine.py -- illustrates the state machine client API
* network.py -- handles network communication
* client.py -- a simple client
* member_single.py -- a non-clustered member server, talking to the client
* member_replicated.py -- a replicated state machine with consensus and a fixed membership
78 changes: 0 additions & 78 deletions cluster/bb_network.py

This file was deleted.

52 changes: 52 additions & 0 deletions cluster/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import sys
import logging
from network import Node


class Client(Node):

def __init__(self, member_address):
super(Client, self).__init__()
self.member_address = member_address

def invoke(self, input):
self.output = None
self.send([self.member_address], 'INVOKE',
input=input, caller=self.address)
self.run()
return self.output

def do_INVOKED(self, output):
self.logger.debug("received output %r" % (output,))
self.output = output
self.stop()

if __name__ == "__main__":
logging.basicConfig(
format="%(asctime)s %(name)s %(message)s", level=logging.DEBUG)
client = Client(sys.argv[1])
print client.invoke(4)
print client.invoke(1)

# tests

import unittest
import threading


class FakeMember(Node):

def do_INVOKE(self, caller, input):
self.send([caller], 'INVOKED', output=input * 10)
self.stop()


class ClientTests(unittest.TestCase):

def test_invoke(self):
member = FakeMember()
client = Client(member.address)
memberthd = threading.Thread(target=member.run)
memberthd.start()
self.assertEqual(client.invoke(5), 50)
memberthd.join()
126 changes: 95 additions & 31 deletions cluster/cc_replicated.py → cluster/member_replicated.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from flufl.enum import Enum
import sys
import uuid
import random
import logging
from bb_network import Member, Client, sequence_generator
from member_single import Member, sequence_generator

# Fix in final copy:
# - include repeated classes
# - merge ClusterMember and Member
# - remove logging stuff


def log_acceptor_state(fn):
def wrap(self, **kwargs):
round = kwargs['round']
Expand All @@ -17,6 +21,7 @@ def wrap(self, **kwargs):
return rv
return wrap


def log_proposer_state(fn):
def wrap(self, **kwargs):
rv = fn(self, **kwargs)
Expand All @@ -26,6 +31,7 @@ def wrap(self, **kwargs):
return rv
return wrap


def log_learner_state(fn):
def wrap(self, **kwargs):
round = kwargs['round']
Expand All @@ -46,24 +52,40 @@ def __getitem__(self, i):

def __setitem__(self, i, v):
if i >= len(self):
self.extend([None]*(i - len(self) + 1))
self.extend([None] * (i - len(self) + 1))
list.__setitem__(self, i, v)


Modes = Enum('Modes', ('Initialized', 'Joining', 'Active'))

def only_in_mode(*modes):
modes = set(modes)
def wrap(fn):
def replacement(self, *args, **kwargs):
if self.mode not in modes:
return
return fn(self, *args, **kwargs)
return replacement
return wrap


class ClusterMember(Member):

cluster = []
namespace = uuid.UUID('7e0d7720-fa98-4270-94ff-650a2c25f3f0')
def __init__(self, execute_fn):
Member.__init__(self, execute_fn)
self.unique_id = uuid.uuid3(self.namespace, self.address).int

def __init__(self, execute_fn, address):
Member.__init__(self, execute_fn, address)
self.cluster.append(address)
# member
self.mode = Modes.Initialized
self.peers = [self.address]

# acceptor
self.last_accepts = defaultlist()
self.promises = defaultlist()

# proposer
self.biggest_n = (0,0)
self.biggest_n = (0, 0)
self.active_proposals = defaultlist()

# learner
Expand All @@ -72,26 +94,59 @@ def __init__(self, execute_fn, address):
self.waiting_clients = defaultlist()
self.values = defaultlist()

def start(self, initial_value=None, cluster_members=[]):
assert self.mode is Modes.Initialized
if initial_value is not None:
self.state = initial_value
self.mode = Modes.Active
else:
self.state = None
self.mode = Modes.Joining
self.send(cluster_members, 'ADD_MEMBER', new_member=self.address)
self.run()

@property
def quorum(self):
return len(self.cluster)/2+1
return len(self.peers) / 2 + 1

def next_n(self):
n = (self.biggest_n[0]+1, self.pid)
n = (self.biggest_n[0] + 1, self.unique_id)
self.biggest_n = n
return n[0]*65536+n[1]

return n[0] * 65536 + n[1]

@only_in_mode(Modes.Active)
def do_ADD_MEMBER(self, new_member):
self.peers.append(new_member)
self.send([new_member], 'JOIN_CLUSTER',
state=self.state,
last_invoked=self.last_invoked,
peers=self.peers)
self.send(self.peers, 'UPDATE_PEERS',
peers=self.peers)

@only_in_mode(Modes.Joining)
def do_JOIN_CLUSTER(self, state, last_invoked, peers):
self.mode = Modes.Active
self.state = state
self.last_invoked = last_invoked
self.peers = list(set(peers) | set(self.peers))

@only_in_mode(Modes.Active)
def do_UPDATE_PEERS(self, peers):
self.peers = list(set(peers) | set(self.peers))

@only_in_mode(Modes.Active)
@log_proposer_state
def do_INVOKE(self, input, caller):
round = len(self.values)
self.waiting_clients[round] = caller
self.values.append(None)
n = self.next_n()
quorum = random.sample(self.cluster, self.quorum)
n = self.next_n()
quorum = random.sample(self.peers, self.quorum)
self.logger.info("beginning round %d with value %r and quorum %r" %
(round, input, quorum))
(round, input, quorum))
self.active_proposals[round] = dict(
n=n, value=input, quorum=quorum, promises={}, have_accepted=False)
n=n, value=input, quorum=quorum, promises={}, have_accepted=False)
self.send(quorum, 'PREPARE', proposer=self.address, n=n, round=round)
# if this request fails, make_request will re-invoke us

Expand All @@ -103,8 +158,9 @@ def do_PREPARE(self, proposer, n, round):
last_accept = self.last_accepts[round]
prev_n = last_accept['n'] if last_accept else -1
prev_value = last_accept['value'] if last_accept else None
self.send([proposer], 'PROMISE', round=round, responder=self.address,
prev_n=prev_n, prev_value=prev_value)
self.send(
[proposer], 'PROMISE', round=round, responder=self.address,
prev_n=prev_n, prev_value=prev_value)

@log_proposer_state
def do_PROMISE(self, round, responder, prev_n, prev_value):
Expand All @@ -114,16 +170,18 @@ def do_PROMISE(self, round, responder, prev_n, prev_value):
promises = active_proposal['promises']
promises[responder] = dict(prev_n=prev_n, prev_value=prev_value)
if len(promises) >= self.quorum:
self.logger.info("received %d responses in round %d: %r" % (len(promises), round, promises))
self.logger.info("received %d responses in round %d: %r" %
(len(promises), round, promises))
# find the value of the largest-numbered proposal returned to us, defaulting
# to our own value if none is given
value = active_proposal['value']
largest_n = -1
for prom in promises.values():
if prom['prev_n'] > largest_n:
value, largest_n = prom['prev_value'], prom['prev_n']
quorum = random.sample(self.cluster, self.quorum)
self.send(quorum, 'PROPOSE', round=round, n=active_proposal['n'], value=value, proposer=self.address)
quorum = random.sample(self.peers, self.quorum)
self.send(quorum, 'PROPOSE', round=round,
n=active_proposal['n'], value=value, proposer=self.address)

@log_acceptor_state
def do_PROPOSE(self, round, n, value, proposer):
Expand All @@ -133,7 +191,7 @@ def do_PROPOSE(self, round, n, value, proposer):
last_accept = self.last_accepts[round]
if not last_accept or last_accept['n'] < n:
self.last_accepts[round] = {'n': n, 'value': value}
self.send(self.cluster, 'ACCEPT', round=round, n=n, value=value)
self.send(self.peers, 'ACCEPT', round=round, n=n, value=value)

@log_learner_state
def do_ACCEPT(self, round, n, value):
Expand All @@ -143,16 +201,19 @@ def do_ACCEPT(self, round, n, value):
count = accept_reqs_by_n[n] = accept_reqs_by_n.get(n, 0) + 1
if count < self.quorum:
return
self.logger.info("received %d accepts in round %d for proposal n=%d/value=%r; learning" % (count, round, n, value))
self.logger.info(
"received %d accepts in round %d for proposal n=%d/value=%r; learning" %
(count, round, n, value))
self.learn(round, value)

def learn(self, round, value):
if self.values[round] != value:
assert self.values[round] is None, "values[%d] is already %r" % (round, self.values[round])
assert self.values[round] is None, "values[%d] is already %r" % (
round, self.values[round])
self.values[round] = value
# catch up on the log as far as we can go, sending INVOKED messages
# where necessary
for i in xrange(self.last_invoked+1, len(self.values)):
for i in xrange(self.last_invoked + 1, len(self.values)):
if self.values[i] is None:
break
output = self.invoke(self.values[i])
Expand All @@ -162,10 +223,13 @@ def learn(self, round, value):


if __name__ == "__main__":
logging.basicConfig(format="%(asctime)s %(name)s proc=%(processName)s %(message)s", level=logging.DEBUG)
members = [ClusterMember(sequence_generator, address='memb%d' % i) for i in range(3)]
client = Client('client', members[0].address)
[member.start(initial_value=0) for member in members]
print client.invoke(4)
print client.invoke(1)
[member.join() for member in members]
logging.basicConfig(
format="%(asctime)s %(name)s %(message)s", level=logging.DEBUG)
member = ClusterMember(sequence_generator)
print member.address
cluster_members = sys.argv[1:]
if cluster_members:
member.start(cluster_members=cluster_members)
else:
print "starting new cluster"
member.start(initial_value=0)
Loading

0 comments on commit 5669b44

Please sign in to comment.