diff --git a/client/client.go b/client/client.go index ece4cc0cd4e5..5f6df263f3db 100644 --- a/client/client.go +++ b/client/client.go @@ -48,6 +48,19 @@ var DefaultTransport CancelableTransport = &http.Transport{ TLSHandshakeTimeout: 10 * time.Second, } +type EndpointSelectionMode int + +const ( + // EndpointSelectionRandom is to pick an endpoint in a random manner. + EndpointSelectionRandom EndpointSelectionMode = iota + + // EndpointSelectionPrioritizeLeader is to prioritize leader for reducing needless + // forward between follower and leader. + // + // This mode should be used with Client.AutoSync(). + EndpointSelectionPrioritizeLeader +) + type Config struct { // Endpoints defines a set of URLs (schemes, hosts and ports only) // that can be used to communicate with a logical etcd cluster. For @@ -104,6 +117,9 @@ type Config struct { // // A HeaderTimeoutPerRequest of zero means no timeout. HeaderTimeoutPerRequest time.Duration + + // SelectionMode specifies a way of selecting destination endpoint. + SelectionMode EndpointSelectionMode } func (cfg *Config) transport() CancelableTransport { @@ -167,8 +183,9 @@ type Client interface { func New(cfg Config) (Client, error) { c := &httpClusterClient{ - clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest), - rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))), + clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest), + rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))), + selectionMode: cfg.SelectionMode, } if cfg.Username != "" { c.credentials = &credentials{ @@ -216,7 +233,18 @@ type httpClusterClient struct { pinned int credentials *credentials sync.RWMutex - rand *rand.Rand + rand *rand.Rand + selectionMode EndpointSelectionMode +} + +func (c *httpClusterClient) getLeaderEndpoint() (string, error) { + mAPI := NewMembersAPI(c) + leader, err := mAPI.Leader(context.Background()) + if err != nil { + return "", err + } + + return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs? } func (c *httpClusterClient) reset(eps []string) error { @@ -233,9 +261,32 @@ func (c *httpClusterClient) reset(eps []string) error { neps[i] = *u } - c.endpoints = shuffleEndpoints(c.rand, neps) - // TODO: pin old endpoint if possible, and rebalance when new endpoint appears - c.pinned = 0 + switch c.selectionMode { + case EndpointSelectionRandom: + c.endpoints = shuffleEndpoints(c.rand, neps) + c.pinned = 0 + case EndpointSelectionPrioritizeLeader: + c.endpoints = neps + // TODO: should return ErrNoEndpoints in a case of getting leader fail? + lep, err := c.getLeaderEndpoint() + if err != nil { + return ErrNoEndpoints + } + + lu, err := url.Parse(lep) + if err != nil { + return ErrNoEndpoints + } + + for i := range c.endpoints { + if c.endpoints[i].String() == lu.String() { + c.pinned = i + break + } + } + default: + return errors.New(fmt.Sprintf("invalid mode of endpoint selection: %d", c.selectionMode)) + } return nil }