Skip to content

Commit

Permalink
Merge branch 'stapelberg-strings'
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed May 8, 2015
2 parents 6c2c8a2 + 8224bee commit a8065f2
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 170 deletions.
5 changes: 2 additions & 3 deletions future.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package raft

import (
"net"
"sync"
"time"
)
Expand Down Expand Up @@ -81,7 +80,7 @@ func (l *logFuture) Response() interface{} {

type peerFuture struct {
deferError
peers []net.Addr
peers []string
}

type shutdownFuture struct {
Expand All @@ -108,7 +107,7 @@ type reqSnapshotFuture struct {
// snapshot details provided by the FSM runner before responding
index uint64
term uint64
peers []net.Addr
peers []string
snapshot FSMSnapshot
}

Expand Down
60 changes: 22 additions & 38 deletions inmem_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,14 @@ package raft
import (
"fmt"
"io"
"net"
"sync"
"time"
)

// InmemAddr implements the net.Addr interface.
type InmemAddr struct {
ID string
}

// NewInmemAddr returns a new in-memory addr with
// a randomly generate UUID as the ID
func NewInmemAddr() *InmemAddr {
return &InmemAddr{generateUUID()}
}

// Network implements the net.Addr interface.
func (ia *InmemAddr) Network() string {
return "inmem"
}

// String implements the net.Addr interface.
func (ia *InmemAddr) String() string {
return ia.ID
func NewInmemAddr() string {
return generateUUID()
}

// inmemPipeline is used to pipeline requests for the in-mem transport
Expand All @@ -53,15 +37,15 @@ type inmemPipelineInflight struct {
type InmemTransport struct {
sync.RWMutex
consumerCh chan RPC
localAddr *InmemAddr
localAddr string
peers map[string]*InmemTransport
pipelines []*inmemPipeline
timeout time.Duration
}

// NewInmemTransport is used to initialize a new transport
// and generates a random local address.
func NewInmemTransport() (*InmemAddr, *InmemTransport) {
func NewInmemTransport() (string, *InmemTransport) {
addr := NewInmemAddr()
trans := &InmemTransport{
consumerCh: make(chan RPC, 16),
Expand All @@ -83,28 +67,28 @@ func (i *InmemTransport) Consumer() <-chan RPC {
}

// LocalAddr implements the Transport interface.
func (i *InmemTransport) LocalAddr() net.Addr {
func (i *InmemTransport) LocalAddr() string {
return i.localAddr
}

// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (i *InmemTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error) {
func (i *InmemTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) {
i.RLock()
peer, ok := i.peers[target.String()]
peer, ok := i.peers[target]
i.RUnlock()
if !ok {
return nil, fmt.Errorf("failed to connect to peer: %v", target)
}
pipeline := newInmemPipeline(i, peer, target.String())
pipeline := newInmemPipeline(i, peer, target)
i.Lock()
i.pipelines = append(i.pipelines, pipeline)
i.Unlock()
return pipeline, nil
}

// AppendEntries implements the Transport interface.
func (i *InmemTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
func (i *InmemTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
if err != nil {
return err
Expand All @@ -117,7 +101,7 @@ func (i *InmemTransport) AppendEntries(target net.Addr, args *AppendEntriesReque
}

// RequestVote implements the Transport interface.
func (i *InmemTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error {
func (i *InmemTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error {
rpcResp, err := i.makeRPC(target, args, nil, i.timeout)
if err != nil {
return err
Expand All @@ -130,7 +114,7 @@ func (i *InmemTransport) RequestVote(target net.Addr, args *RequestVoteRequest,
}

// InstallSnapshot implements the Transport interface.
func (i *InmemTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
func (i *InmemTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout)
if err != nil {
return err
Expand All @@ -142,9 +126,9 @@ func (i *InmemTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotR
return nil
}

func (i *InmemTransport) makeRPC(target net.Addr, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
func (i *InmemTransport) makeRPC(target string, args interface{}, r io.Reader, timeout time.Duration) (rpcResp RPCResponse, err error) {
i.RLock()
peer, ok := i.peers[target.String()]
peer, ok := i.peers[target]
i.RUnlock()

if !ok {
Expand Down Expand Up @@ -174,34 +158,34 @@ func (i *InmemTransport) makeRPC(target net.Addr, args interface{}, r io.Reader,

// EncodePeer implements the Transport interface. It uses the UUID as the
// address directly.
func (i *InmemTransport) EncodePeer(p net.Addr) []byte {
return []byte(p.String())
func (i *InmemTransport) EncodePeer(p string) []byte {
return []byte(p)
}

// DecodePeer implements the Transport interface. It wraps the UUID in an
// InmemAddr.
func (i *InmemTransport) DecodePeer(buf []byte) net.Addr {
return &InmemAddr{string(buf)}
func (i *InmemTransport) DecodePeer(buf []byte) string {
return string(buf)
}

// Connect is used to connect this transport to another transport for
// a given peer name. This allows for local routing.
func (i *InmemTransport) Connect(peer net.Addr, trans *InmemTransport) {
func (i *InmemTransport) Connect(peer string, trans *InmemTransport) {
i.Lock()
defer i.Unlock()
i.peers[peer.String()] = trans
i.peers[peer] = trans
}

// Disconnect is used to remove the ability to route to a given peer.
func (i *InmemTransport) Disconnect(peer net.Addr) {
func (i *InmemTransport) Disconnect(peer string) {
i.Lock()
defer i.Unlock()
delete(i.peers, peer.String())
delete(i.peers, peer)

// Disconnect any pipelines
n := len(i.pipelines)
for idx := 0; idx < n; idx++ {
if i.pipelines[idx].peerAddr == peer.String() {
if i.pipelines[idx].peerAddr == peer {
i.pipelines[idx].Close()
i.pipelines[idx], i.pipelines[n-1] = i.pipelines[n-1], nil
idx--
Expand Down
18 changes: 0 additions & 18 deletions inmem_transport_test.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,9 @@
package raft

import (
"net"
"testing"
)

func TestInmemAddrImpl(t *testing.T) {
var inm interface{} = NewInmemAddr()
if _, ok := inm.(net.Addr); !ok {
t.Fatalf("InmemAddr is not a net.Addr")
}
}

func TestInmemAddr(t *testing.T) {
inm := NewInmemAddr()
if inm.Network() != "inmem" {
t.Fatalf("bad network")
}
if inm.String() != inm.ID {
t.Fatalf("bad string")
}
}

func TestInmemTransportImpl(t *testing.T) {
var inm interface{} = &InmemTransport{}
if _, ok := inm.(Transport); !ok {
Expand Down
4 changes: 1 addition & 3 deletions log.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package raft

import "net"

// LogType describes various types of log entries.
type LogType uint8

Expand Down Expand Up @@ -36,7 +34,7 @@ type Log struct {

// Peer is not exported since it is not transmitted, only used
// internally to construct the Data field.
peer net.Addr
peer string
}

// LogStore is used to provide an interface for storing
Expand Down
43 changes: 19 additions & 24 deletions net_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var (
NetworkTransport provides a network based transport that can be
used to communicate with Raft on remote machines. It requires
an underlying stream layer to provide a stream abstraction, which can
be simple TCP, TLS, etc. Underlying addresses must be castable to TCPAddr
be simple TCP, TLS, etc.
This transport is very simple and lightweight. Each RPC request is
framed by sending a byte that indicates the message type, followed
Expand Down Expand Up @@ -88,7 +88,7 @@ type StreamLayer interface {
}

type netConn struct {
target net.Addr
target string
conn net.Conn
r *bufio.Reader
w *bufio.Writer
Expand Down Expand Up @@ -167,8 +167,8 @@ func (n *NetworkTransport) Consumer() <-chan RPC {
}

// LocalAddr implements the Transport interface.
func (n *NetworkTransport) LocalAddr() net.Addr {
return n.stream.Addr()
func (n *NetworkTransport) LocalAddr() string {
return n.stream.Addr().String()
}

// IsShutdown is used to check if the transport is shutdown
Expand All @@ -182,32 +182,31 @@ func (n *NetworkTransport) IsShutdown() bool {
}

// getExistingConn is used to grab a pooled connection
func (n *NetworkTransport) getPooledConn(target net.Addr) *netConn {
func (n *NetworkTransport) getPooledConn(target string) *netConn {
n.connPoolLock.Lock()
defer n.connPoolLock.Unlock()

key := target.String()
conns, ok := n.connPool[key]
conns, ok := n.connPool[target]
if !ok || len(conns) == 0 {
return nil
}

var conn *netConn
num := len(conns)
conn, conns[num-1] = conns[num-1], nil
n.connPool[key] = conns[:num-1]
n.connPool[target] = conns[:num-1]
return conn
}

// getConn is used to get a connection from the pool
func (n *NetworkTransport) getConn(target net.Addr) (*netConn, error) {
func (n *NetworkTransport) getConn(target string) (*netConn, error) {
// Check for a pooled conn
if conn := n.getPooledConn(target); conn != nil {
return conn, nil
}

// Dial a new connection
conn, err := n.stream.Dial(target.String(), n.timeout)
conn, err := n.stream.Dial(target, n.timeout)
if err != nil {
return nil, err
}
Expand All @@ -233,7 +232,7 @@ func (n *NetworkTransport) returnConn(conn *netConn) {
n.connPoolLock.Lock()
defer n.connPoolLock.Unlock()

key := conn.target.String()
key := conn.target
conns, _ := n.connPool[key]

if !n.IsShutdown() && len(conns) < n.maxPool {
Expand All @@ -245,7 +244,7 @@ func (n *NetworkTransport) returnConn(conn *netConn) {

// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests.
func (n *NetworkTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error) {
func (n *NetworkTransport) AppendEntriesPipeline(target string) (AppendPipeline, error) {
// Get a connection
conn, err := n.getConn(target)
if err != nil {
Expand All @@ -257,17 +256,17 @@ func (n *NetworkTransport) AppendEntriesPipeline(target net.Addr) (AppendPipelin
}

// AppendEntries implements the Transport interface.
func (n *NetworkTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
func (n *NetworkTransport) AppendEntries(target string, args *AppendEntriesRequest, resp *AppendEntriesResponse) error {
return n.genericRPC(target, rpcAppendEntries, args, resp)
}

// RequestVote implements the Transport interface.
func (n *NetworkTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error {
func (n *NetworkTransport) RequestVote(target string, args *RequestVoteRequest, resp *RequestVoteResponse) error {
return n.genericRPC(target, rpcRequestVote, args, resp)
}

// genericRPC handles a simple request/response RPC
func (n *NetworkTransport) genericRPC(target net.Addr, rpcType uint8, args interface{}, resp interface{}) error {
func (n *NetworkTransport) genericRPC(target string, rpcType uint8, args interface{}, resp interface{}) error {
// Get a conn
conn, err := n.getConn(target)
if err != nil {
Expand All @@ -293,7 +292,7 @@ func (n *NetworkTransport) genericRPC(target net.Addr, rpcType uint8, args inter
}

// InstallSnapshot implements the Transport interface.
func (n *NetworkTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
func (n *NetworkTransport) InstallSnapshot(target string, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error {
// Get a conn, always close for InstallSnapshot
conn, err := n.getConn(target)
if err != nil {
Expand Down Expand Up @@ -331,17 +330,13 @@ func (n *NetworkTransport) InstallSnapshot(target net.Addr, args *InstallSnapsho
}

// EncodePeer implements the Transport interface.
func (n *NetworkTransport) EncodePeer(p net.Addr) []byte {
return []byte(p.String())
func (n *NetworkTransport) EncodePeer(p string) []byte {
return []byte(p)
}

// DecodePeer implements the Transport interface.
func (n *NetworkTransport) DecodePeer(buf []byte) net.Addr {
addr, err := net.ResolveTCPAddr("tcp", string(buf))
if err != nil {
panic(fmt.Errorf("failed to parse network address: %s", buf))
}
return addr
func (n *NetworkTransport) DecodePeer(buf []byte) string {
return string(buf)
}

// listen is used to handling incoming connections
Expand Down
4 changes: 2 additions & 2 deletions net_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func TestNetworkTransport_EncodeDecode(t *testing.T) {
enc := trans1.EncodePeer(local)
dec := trans1.DecodePeer(enc)

if dec.String() != local.String() {
if dec != local {
t.Fatalf("enc/dec fail: %v %v", dec, local)
}
}
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestNetworkTransport_PooledConn(t *testing.T) {

// Check the conn pool size
addr := trans1.LocalAddr()
if len(trans2.connPool[addr.String()]) != 3 {
if len(trans2.connPool[addr]) != 3 {
t.Fatalf("Expected 2 pooled conns!")
}
}
Loading

0 comments on commit a8065f2

Please sign in to comment.