Skip to content

Commit 94df716

Browse files
authored
resolver: State: add Endpoints and deprecate Addresses (#6471)
1 parent 20c51a9 commit 94df716

File tree

11 files changed

+132
-43
lines changed

11 files changed

+132
-43
lines changed

balancer/balancer.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ type SubConn interface {
105105
//
106106
// This will trigger a state transition for the SubConn.
107107
//
108-
// Deprecated: This method is now part of the ClientConn interface and will
109-
// eventually be removed from here.
108+
// Deprecated: this method will be removed. Create new SubConns for new
109+
// addresses instead.
110110
UpdateAddresses([]resolver.Address)
111111
// Connect starts the connecting for this SubConn.
112112
Connect()
@@ -150,6 +150,9 @@ type ClientConn interface {
150150
// NewSubConn is called by balancer to create a new SubConn.
151151
// It doesn't block and wait for the connections to be established.
152152
// Behaviors of the SubConn can be controlled by options.
153+
//
154+
// Deprecated: please be aware that in a future version, SubConns will only
155+
// support one address per SubConn.
153156
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
154157
// RemoveSubConn removes the SubConn from ClientConn.
155158
// The SubConn will be shutdown.
@@ -159,7 +162,10 @@ type ClientConn interface {
159162
// If so, the connection will be kept. Else, the connection will be
160163
// gracefully closed, and a new connection will be created.
161164
//
162-
// This will trigger a state transition for the SubConn.
165+
// This may trigger a state transition for the SubConn.
166+
//
167+
// Deprecated: this method will be removed. Create new SubConns for new
168+
// addresses instead.
163169
UpdateAddresses(SubConn, []resolver.Address)
164170

165171
// UpdateState notifies gRPC that the balancer's internal state has

internal/internal.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ var (
164164

165165
// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
166166
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
167+
168+
// GRPCResolverSchemeExtraMetadata determines when gRPC will add extra
169+
// metadata to RPCs.
170+
GRPCResolverSchemeExtraMetadata string = "xds"
167171
)
168172

169173
// HealthChecker defines the signature of the client-side LB channel health checking function.

resolver/resolver.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ type Address struct {
104104
// BalancerAttributes contains arbitrary data about this address intended
105105
// for consumption by the LB policy. These attributes do not affect SubConn
106106
// creation, connection establishment, handshaking, etc.
107+
//
108+
// Deprecated: when an Address is inside an Endpoint, this field should not
109+
// be used, and it will eventually be removed entirely.
107110
BalancerAttributes *attributes.Attributes
108111

109112
// Metadata is the information associated with Addr, which may be used
@@ -167,11 +170,37 @@ type BuildOptions struct {
167170
Dialer func(context.Context, string) (net.Conn, error)
168171
}
169172

173+
// An Endpoint is one network endpoint, or server, which may have multiple
174+
// addresses with which it can be accessed.
175+
type Endpoint struct {
176+
// Addresses contains a list of addresses used to access this endpoint.
177+
Addresses []Address
178+
179+
// Attributes contains arbitrary data about this endpoint intended for
180+
// consumption by the LB policy.
181+
Attributes *attributes.Attributes
182+
}
183+
170184
// State contains the current Resolver state relevant to the ClientConn.
171185
type State struct {
172186
// Addresses is the latest set of resolved addresses for the target.
187+
//
188+
// If a resolver sets Addresses but does not set Endpoints, one Endpoint
189+
// will be created for each Address before the State is passed to the LB
190+
// policy. The BalancerAttributes of each entry in Addresses will be set
191+
// in Endpoints.Attributes, and be cleared in the Endpoint's Address's
192+
// BalancerAttributes.
193+
//
194+
// Soon, Addresses will be deprecated and replaced fully by Endpoints.
173195
Addresses []Address
174196

197+
// Endpoints is the latest set of resolved endpoints for the target.
198+
//
199+
// If a resolver produces a State containing Endpoints but not Addresses,
200+
// it must take care to ensure the LB policies it selects will support
201+
// Endpoints.
202+
Endpoints []Endpoint
203+
175204
// ServiceConfig contains the result from parsing the latest service
176205
// config. If it is nil, it indicates no service config is present or the
177206
// resolver does not provide service configs.
@@ -294,10 +323,3 @@ type Resolver interface {
294323
// Close closes the resolver.
295324
Close()
296325
}
297-
298-
// UnregisterForTesting removes the resolver builder with the given scheme from the
299-
// resolver map.
300-
// This function is for testing only.
301-
func UnregisterForTesting(scheme string) {
302-
delete(m, scheme)
303-
}

resolver_conn_wrapper.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,14 @@ func (ccr *ccResolverWrapper) serializerScheduleLocked(f func(context.Context))
152152
// which includes addresses and service config.
153153
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
154154
errCh := make(chan error, 1)
155+
if s.Endpoints == nil {
156+
s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
157+
for _, a := range s.Addresses {
158+
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
159+
ep.Addresses[0].BalancerAttributes = nil
160+
s.Endpoints = append(s.Endpoints, ep)
161+
}
162+
}
155163
ok := ccr.serializer.Schedule(func(context.Context) {
156164
ccr.addChannelzTraceEvent(s)
157165
ccr.curState = s

resolver_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,13 @@ import (
2424
"net"
2525
"testing"
2626

27+
"github.com/google/go-cmp/cmp"
28+
"google.golang.org/grpc/attributes"
29+
"google.golang.org/grpc/balancer"
2730
"google.golang.org/grpc/credentials/insecure"
31+
"google.golang.org/grpc/internal/balancer/stub"
2832
"google.golang.org/grpc/resolver"
33+
"google.golang.org/grpc/resolver/manual"
2934
)
3035

3136
type wrapResolverBuilder struct {
@@ -91,3 +96,49 @@ func (s) TestResolverCaseSensitivity(t *testing.T) {
9196
}
9297
cc.Close()
9398
}
99+
100+
// TestResolverAddressesToEndpoints ensures one Endpoint is created for each
101+
// entry in resolver.State.Addresses automatically.
102+
func (s) TestResolverAddressesToEndpoints(t *testing.T) {
103+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
104+
defer cancel()
105+
106+
const scheme = "testresolveraddressestoendpoints"
107+
r := manual.NewBuilderWithScheme(scheme)
108+
109+
stateCh := make(chan balancer.ClientConnState, 1)
110+
bf := stub.BalancerFuncs{
111+
UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
112+
stateCh <- ccs
113+
return nil
114+
},
115+
}
116+
balancerName := "stub-balancer-" + scheme
117+
stub.Register(balancerName, bf)
118+
119+
a1 := attributes.New("x", "y")
120+
a2 := attributes.New("a", "b")
121+
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: "addr1", BalancerAttributes: a1}, {Addr: "addr2", BalancerAttributes: a2}}})
122+
123+
cc, err := Dial(r.Scheme()+":///",
124+
WithTransportCredentials(insecure.NewCredentials()),
125+
WithResolvers(r),
126+
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, balancerName)))
127+
if err != nil {
128+
t.Fatalf("Unexpected error dialing: %v", err)
129+
}
130+
defer cc.Close()
131+
132+
select {
133+
case got := <-stateCh:
134+
want := []resolver.Endpoint{
135+
{Addresses: []resolver.Address{{Addr: "addr1"}}, Attributes: a1},
136+
{Addresses: []resolver.Address{{Addr: "addr2"}}, Attributes: a2},
137+
}
138+
if diff := cmp.Diff(got.ResolverState.Endpoints, want); diff != "" {
139+
t.Errorf("Did not receive expected endpoints. Diff (-got +want):\n%v", diff)
140+
}
141+
case <-ctx.Done():
142+
t.Fatalf("timed out waiting for endpoints")
143+
}
144+
}

stream.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"google.golang.org/grpc/balancer"
3232
"google.golang.org/grpc/codes"
3333
"google.golang.org/grpc/encoding"
34+
"google.golang.org/grpc/internal"
3435
"google.golang.org/grpc/internal/balancerload"
3536
"google.golang.org/grpc/internal/binarylog"
3637
"google.golang.org/grpc/internal/channelz"
@@ -433,7 +434,7 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
433434
ctx = trace.NewContext(ctx, trInfo.tr)
434435
}
435436

