Skip to content

Commit efe70d4

Browse files
author
Anthony Romano
committed
etcdmain, tcpproxy: srv-priority policy
Adds DNS SRV weighting and priorities to gateway. Fixes #4378
1 parent 07ad181 commit efe70d4

File tree

5 files changed

+127
-37
lines changed

5 files changed

+127
-37
lines changed

etcdmain/gateway.go

+27-8
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
var (
3030
gatewayListenAddr string
3131
gatewayEndpoints []string
32+
gatewayEndpointPolicy string
3233
gatewayDNSCluster string
3334
gatewayInsecureDiscovery bool
3435
getewayRetryDelay time.Duration
@@ -67,6 +68,7 @@ func newGatewayStartCommand() *cobra.Command {
6768

6869
cmd.Flags().StringVar(&gatewayListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
6970
cmd.Flags().StringVar(&gatewayDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
71+
cmd.Flags().StringVar(&gatewayEndpointPolicy, "endpoint-policy", "round-robin", "Policy for selecting next connection's endpoint (round-robin, srv-priority)")
7072
cmd.Flags().BoolVar(&gatewayInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
7173
cmd.Flags().StringVar(&gatewayCA, "trusted-ca-file", "", "path to the client server TLS CA file.")
7274

@@ -91,17 +93,27 @@ func stripSchema(eps []string) []string {
9193

9294
return endpoints
9395
}
94-
func startGateway(cmd *cobra.Command, args []string) {
95-
endpoints := gatewayEndpoints
9696

97-
if eps := discoverEndpoints(gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery); len(eps) != 0 {
98-
endpoints = eps
97+
func startGateway(cmd *cobra.Command, args []string) {
98+
srvs := discoverEndpoints(gatewayDNSCluster, gatewayCA, gatewayInsecureDiscovery)
99+
if len(srvs.Endpoints) != 0 {
100+
srvs.Endpoints = gatewayEndpoints
99101
}
100-
101102
// Strip the schema from the endpoints because we start just a TCP proxy
102-
endpoints = stripSchema(endpoints)
103+
srvs.Endpoints = stripSchema(srvs.Endpoints)
104+
if len(srvs.SRVs) == 0 {
105+
for _, ep := range srvs.Endpoints {
106+
h, p, err := net.SplitHostPort(ep)
107+
if err != nil {
108+
plog.Fatalf("error parsing endpoint %q", ep)
109+
}
110+
var port uint16
111+
fmt.Sscanf(p, "%d", &port)
112+
srvs.SRVs = append(srvs.SRVs, &net.SRV{Target: h, Port: port})
113+
}
114+
}
103115

104-
if len(endpoints) == 0 {
116+
if len(srvs.Endpoints) == 0 {
105117
plog.Fatalf("no endpoints found")
106118
}
107119

@@ -111,10 +123,17 @@ func startGateway(cmd *cobra.Command, args []string) {
111123
os.Exit(1)
112124
}
113125

126+
picker, perr := tcpproxy.NewPicker(gatewayEndpointPolicy)
127+
if perr != nil {
128+
fmt.Fprintln(os.Stderr, err)
129+
os.Exit(1)
130+
}
131+
114132
tp := tcpproxy.TCPProxy{
115133
Listener: l,
116-
Endpoints: endpoints,
134+
Endpoints: srvs.SRVs,
117135
MonitorInterval: getewayRetryDelay,
136+
Picker: picker,
118137
}
119138

120139
// At this point, etcd gateway listener is initialized

etcdmain/grpc_proxy.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
106106
os.Exit(1)
107107
}
108108

109-
if eps := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery); len(eps) != 0 {
110-
grpcProxyEndpoints = eps
109+
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
110+
if len(srvs.Endpoints) != 0 {
111+
grpcProxyEndpoints = srvs.Endpoints
111112
}
112113

113114
l, err := net.Listen("tcp", grpcProxyListenAddr)

etcdmain/util.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,19 @@ import (
2222
"github.com/coreos/etcd/pkg/transport"
2323
)
2424

25-
func discoverEndpoints(dns string, ca string, insecure bool) (endpoints []string) {
25+
func discoverEndpoints(dns string, ca string, insecure bool) (s srv.SRVClients) {
2626
if dns == "" {
27-
return nil
27+
return s
2828
}
2929
srvs, err := srv.GetClient("etcd-client", dns)
3030
if err != nil {
3131
fmt.Fprintln(os.Stderr, err)
3232
os.Exit(1)
3333
}
34-
endpoints = srvs.Endpoints
34+
endpoints := srvs.Endpoints
3535
plog.Infof("discovered the cluster %s from %s", endpoints, dns)
3636
if insecure {
37-
return endpoints
37+
return *srvs
3838
}
3939
// confirm TLS connections are good
4040
tlsInfo := transport.TLSInfo{
@@ -47,5 +47,19 @@ func discoverEndpoints(dns string, ca string, insecure bool) (endpoints []string
4747
plog.Warningf("%v", err)
4848
}
4949
plog.Infof("using discovered endpoints %v", endpoints)
50-
return endpoints
50+
51+
// map endpoints back to SRVClients struct with SRV data
52+
eps := make(map[string]struct{})
53+
for _, ep := range endpoints {
54+
eps[ep] = struct{}{}
55+
}
56+
for i := range srvs.Endpoints {
57+
if _, ok := eps[srvs.Endpoints[i]]; !ok {
58+
continue
59+
}
60+
s.Endpoints = append(s.Endpoints, srvs.Endpoints[i])
61+
s.SRVs = append(s.SRVs, srvs.SRVs[i])
62+
}
63+
64+
return s
5165
}

proxy/tcpproxy/userspace.go

+75-21
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
package tcpproxy
1616

1717
import (
18+
"fmt"
1819
"io"
20+
"math/rand"
1921
"net"
2022
"sync"
2123
"time"
@@ -29,6 +31,7 @@ var (
2931

3032
type remote struct {
3133
mu sync.Mutex
34+
srv *net.SRV
3235
addr string
3336
inactive bool
3437
}
@@ -59,26 +62,30 @@ func (r *remote) isActive() bool {
5962

6063
type TCPProxy struct {
6164
Listener net.Listener
62-
Endpoints []string
65+
Endpoints []*net.SRV
6366
MonitorInterval time.Duration
67+
Picker PickerFunc
6468

6569
donec chan struct{}
6670

67-
mu sync.Mutex // guards the following fields
68-
remotes []*remote
69-
nextRemote int
71+
mu sync.Mutex // guards the following fields
72+
remotes []*remote
7073
}
7174

7275
func (tp *TCPProxy) Run() error {
76+
if tp.Picker == nil {
77+
tp.Picker, _ = NewPicker("round-robin")
78+
}
7379
tp.donec = make(chan struct{})
7480
if tp.MonitorInterval == 0 {
7581
tp.MonitorInterval = 5 * time.Minute
7682
}
77-
for _, ep := range tp.Endpoints {
78-
tp.remotes = append(tp.remotes, &remote{addr: ep})
83+
for _, srv := range tp.Endpoints {
84+
addr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
85+
tp.remotes = append(tp.remotes, &remote{srv: srv, addr: addr})
7986
}
8087

81-
plog.Printf("ready to proxy client requests to %v", tp.Endpoints)
88+
plog.Printf("ready to proxy client requests to %+v", tp.Endpoints)
8289
go tp.runMonitor()
8390
for {
8491
in, err := tp.Listener.Accept()
@@ -90,6 +97,61 @@ func (tp *TCPProxy) Run() error {
9097
}
9198
}
9299

100+
type PickerFunc func(tp *TCPProxy) *remote
101+
102+
// NewPicker returns a picker to choose remotes for the tcp proxy.
103+
func NewPicker(name string) (PickerFunc, error) {
104+
switch name {
105+
case "srv-priority":
106+
return srvPicker, nil
107+
case "round-robin":
108+
nextRemote := 0
109+
f := func(tp *TCPProxy) *remote {
110+
for i := 0; i < len(tp.remotes); i++ {
111+
picked := tp.remotes[nextRemote]
112+
nextRemote = (nextRemote + 1) % len(tp.remotes)
113+
if picked.isActive() {
114+
return picked
115+
}
116+
}
117+
return nil
118+
}
119+
return f, nil
120+
default:
121+
}
122+
return nil, fmt.Errorf("unknown picker %q", name)
123+
}
124+
125+
func srvPicker(tp *TCPProxy) *remote {
126+
bestPr := uint16(65535)
127+
w := 0
128+
var candidates []*remote
129+
// find best priority class
130+
for _, r := range tp.remotes {
131+
switch {
132+
case !r.isActive():
133+
case r.srv.Priority < bestPr:
134+
bestPr = r.srv.Priority
135+
candidates = []*remote{r}
136+
w = int(r.srv.Weight) + 1
137+
case r.srv.Priority == bestPr:
138+
candidates = append(candidates, r)
139+
w += int(r.srv.Weight) + 1
140+
}
141+
}
142+
// randomly choose by weight
143+
if len(candidates) > 0 {
144+
choose := rand.Intn(w)
145+
for i := 0; i < len(candidates); i++ {
146+
choose -= int(candidates[i].srv.Weight) + 1
147+
if choose <= 0 {
148+
return candidates[i]
149+
}
150+
}
151+
}
152+
return nil
153+
}
154+
93155
func (tp *TCPProxy) numRemotes() int {
94156
tp.mu.Lock()
95157
defer tp.mu.Unlock()
@@ -102,10 +164,12 @@ func (tp *TCPProxy) serve(in net.Conn) {
102164
out net.Conn
103165
)
104166

105-
for i := 0; i < tp.numRemotes(); i++ {
106-
remote := tp.pick()
107-
if !remote.isActive() {
108-
continue
167+
for {
168+
tp.mu.Lock()
169+
remote := tp.Picker(tp)
170+
tp.mu.Unlock()
171+
if remote == nil {
172+
break
109173
}
110174
// TODO: add timeout
111175
out, err = net.Dial("tcp", remote.addr)
@@ -132,16 +196,6 @@ func (tp *TCPProxy) serve(in net.Conn) {
132196
in.Close()
133197
}
134198

135-
// pick picks a remote in round-robin fashion
136-
func (tp *TCPProxy) pick() *remote {
137-
tp.mu.Lock()
138-
defer tp.mu.Unlock()
139-
140-
picked := tp.remotes[tp.nextRemote]
141-
tp.nextRemote = (tp.nextRemote + 1) % len(tp.remotes)
142-
return picked
143-
}
144-
145199
func (tp *TCPProxy) runMonitor() {
146200
for {
147201
select {

proxy/tcpproxy/userspace_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@ func TestUserspaceProxy(t *testing.T) {
4242
t.Fatal(err)
4343
}
4444

45+
var port uint16
46+
fmt.Sscanf(u.Port(), "%d", &port)
4547
p := TCPProxy{
4648
Listener: l,
47-
Endpoints: []string{u.Host},
49+
Endpoints: []*net.SRV{{Target: u.Hostname(), Port: port}},
4850
}
4951
go p.Run()
5052
defer p.Stop()

0 commit comments

Comments
 (0)