Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: not reuse connections for peer transport #3796

Merged
merged 3 commits into from
Nov 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion etcdmain/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
}
var l net.Listener
l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
l, err = rafthttp.NewListener(u, cfg.peerTLSInfo)
if err != nil {
return nil, err
}
Expand Down
24 changes: 12 additions & 12 deletions etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (

// isMemberBootstrapped tries to check if the given member has been bootstrapped
// in the given cluster.
func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, tr)
func isMemberBootstrapped(cl *cluster, member string, rt http.RoundTripper) bool {
rcl, err := getClusterFromRemotePeers(getRemotePeerURLs(cl, member), time.Second, false, rt)
if err != nil {
return false
}
Expand All @@ -52,14 +52,14 @@ func isMemberBootstrapped(cl *cluster, member string, tr *http.Transport) bool {
// response, an error is returned.
// Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
// 10 second is enough for building connection and finishing request.
func GetClusterFromRemotePeers(urls []string, tr *http.Transport) (*cluster, error) {
return getClusterFromRemotePeers(urls, 10*time.Second, true, tr)
func GetClusterFromRemotePeers(urls []string, rt http.RoundTripper) (*cluster, error) {
return getClusterFromRemotePeers(urls, 10*time.Second, true, rt)
}

// If logerr is true, it prints out more error messages.
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, tr *http.Transport) (*cluster, error) {
func getClusterFromRemotePeers(urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*cluster, error) {
cc := &http.Client{
Transport: tr,
Transport: rt,
Timeout: timeout,
}
for _, u := range urls {
Expand Down Expand Up @@ -114,7 +114,7 @@ func getRemotePeerURLs(cl Cluster, local string) []string {
// The key of the returned map is the member's ID. The value of the returned map
// is the semver versions string, including server and cluster.
// If it fails to get the version of a member, the key will be nil.
func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*version.Versions {
func getVersions(cl Cluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
members := cl.Members()
vers := make(map[string]*version.Versions)
for _, m := range members {
Expand All @@ -126,7 +126,7 @@ func getVersions(cl Cluster, local types.ID, tr *http.Transport) map[string]*ver
vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv}
continue
}
ver, err := getVersion(m, tr)
ver, err := getVersion(m, rt)
if err != nil {
plog.Warningf("cannot get the version of member %s (%v)", m.ID, err)
vers[m.ID.String()] = nil
Expand Down Expand Up @@ -172,8 +172,8 @@ func decideClusterVersion(vers map[string]*version.Versions) *semver.Version {
// cluster version in the range of [MinClusterVersion, Version] and no known members has a cluster version
// out of the range.
// We set this rule since when the local member joins, another member might be offline.
func isCompatibleWithCluster(cl Cluster, local types.ID, tr *http.Transport) bool {
vers := getVersions(cl, local, tr)
func isCompatibleWithCluster(cl Cluster, local types.ID, rt http.RoundTripper) bool {
vers := getVersions(cl, local, rt)
minV := semver.Must(semver.NewVersion(version.MinClusterVersion))
maxV := semver.Must(semver.NewVersion(version.Version))
maxV = &semver.Version{
Expand Down Expand Up @@ -214,9 +214,9 @@ func isCompatibleWithVers(vers map[string]*version.Versions, local types.ID, min

// getVersion returns the Versions of the given member via its
// peerURLs. Returns the last error if it fails to get the version.
func getVersion(m *Member, tr *http.Transport) (*version.Versions, error) {
func getVersion(m *Member, rt http.RoundTripper) (*version.Versions, error) {
cc := &http.Client{
Transport: tr,
Transport: rt,
}
var (
err error
Expand Down
17 changes: 8 additions & 9 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/runtime"
"github.com/coreos/etcd/pkg/timeutil"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/pkg/wait"
"github.com/coreos/etcd/raft"
Expand Down Expand Up @@ -171,8 +170,8 @@ type EtcdServer struct {
// consistent index used to hold the offset of current executing entry
// It is initialized to 0 before executing any entry.
consistIndex consistentIndex
// versionTr used to send requests for peer version
versionTr *http.Transport
// versionRt used to send requests for peer version
versionRt http.RoundTripper
reqIDGen *idutil.Generator

// forceVersionC is used to force the version monitor loop
Expand Down Expand Up @@ -211,7 +210,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
haveWAL := wal.Exist(cfg.WALDir())
ss := snap.New(cfg.SnapDir())

pt, err := transport.NewTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout())
prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
if err != nil {
return nil, err
}
Expand All @@ -225,14 +224,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
if err != nil {
return nil, err
}
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), pt)
existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
if err != nil {
return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err)
}
if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, pt) {
if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
return nil, fmt.Errorf("incomptible with current running cluster")
}

Expand All @@ -250,7 +249,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
return nil, err
}
m := cl.MemberByName(cfg.Name)
if isMemberBootstrapped(cl, cfg.Name, pt) {
if isMemberBootstrapped(cl, cfg.Name, prt) {
return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
}
if cfg.ShouldDiscover() {
Expand Down Expand Up @@ -338,7 +337,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
stats: sstats,
lstats: lstats,
SyncTicker: time.Tick(500 * time.Millisecond),
versionTr: pt,
versionRt: prt,
reqIDGen: idutil.NewGenerator(uint8(id), time.Now()),
forceVersionC: make(chan struct{}),
}
Expand Down Expand Up @@ -1090,7 +1089,7 @@ func (s *EtcdServer) monitorVersions() {
continue
}

v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionTr))
v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionRt))
if v != nil {
// only keep major.minor version for comparasion
v = &semver.Version{
Expand Down
7 changes: 2 additions & 5 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,11 @@ type Transport struct {

func (t *Transport) Start() error {
var err error
// Read/write timeout is set for stream roundTripper to promptly
// find out broken status, which minimizes the number of messages
// sent on broken connection.
t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout)
t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
t.pipelineRt, err = transport.NewTransport(t.TLSInfo, t.DialTimeout)
t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
if err != nil {
return err
}
Expand Down
27 changes: 27 additions & 0 deletions rafthttp/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,45 @@ import (
"encoding/binary"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
"time"

"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/version"
)

var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")

// NewListener returns a listener for raft message transfer between peers.
// It uses timeout listener to identify broken streams promptly.
func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) {
return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout)
}

// NewRoundTripper returns a roundTripper used to send requests
// to rafthttp listener of remote peers.
func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
// It uses timeout transport to pair with remote timeout listeners.
// It sets no read/write timeout, because message in requests may
// take long time to write out before reading out the response.
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
}

// newStreamRoundTripper returns a roundTripper used to send stream requests
// to rafthttp listener of remote peers.
// Read/write timeout is set for stream roundTripper to promptly
// find out broken status, which minimizes the number of messages
// sent on broken connection.
func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
}

func writeEntryTo(w io.Writer, ent *raftpb.Entry) error {
size := ent.Size()
if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {
Expand Down