436-
if cs.cc.parsedTarget.URL.Scheme == "xds" {
437+
if cs.cc.parsedTarget.URL.Scheme == internal.GRPCResolverSchemeExtraMetadata {
437438
// Add extra metadata (metadata that will be added by transport) to context
438439
// so the balancer can see them.
439440
ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(

test/balancer_test.go

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"google.golang.org/grpc/connectivity"
3737
"google.golang.org/grpc/credentials"
3838
"google.golang.org/grpc/credentials/insecure"
39+
"google.golang.org/grpc/internal"
3940
"google.golang.org/grpc/internal/balancer/stub"
4041
"google.golang.org/grpc/internal/balancerload"
4142
"google.golang.org/grpc/internal/grpcsync"
@@ -196,14 +197,10 @@ func testPickExtraMetadata(t *testing.T, e env) {
196197
te.startServer(&testServer{security: e.security})
197198
defer te.tearDown()
198199

199-
// Set resolver to xds to trigger the extra metadata code path.
200-
r := manual.NewBuilderWithScheme("xds")
201-
resolver.Register(r)
202-
defer func() {
203-
resolver.UnregisterForTesting("xds")
204-
}()
205-
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
206-
te.resolverScheme = "xds"
200+
// Trigger the extra-metadata-adding code path.
201+
defer func(old string) { internal.GRPCResolverSchemeExtraMetadata = old }(internal.GRPCResolverSchemeExtraMetadata)
202+
internal.GRPCResolverSchemeExtraMetadata = "passthrough"
203+
207204
cc := te.clientConn()
208205
tc := testgrpc.NewTestServiceClient(cc)
209206

xds/googledirectpath/googlec2p_test.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,31 +60,13 @@ var (
6060
)
6161

6262
func replaceResolvers() func() {
63-
var registerForTesting bool
64-
if resolver.Get(c2pScheme) == nil {
65-
// If env var to enable c2p is not set, the resolver isn't registered.
66-
// Need to register and unregister in defer.
67-
registerForTesting = true
68-
resolver.Register(&c2pResolverBuilder{})
69-
}
7063
oldDNS := resolver.Get("dns")
7164
resolver.Register(testDNSResolver)
7265
oldXDS := resolver.Get("xds")
7366
resolver.Register(testXDSResolver)
7467
return func() {
75-
if oldDNS != nil {
76-
resolver.Register(oldDNS)
77-
} else {
78-
resolver.UnregisterForTesting("dns")
79-
}
80-
if oldXDS != nil {
81-
resolver.Register(oldXDS)
82-
} else {
83-
resolver.UnregisterForTesting("xds")
84-
}
85-
if registerForTesting {
86-
resolver.UnregisterForTesting(c2pScheme)
87-
}
68+
resolver.Register(oldDNS)
69+
resolver.Register(oldXDS)
8870
}
8971
}
9072

xds/internal/balancer/clusterresolver/clusterresolver.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,15 @@ func (b *clusterResolverBalancer) updateChildConfig() {
253253
}
254254
b.logger.Infof("Built child policy config: %v", pretty.ToJSON(childCfg))
255255

256+
endpoints := make([]resolver.Endpoint, len(addrs))
257+
for i, a := range addrs {
258+
endpoints[i].Attributes = a.BalancerAttributes
259+
endpoints[i].Addresses = []resolver.Address{a}
260+
endpoints[i].Addresses[0].BalancerAttributes = nil
261+
}
256262
if err := b.child.UpdateClientConnState(balancer.ClientConnState{
257263
ResolverState: resolver.State{
264+
Endpoints: endpoints,
258265
Addresses: addrs,
259266
ServiceConfig: b.configRaw,
260267
Attributes: b.attrsWithClient,

xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOp
123123
mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }
124124

125125
dnsResolverBuilder := resolver.Get("dns")
126-
resolver.UnregisterForTesting("dns")
127126
resolver.Register(mr)
128127

129128
return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }

xds/internal/balancer/clusterresolver/resource_resolver_dns.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,21 @@ func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
133133
}
134134

135135
dr.mu.Lock()
136-
addrs := make([]string, len(state.Addresses))
137-
for i, a := range state.Addresses {
138-
addrs[i] = a.Addr
136+
var addrs []string
137+
if len(state.Endpoints) > 0 {
138+
// Assume 1 address per endpoint, which is how DNS is expected to
139+
// behave. The slice will grow as needed, however.
140+
addrs = make([]string, 0, len(state.Endpoints))
141+
for _, e := range state.Endpoints {
142+
for _, a := range e.Addresses {
143+
addrs = append(addrs, a.Addr)
144+
}
145+
}
146+
} else {
147+
addrs = make([]string, len(state.Addresses))
148+
for i, a := range state.Addresses {
149+
addrs[i] = a.Addr
150+
}
139151
}
140152
dr.addrs = addrs
141153
dr.updateReceived = true

0 commit comments

Comments
 (0)