From 582ef458c6d8174087877ee83bb514abc16650a5 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 22 Jul 2021 16:12:30 -0700 Subject: [PATCH] cluster_resolver: move balancer config types into cluster_resolver package and unexport (#4607) --- .../balancer/cdsbalancer/cdsbalancer.go | 11 +- .../balancer/cdsbalancer/cdsbalancer_test.go | 7 +- .../clusterresolver/balancerconfig/type.go | 140 ----------- .../balancerconfig/type_test.go | 88 ------- .../clusterresolver/clusterresolver.go | 5 +- .../clusterresolver/clusterresolver_test.go | 5 +- .../balancer/clusterresolver/config.go | 120 +++++++++- .../balancer/clusterresolver/config_test.go | 81 ++++++- .../{balancerconfig => }/configbuilder.go | 42 ++-- .../configbuilder_test.go | 34 +-- .../balancer/clusterresolver/eds_impl_test.go | 29 ++- .../balancer/clusterresolver/priority_test.go | 7 +- .../clusterresolver/resource_resolver.go | 23 +- .../clusterresolver/resource_resolver_test.go | 225 +++++++++--------- .../balancer/weightedtarget/weightedtarget.go | 4 +- 15 files changed, 371 insertions(+), 450 deletions(-) delete mode 100644 xds/internal/balancer/clusterresolver/balancerconfig/type.go delete mode 100644 xds/internal/balancer/clusterresolver/balancerconfig/type_test.go rename xds/internal/balancer/clusterresolver/{balancerconfig => }/configbuilder.go (89%) rename xds/internal/balancer/clusterresolver/{balancerconfig => }/configbuilder_test.go (97%) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index e52a34a7d29a..3ea14add9ebf 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -35,7 +35,6 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/xdsclient" ) @@ -304,12 +303,12 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) { b.logger.Infof("Created child policy %p of type %s", b.childLB, clusterresolver.Name) } - dms := make([]balancerconfig.DiscoveryMechanism, len(update.updates)) + dms := make([]clusterresolver.DiscoveryMechanism, len(update.updates)) for i, cu := range update.updates { switch cu.ClusterType { case xdsclient.ClusterTypeEDS: - dms[i] = balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + dms[i] = clusterresolver.DiscoveryMechanism{ + Type: clusterresolver.DiscoveryMechanismTypeEDS, Cluster: cu.ClusterName, EDSServiceName: cu.EDSServiceName, MaxConcurrentRequests: cu.MaxRequests, @@ -322,8 +321,8 @@ func (b *cdsBalancer) handleWatchUpdate(update clusterHandlerUpdate) { } case xdsclient.ClusterTypeLogicalDNS: - dms[i] = balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + dms[i] = clusterresolver.DiscoveryMechanism{ + Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS, DNSHostname: cu.DNSHostName, } default: diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index a4c6d40f7824..864af36857bc 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -36,7 +36,6 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterresolver" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" @@ -198,8 +197,8 @@ func cdsCCS(cluster string, xdsC xdsclient.XDSClient) balancer.ClientConnState { // edsCCS is a helper function to construct a good update passed from the // cdsBalancer to the edsBalancer. func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientConnState { - discoveryMechanism := balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + discoveryMechanism := clusterresolver.DiscoveryMechanism{ + Type: clusterresolver.DiscoveryMechanismTypeEDS, Cluster: service, MaxConcurrentRequests: countMax, } @@ -208,7 +207,7 @@ func edsCCS(service string, countMax *uint32, enableLRS bool) balancer.ClientCon } lbCfg := &clusterresolver.LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{discoveryMechanism}, + DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{discoveryMechanism}, } return balancer.ClientConnState{ diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/type.go b/xds/internal/balancer/clusterresolver/balancerconfig/type.go deleted file mode 100644 index 3e47b8234e33..000000000000 --- a/xds/internal/balancer/clusterresolver/balancerconfig/type.go +++ /dev/null @@ -1,140 +0,0 @@ -/* - * - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package balancerconfig - -import ( - "bytes" - "encoding/json" - "fmt" -) - -// DiscoveryMechanismType is the type of discovery mechanism. -type DiscoveryMechanismType int - -const ( - // DiscoveryMechanismTypeEDS is eds. - DiscoveryMechanismTypeEDS DiscoveryMechanismType = iota // `json:"EDS"` - // DiscoveryMechanismTypeLogicalDNS is DNS. - DiscoveryMechanismTypeLogicalDNS // `json:"LOGICAL_DNS"` -) - -// MarshalJSON marshals a DiscoveryMechanismType to a quoted json string. -// -// This is necessary to handle enum (as strings) from JSON. -// -// Note that this needs to be defined on the type not pointer, otherwise the -// variables of this type will marshal to int not string. -func (t DiscoveryMechanismType) MarshalJSON() ([]byte, error) { - buffer := bytes.NewBufferString(`"`) - switch t { - case DiscoveryMechanismTypeEDS: - buffer.WriteString("EDS") - case DiscoveryMechanismTypeLogicalDNS: - buffer.WriteString("LOGICAL_DNS") - } - buffer.WriteString(`"`) - return buffer.Bytes(), nil -} - -// UnmarshalJSON unmarshals a quoted json string to the DiscoveryMechanismType. -func (t *DiscoveryMechanismType) UnmarshalJSON(b []byte) error { - var s string - err := json.Unmarshal(b, &s) - if err != nil { - return err - } - switch s { - case "EDS": - *t = DiscoveryMechanismTypeEDS - case "LOGICAL_DNS": - *t = DiscoveryMechanismTypeLogicalDNS - default: - return fmt.Errorf("unable to unmarshal string %q to type DiscoveryMechanismType", s) - } - return nil -} - -// DiscoveryMechanism is the discovery mechanism, can be either EDS or DNS. -// -// For DNS, the ClientConn target will be used for name resolution. -// -// For EDS, if EDSServiceName is not empty, it will be used for watching. If -// EDSServiceName is empty, Cluster will be used. -type DiscoveryMechanism struct { - // Cluster is the cluster name. - Cluster string `json:"cluster,omitempty"` - // LoadReportingServerName is the LRS server to send load reports to. If - // not present, load reporting will be disabled. If set to the empty string, - // load reporting will be sent to the same server that we obtained CDS data - // from. - LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"` - // MaxConcurrentRequests is the maximum number of outstanding requests can - // be made to the upstream cluster. Default is 1024. - MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` - // Type is the discovery mechanism type. - Type DiscoveryMechanismType `json:"type,omitempty"` - // EDSServiceName is the EDS service name, as returned in CDS. May be unset - // if not specified in CDS. For type EDS only. - // - // This is used for EDS watch if set. If unset, Cluster is used for EDS - // watch. - EDSServiceName string `json:"edsServiceName,omitempty"` - // DNSHostname is the DNS name to resolve in "host:port" form. For type - // LOGICAL_DNS only. - DNSHostname string `json:"dnsHostname,omitempty"` -} - -// Equal returns whether the DiscoveryMechanism is the same with the parameter. -func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool { - switch { - case dm.Cluster != b.Cluster: - return false - case !equalStringP(dm.LoadReportingServerName, b.LoadReportingServerName): - return false - case !equalUint32P(dm.MaxConcurrentRequests, b.MaxConcurrentRequests): - return false - case dm.Type != b.Type: - return false - case dm.EDSServiceName != b.EDSServiceName: - return false - case dm.DNSHostname != b.DNSHostname: - return false - } - return true -} - -func equalStringP(a, b *string) bool { - if a == nil && b == nil { - return true - } - if a == nil || b == nil { - return false - } - return *a == *b -} - -func equalUint32P(a, b *uint32) bool { - if a == nil && b == nil { - return true - } - if a == nil || b == nil { - return false - } - return *a == *b -} diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/type_test.go b/xds/internal/balancer/clusterresolver/balancerconfig/type_test.go deleted file mode 100644 index 1adbc7c5d230..000000000000 --- a/xds/internal/balancer/clusterresolver/balancerconfig/type_test.go +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Copyright 2021 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package balancerconfig - -import ( - "encoding/json" - "testing" - - "github.com/google/go-cmp/cmp" -) - -func TestDiscoveryMechanismTypeMarshalJSON(t *testing.T) { - tests := []struct { - name string - typ DiscoveryMechanismType - want string - }{ - { - name: "eds", - typ: DiscoveryMechanismTypeEDS, - want: `"EDS"`, - }, - { - name: "dns", - typ: DiscoveryMechanismTypeLogicalDNS, - want: `"LOGICAL_DNS"`, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got, err := json.Marshal(tt.typ); err != nil || string(got) != tt.want { - t.Fatalf("DiscoveryMechanismTypeEDS.MarshalJSON() = (%v, %v), want (%s, nil)", string(got), err, tt.want) - } - }) - } -} -func TestDiscoveryMechanismTypeUnmarshalJSON(t *testing.T) { - tests := []struct { - name string - js string - want DiscoveryMechanismType - wantErr bool - }{ - { - name: "eds", - js: `"EDS"`, - want: DiscoveryMechanismTypeEDS, - }, - { - name: "dns", - js: `"LOGICAL_DNS"`, - want: DiscoveryMechanismTypeLogicalDNS, - }, - { - name: "error", - js: `"1234"`, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var got DiscoveryMechanismType - err := json.Unmarshal([]byte(tt.js), &got) - if (err != nil) != tt.wantErr { - t.Fatalf("DiscoveryMechanismTypeEDS.UnmarshalJSON() error = %v, wantErr %v", err, tt.wantErr) - } - if diff := cmp.Diff(got, tt.want); diff != "" { - t.Fatalf("DiscoveryMechanismTypeEDS.UnmarshalJSON() got unexpected output, diff (-got +want): %v", diff) - } - }) - } -} diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index cb8176d16448..b48e3e97b716 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -34,7 +34,6 @@ import ( "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/xdsclient" ) @@ -138,7 +137,7 @@ type clusterResolverBalancer struct { attrsWithClient *attributes.Attributes // Attributes with xdsClient attached to be passed to the child policies. child balancer.Balancer - priorities []balancerconfig.PriorityConfig + priorities []priorityConfig watchUpdateReceived bool } @@ -210,7 +209,7 @@ func (b *clusterResolverBalancer) updateChildConfig() error { b.child = newChildBalancer(b.priorityBuilder, b.cc, b.bOpts) } - childCfgBytes, addrs, err := balancerconfig.BuildPriorityConfigJSON(b.priorities, b.config.EndpointPickingPolicy) + childCfgBytes, addrs, err := buildPriorityConfigJSON(b.priorities, b.config.EndpointPickingPolicy) if err != nil { return fmt.Errorf("failed to build priority balancer config: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/clusterresolver_test.go b/xds/internal/balancer/clusterresolver/clusterresolver_test.go index 7e2df25e0535..e7d0cd347cb7 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver_test.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver_test.go @@ -33,7 +33,6 @@ import ( "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/testutils/fakeclient" "google.golang.org/grpc/xds/internal/xdsclient" @@ -492,9 +491,9 @@ func (s) TestClientWatchEDS(t *testing.T) { func newLBConfigWithOneEDS(edsServiceName string) *LBConfig { return &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, EDSServiceName: edsServiceName, }}, } diff --git a/xds/internal/balancer/clusterresolver/config.go b/xds/internal/balancer/clusterresolver/config.go index 043c834399e6..3bcb432b4091 100644 --- a/xds/internal/balancer/clusterresolver/config.go +++ b/xds/internal/balancer/clusterresolver/config.go @@ -18,13 +18,129 @@ package clusterresolver import ( + "bytes" "encoding/json" + "fmt" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/serviceconfig" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" ) +// DiscoveryMechanismType is the type of discovery mechanism. +type DiscoveryMechanismType int + +const ( + // DiscoveryMechanismTypeEDS is eds. + DiscoveryMechanismTypeEDS DiscoveryMechanismType = iota // `json:"EDS"` + // DiscoveryMechanismTypeLogicalDNS is DNS. + DiscoveryMechanismTypeLogicalDNS // `json:"LOGICAL_DNS"` +) + +// MarshalJSON marshals a DiscoveryMechanismType to a quoted json string. +// +// This is necessary to handle enum (as strings) from JSON. +// +// Note that this needs to be defined on the type not pointer, otherwise the +// variables of this type will marshal to int not string. +func (t DiscoveryMechanismType) MarshalJSON() ([]byte, error) { + buffer := bytes.NewBufferString(`"`) + switch t { + case DiscoveryMechanismTypeEDS: + buffer.WriteString("EDS") + case DiscoveryMechanismTypeLogicalDNS: + buffer.WriteString("LOGICAL_DNS") + } + buffer.WriteString(`"`) + return buffer.Bytes(), nil +} + +// UnmarshalJSON unmarshals a quoted json string to the DiscoveryMechanismType. +func (t *DiscoveryMechanismType) UnmarshalJSON(b []byte) error { + var s string + err := json.Unmarshal(b, &s) + if err != nil { + return err + } + switch s { + case "EDS": + *t = DiscoveryMechanismTypeEDS + case "LOGICAL_DNS": + *t = DiscoveryMechanismTypeLogicalDNS + default: + return fmt.Errorf("unable to unmarshal string %q to type DiscoveryMechanismType", s) + } + return nil +} + +// DiscoveryMechanism is the discovery mechanism, can be either EDS or DNS. +// +// For DNS, the ClientConn target will be used for name resolution. +// +// For EDS, if EDSServiceName is not empty, it will be used for watching. If +// EDSServiceName is empty, Cluster will be used. +type DiscoveryMechanism struct { + // Cluster is the cluster name. + Cluster string `json:"cluster,omitempty"` + // LoadReportingServerName is the LRS server to send load reports to. If + // not present, load reporting will be disabled. If set to the empty string, + // load reporting will be sent to the same server that we obtained CDS data + // from. + LoadReportingServerName *string `json:"lrsLoadReportingServerName,omitempty"` + // MaxConcurrentRequests is the maximum number of outstanding requests can + // be made to the upstream cluster. Default is 1024. + MaxConcurrentRequests *uint32 `json:"maxConcurrentRequests,omitempty"` + // Type is the discovery mechanism type. + Type DiscoveryMechanismType `json:"type,omitempty"` + // EDSServiceName is the EDS service name, as returned in CDS. May be unset + // if not specified in CDS. For type EDS only. + // + // This is used for EDS watch if set. If unset, Cluster is used for EDS + // watch. + EDSServiceName string `json:"edsServiceName,omitempty"` + // DNSHostname is the DNS name to resolve in "host:port" form. For type + // LOGICAL_DNS only. + DNSHostname string `json:"dnsHostname,omitempty"` +} + +// Equal returns whether the DiscoveryMechanism is the same with the parameter. +func (dm DiscoveryMechanism) Equal(b DiscoveryMechanism) bool { + switch { + case dm.Cluster != b.Cluster: + return false + case !equalStringP(dm.LoadReportingServerName, b.LoadReportingServerName): + return false + case !equalUint32P(dm.MaxConcurrentRequests, b.MaxConcurrentRequests): + return false + case dm.Type != b.Type: + return false + case dm.EDSServiceName != b.EDSServiceName: + return false + case dm.DNSHostname != b.DNSHostname: + return false + } + return true +} + +func equalStringP(a, b *string) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + return *a == *b +} + +func equalUint32P(a, b *uint32) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + return *a == *b +} + // LBConfig is the config for cluster resolver balancer. type LBConfig struct { serviceconfig.LoadBalancingConfig `json:"-"` @@ -32,7 +148,7 @@ type LBConfig struct { // // Must have at least one element. Results from each discovery mechanism are // concatenated together in successive priorities. - DiscoveryMechanisms []balancerconfig.DiscoveryMechanism `json:"discoveryMechanisms,omitempty"` + DiscoveryMechanisms []DiscoveryMechanism `json:"discoveryMechanisms,omitempty"` // LocalityPickingPolicy is policy for locality picking. // diff --git a/xds/internal/balancer/clusterresolver/config_test.go b/xds/internal/balancer/clusterresolver/config_test.go index 1333692b7fca..77b14deb6abe 100644 --- a/xds/internal/balancer/clusterresolver/config_test.go +++ b/xds/internal/balancer/clusterresolver/config_test.go @@ -21,13 +21,75 @@ package clusterresolver import ( + "encoding/json" "testing" "github.com/google/go-cmp/cmp" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" ) +func TestDiscoveryMechanismTypeMarshalJSON(t *testing.T) { + tests := []struct { + name string + typ DiscoveryMechanismType + want string + }{ + { + name: "eds", + typ: DiscoveryMechanismTypeEDS, + want: `"EDS"`, + }, + { + name: "dns", + typ: DiscoveryMechanismTypeLogicalDNS, + want: `"LOGICAL_DNS"`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got, err := json.Marshal(tt.typ); err != nil || string(got) != tt.want { + t.Fatalf("DiscoveryMechanismTypeEDS.MarshalJSON() = (%v, %v), want (%s, nil)", string(got), err, tt.want) + } + }) + } +} +func TestDiscoveryMechanismTypeUnmarshalJSON(t *testing.T) { + tests := []struct { + name string + js string + want DiscoveryMechanismType + wantErr bool + }{ + { + name: "eds", + js: `"EDS"`, + want: DiscoveryMechanismTypeEDS, + }, + { + name: "dns", + js: `"LOGICAL_DNS"`, + want: DiscoveryMechanismTypeLogicalDNS, + }, + { + name: "error", + js: `"1234"`, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var got DiscoveryMechanismType + err := json.Unmarshal([]byte(tt.js), &got) + if (err != nil) != tt.wantErr { + t.Fatalf("DiscoveryMechanismTypeEDS.UnmarshalJSON() error = %v, wantErr %v", err, tt.wantErr) + } + if diff := cmp.Diff(got, tt.want); diff != "" { + t.Fatalf("DiscoveryMechanismTypeEDS.UnmarshalJSON() got unexpected output, diff (-got +want): %v", diff) + } + }) + } +} + const ( testJSONConfig1 = `{ "discoveryMechanisms": [{ @@ -60,9 +122,6 @@ const ( "localityPickingPolicy":[{"pick_first":{}}], "endpointPickingPolicy":[{"pick_first":{}}] }` - - testLRSServer = "test-lrs-server" - testMaxRequests = 314 ) func TestParseConfig(t *testing.T) { @@ -82,12 +141,12 @@ func TestParseConfig(t *testing.T) { name: "OK with one discovery mechanism", js: testJSONConfig1, want: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{ + DiscoveryMechanisms: []DiscoveryMechanism{ { Cluster: testClusterName, LoadReportingServerName: newString(testLRSServer), MaxConcurrentRequests: newUint32(testMaxRequests), - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, EDSServiceName: testEDSServcie, }, }, @@ -100,16 +159,16 @@ func TestParseConfig(t *testing.T) { name: "OK with multiple discovery mechanisms", js: testJSONConfig2, want: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{ + DiscoveryMechanisms: []DiscoveryMechanism{ { Cluster: testClusterName, LoadReportingServerName: newString(testLRSServer), MaxConcurrentRequests: newUint32(testMaxRequests), - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, EDSServiceName: testEDSServcie, }, { - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + Type: DiscoveryMechanismTypeLogicalDNS, }, }, LocalityPickingPolicy: nil, @@ -121,12 +180,12 @@ func TestParseConfig(t *testing.T) { name: "OK with picking policy override", js: testJSONConfig3, want: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{ + DiscoveryMechanisms: []DiscoveryMechanism{ { Cluster: testClusterName, LoadReportingServerName: newString(testLRSServer), MaxConcurrentRequests: newUint32(testMaxRequests), - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, EDSServiceName: testEDSServcie, }, }, diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go b/xds/internal/balancer/clusterresolver/configbuilder.go similarity index 89% rename from xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go rename to xds/internal/balancer/clusterresolver/configbuilder.go index 4f96cc61f1ef..dfbdc1e2d671 100644 --- a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder.go +++ b/xds/internal/balancer/clusterresolver/configbuilder.go @@ -16,17 +16,7 @@ * */ -// Package balancerconfig contains utility functions to build balancer config. -// The built config will generate a tree of balancers with priority, -// cluster_impl, weighted_target, lrs, and roundrobin. -// -// This is in a subpackage of cluster_resolver so that it can be used by the EDS -// balancer. Eventually we will delete the EDS balancer, and replace it with -// cluster_resolver, then we can move the functions to package cluster_resolver, -// and unexport them. -// -// TODO: move and unexport. Read above. -package balancerconfig +package clusterresolver import ( "encoding/json" @@ -47,22 +37,22 @@ import ( const million = 1000000 -// PriorityConfig is config for one priority. For example, if there an EDS and a -// DNS, the priority list will be [priorityConfig{EDS}, PriorityConfig{DNS}]. +// priorityConfig is config for one priority. For example, if there an EDS and a +// DNS, the priority list will be [priorityConfig{EDS}, priorityConfig{DNS}]. // -// Each PriorityConfig corresponds to one discovery mechanism from the LBConfig +// Each priorityConfig corresponds to one discovery mechanism from the LBConfig // generated by the CDS balancer. The CDS balancer resolves the cluster name to // an ordered list of discovery mechanisms (if the top cluster is an aggregated // cluster), one for each underlying cluster. -type PriorityConfig struct { - Mechanism DiscoveryMechanism - // EDSResp is set only if type is EDS. - EDSResp xdsclient.EndpointsUpdate - // Addresses is set only if type is DNS. - Addresses []string +type priorityConfig struct { + mechanism DiscoveryMechanism + // edsResp is set only if type is EDS. + edsResp xdsclient.EndpointsUpdate + // addresses is set only if type is DNS. + addresses []string } -// BuildPriorityConfigJSON builds balancer config for the passed in +// buildPriorityConfigJSON builds balancer config for the passed in // priorities. // // The built tree of balancers (see test for the output struct). @@ -94,7 +84,7 @@ type PriorityConfig struct { // // TODO: support setting locality picking policy, and add a parameter for // locality picking policy. -func BuildPriorityConfigJSON(priorities []PriorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { +func buildPriorityConfigJSON(priorities []priorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) { pc, addrs := buildPriorityConfig(priorities, endpointPickingPolicy) ret, err := json.Marshal(pc) if err != nil { @@ -103,15 +93,15 @@ func BuildPriorityConfigJSON(priorities []PriorityConfig, endpointPickingPolicy return ret, addrs, nil } -func buildPriorityConfig(priorities []PriorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address) { +func buildPriorityConfig(priorities []priorityConfig, endpointPickingPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address) { var ( retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)} retAddrs []resolver.Address ) for i, p := range priorities { - switch p.Mechanism.Type { + switch p.mechanism.Type { case DiscoveryMechanismTypeEDS: - names, configs, addrs := buildClusterImplConfigForEDS(i, p.EDSResp, p.Mechanism, endpointPickingPolicy) + names, configs, addrs := buildClusterImplConfigForEDS(i, p.edsResp, p.mechanism, endpointPickingPolicy) retConfig.Priorities = append(retConfig.Priorities, names...) for n, c := range configs { retConfig.Children[n] = &priority.Child{ @@ -122,7 +112,7 @@ func buildPriorityConfig(priorities []PriorityConfig, endpointPickingPolicy *int } retAddrs = append(retAddrs, addrs...) case DiscoveryMechanismTypeLogicalDNS: - name, config, addrs := buildClusterImplConfigForDNS(i, p.Addresses) + name, config, addrs := buildClusterImplConfigForDNS(i, p.addresses) retConfig.Priorities = append(retConfig.Priorities, name) retConfig.Children[name] = &priority.Child{ Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config}, diff --git a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go b/xds/internal/balancer/clusterresolver/configbuilder_test.go similarity index 97% rename from xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go rename to xds/internal/balancer/clusterresolver/configbuilder_test.go index 95ded6019c57..d8f17d053aae 100644 --- a/xds/internal/balancer/clusterresolver/balancerconfig/configbuilder_test.go +++ b/xds/internal/balancer/clusterresolver/configbuilder_test.go @@ -1,3 +1,5 @@ +// +build go1.12 + /* * * Copyright 2021 gRPC authors. @@ -16,7 +18,7 @@ * */ -package balancerconfig +package clusterresolver import ( "bytes" @@ -41,10 +43,8 @@ import ( ) const ( - testClusterName = "test-cluster-name" testLRSServer = "test-lrs-server" testMaxRequests = 314 - testEDSServcie = "test-eds-service-name" testEDSServiceName = "service-name-from-parent" testDropCategory = "test-drops" testDropOverMillion = 1 @@ -123,16 +123,16 @@ func init() { // TestBuildPriorityConfigJSON is a sanity check that the built balancer config // can be parsed. The behavior test is covered by TestBuildPriorityConfig. func TestBuildPriorityConfigJSON(t *testing.T) { - gotConfig, _, err := BuildPriorityConfigJSON([]PriorityConfig{ + gotConfig, _, err := buildPriorityConfigJSON([]priorityConfig{ { - Mechanism: DiscoveryMechanism{ + mechanism: DiscoveryMechanism{ Cluster: testClusterName, LoadReportingServerName: newString(testLRSServer), MaxConcurrentRequests: newUint32(testMaxRequests), Type: DiscoveryMechanismTypeEDS, EDSServiceName: testEDSServiceName, }, - EDSResp: xdsclient.EndpointsUpdate{ + edsResp: xdsclient.EndpointsUpdate{ Drops: []xdsclient.OverloadDropConfig{ { Category: testDropCategory, @@ -149,10 +149,10 @@ func TestBuildPriorityConfigJSON(t *testing.T) { }, }, { - Mechanism: DiscoveryMechanism{ + mechanism: DiscoveryMechanism{ Type: DiscoveryMechanismTypeLogicalDNS, }, - Addresses: testAddressStrs[4], + addresses: testAddressStrs[4], }, }, nil) if err != nil { @@ -173,16 +173,16 @@ func TestBuildPriorityConfigJSON(t *testing.T) { } func TestBuildPriorityConfig(t *testing.T) { - gotConfig, gotAddrs := buildPriorityConfig([]PriorityConfig{ + gotConfig, gotAddrs := buildPriorityConfig([]priorityConfig{ { - Mechanism: DiscoveryMechanism{ + mechanism: DiscoveryMechanism{ Cluster: testClusterName, LoadReportingServerName: newString(testLRSServer), MaxConcurrentRequests: newUint32(testMaxRequests), Type: DiscoveryMechanismTypeEDS, EDSServiceName: testEDSServiceName, }, - EDSResp: xdsclient.EndpointsUpdate{ + edsResp: xdsclient.EndpointsUpdate{ Drops: []xdsclient.OverloadDropConfig{ { Category: testDropCategory, @@ -199,10 +199,10 @@ func TestBuildPriorityConfig(t *testing.T) { }, }, { - Mechanism: DiscoveryMechanism{ + mechanism: DiscoveryMechanism{ Type: DiscoveryMechanismTypeLogicalDNS, }, - Addresses: testAddressStrs[4], + addresses: testAddressStrs[4], }, }, nil) @@ -723,14 +723,6 @@ func TestLocalitiesToWeightedTarget(t *testing.T) { } } -func newString(s string) *string { - return &s -} - -func newUint32(i uint32) *uint32 { - return &i -} - func assertString(f func() (string, error)) string { s, err := f() if err != nil { diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go index bf7e7f6c421c..f565c5249870 100644 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/eds_impl_test.go @@ -35,7 +35,6 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/balancer/balancergroup" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/balancer/weightedtarget" "google.golang.org/grpc/xds/internal/testutils" @@ -74,9 +73,9 @@ func setupTestEDS(t *testing.T, initChild *internalserviceconfig.BalancerConfig) if err := edsb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: xdsclient.SetClient(resolver.State{}, xdsC), BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, }}, EndpointPickingPolicy: initChild, }, @@ -495,9 +494,9 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Logf("update sub-balancer to stub-balancer") if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, }}, EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ Name: balancerName, @@ -525,9 +524,9 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Logf("update sub-balancer to round-robin") if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, }}, EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, @@ -555,9 +554,9 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Logf("update sub-balancer to stub-balancer") if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, }}, EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ Name: balancerName, @@ -588,9 +587,9 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { t.Logf("update sub-balancer to round-robin") if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, }}, EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, @@ -623,10 +622,10 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { var maxRequests uint32 = 50 if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, MaxConcurrentRequests: &maxRequests, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, }}, EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, @@ -689,10 +688,10 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { var maxRequests2 uint32 = 10 if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{{ + DiscoveryMechanisms: []DiscoveryMechanism{{ Cluster: testClusterName, MaxConcurrentRequests: &maxRequests2, - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, }}, EndpointPickingPolicy: &internalserviceconfig.BalancerConfig{ Name: roundrobin.Name, diff --git a/xds/internal/balancer/clusterresolver/priority_test.go b/xds/internal/balancer/clusterresolver/priority_test.go index b2935be0c362..2e0e9fd5d2ff 100644 --- a/xds/internal/balancer/clusterresolver/priority_test.go +++ b/xds/internal/balancer/clusterresolver/priority_test.go @@ -29,7 +29,6 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/balancer/priority" "google.golang.org/grpc/xds/internal/testutils" ) @@ -727,13 +726,13 @@ func (s) TestFallbackToDNS(t *testing.T) { if err := edsb.UpdateClientConnState(balancer.ClientConnState{ BalancerConfig: &LBConfig{ - DiscoveryMechanisms: []balancerconfig.DiscoveryMechanism{ + DiscoveryMechanisms: []DiscoveryMechanism{ { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, { - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, }, diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index e68d77d3efe9..2125bd2326f2 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -21,14 +21,13 @@ package clusterresolver import ( "sync" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/xdsclient" ) // resourceUpdate is a combined update from all the resources, in the order of // priority. For example, it can be {EDS, EDS, DNS}. type resourceUpdate struct { - priorities []balancerconfig.PriorityConfig + priorities []priorityConfig err error } @@ -43,7 +42,7 @@ type discoveryMechanism interface { // mechanisms, both for the same EDS resource, but has different circuit // breaking config. type discoveryMechanismKey struct { - typ balancerconfig.DiscoveryMechanismType + typ DiscoveryMechanismType name string } @@ -52,7 +51,7 @@ type discoveryMechanismKey struct { // mechanism for fields like circuit breaking, LRS etc when generating the // balancer config. type resolverMechanismTuple struct { - dm balancerconfig.DiscoveryMechanism + dm DiscoveryMechanism dmKey discoveryMechanismKey r discoveryMechanism } @@ -63,7 +62,7 @@ type resourceResolver struct { // mu protects the slice and map, and content of the resolvers in the slice. mu sync.Mutex - mechanisms []balancerconfig.DiscoveryMechanism + mechanisms []DiscoveryMechanism children []resolverMechanismTuple childrenMap map[discoveryMechanismKey]discoveryMechanism } @@ -76,7 +75,7 @@ func newResourceResolver(parent *clusterResolverBalancer) *resourceResolver { } } -func equalDiscoveryMechanisms(a, b []balancerconfig.DiscoveryMechanism) bool { +func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool { if len(a) != len(b) { return false } @@ -89,7 +88,7 @@ func equalDiscoveryMechanisms(a, b []balancerconfig.DiscoveryMechanism) bool { return true } -func (rr *resourceResolver) updateMechanisms(mechanisms []balancerconfig.DiscoveryMechanism) { +func (rr *resourceResolver) updateMechanisms(mechanisms []DiscoveryMechanism) { rr.mu.Lock() defer rr.mu.Unlock() if equalDiscoveryMechanisms(rr.mechanisms, mechanisms) { @@ -102,7 +101,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []balancerconfig.Discove // Start one watch for each new discover mechanism {type+resource_name}. for i, dm := range mechanisms { switch dm.Type { - case balancerconfig.DiscoveryMechanismTypeEDS: + case DiscoveryMechanismTypeEDS: // If EDSServiceName is not set, use the cluster name as EDS service // name to watch. nameToWatch := dm.EDSServiceName @@ -118,7 +117,7 @@ func (rr *resourceResolver) updateMechanisms(mechanisms []balancerconfig.Discove rr.childrenMap[dmKey] = r } rr.children[i] = resolverMechanismTuple{dm: dm, dmKey: dmKey, r: r} - case balancerconfig.DiscoveryMechanismTypeLogicalDNS: + case DiscoveryMechanismTypeLogicalDNS: // Name to resolve in DNS is the hostname, not the ClientConn // target. dmKey := discoveryMechanismKey{typ: dm.Type, name: dm.DNSHostname} @@ -172,7 +171,7 @@ func (rr *resourceResolver) stop() { // // caller must hold rr.mu. func (rr *resourceResolver) generate() { - var ret []balancerconfig.PriorityConfig + var ret []priorityConfig for _, rDM := range rr.children { r, ok := rr.childrenMap[rDM.dmKey] if !ok { @@ -188,9 +187,9 @@ func (rr *resourceResolver) generate() { } switch uu := u.(type) { case xdsclient.EndpointsUpdate: - ret = append(ret, balancerconfig.PriorityConfig{Mechanism: rDM.dm, EDSResp: uu}) + ret = append(ret, priorityConfig{mechanism: rDM.dm, edsResp: uu}) case []string: - ret = append(ret, balancerconfig.PriorityConfig{Mechanism: rDM.dm, Addresses: uu}) + ret = append(ret, priorityConfig{mechanism: rDM.dm, addresses: uu}) } } select { diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_test.go b/xds/internal/balancer/clusterresolver/resource_resolver_test.go index 621ca2a127c8..efdb4d58da69 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_test.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_test.go @@ -28,7 +28,6 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/xds/internal/balancer/clusterresolver/balancerconfig" "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" xdsclient "google.golang.org/grpc/xds/internal/xdsclient" @@ -58,20 +57,20 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) { clusterName, edsName string wantName string edsUpdate xdsclient.EndpointsUpdate - want []balancerconfig.PriorityConfig + want []priorityConfig }{ {name: "watch EDS", clusterName: testClusterName, edsName: testEDSServcie, wantName: testEDSServcie, edsUpdate: testEDSUpdates[0], - want: []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + want: []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, EDSServiceName: testEDSServcie, }, - EDSResp: testEDSUpdates[0], + edsResp: testEDSUpdates[0], }}, }, { @@ -79,20 +78,20 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) { clusterName: testClusterName, wantName: testClusterName, edsUpdate: testEDSUpdates[1], - want: []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + want: []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, - EDSResp: testEDSUpdates[1], + edsResp: testEDSUpdates[1], }}, }, } { t.Run(test.name, func(t *testing.T) { fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeEDS, Cluster: test.clusterName, EDSServiceName: test.edsName, }}) @@ -110,7 +109,7 @@ func (s) TestResourceResolverOneEDSResource(t *testing.T) { fakeClient.InvokeWatchEDSCallback("", test.edsUpdate, nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, test.want); diff != "" { + if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -152,19 +151,19 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) { target string wantTarget resolver.Target addrs []resolver.Address - want []balancerconfig.PriorityConfig + want []priorityConfig }{ { name: "watch DNS", target: testDNSTarget, wantTarget: resolver.Target{Scheme: "dns", Endpoint: testDNSTarget}, addrs: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}, - want: []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + want: []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, - Addresses: []string{"1.1.1.1", "2.2.2.2"}, + addresses: []string{"1.1.1.1", "2.2.2.2"}, }}, }, } { @@ -173,8 +172,8 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) { defer cleanup() fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: test.target, }}) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -192,7 +191,7 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: test.addrs}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, test.want); diff != "" { + if diff := cmp.Diff(u.priorities, test.want, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -221,8 +220,8 @@ func (s) TestResourceResolverOneDNSResource(t *testing.T) { func (s) TestResourceResolverChangeEDSName(t *testing.T) { fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, EDSServiceName: testEDSServcie, }}) @@ -240,14 +239,14 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + if diff := cmp.Diff(u.priorities, []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, EDSServiceName: testEDSServcie, }, - EDSResp: testEDSUpdates[0], - }}); diff != "" { + edsResp: testEDSUpdates[0], + }}, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -255,8 +254,8 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { } // Change name to watch. - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }}) edsNameCanceled1, err := fakeClient.WaitForCancelEDSWatch(ctx) @@ -287,13 +286,13 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + if diff := cmp.Diff(u.priorities, []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, - EDSResp: testEDSUpdates[1], - }}); diff != "" { + edsResp: testEDSUpdates[1], + }}, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -302,8 +301,8 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { // Change circuit breaking count, should get an update with new circuit // breaking count, but shouldn't trigger new watch. - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, MaxConcurrentRequests: newUint32(123), }}) @@ -314,14 +313,14 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { } select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + if diff := cmp.Diff(u.priorities, []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, MaxConcurrentRequests: newUint32(123), }, - EDSResp: testEDSUpdates[1], - }}); diff != "" { + edsResp: testEDSUpdates[1], + }}, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -344,13 +343,13 @@ func (s) TestResourceResolverChangeEDSName(t *testing.T) { func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) { fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + rr.updateMechanisms([]DiscoveryMechanism{ { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[0], }, { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[1], MaxConcurrentRequests: newUint32(100), }, @@ -385,23 +384,23 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []priorityConfig{ { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[0], }, - EDSResp: testEDSUpdates[0], + edsResp: testEDSUpdates[0], }, { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[1], MaxConcurrentRequests: newUint32(100), }, - EDSResp: testEDSUpdates[1], + edsResp: testEDSUpdates[1], }, - }); diff != "" { + }, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -409,13 +408,13 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) { } // Send the same resources with the same priorities, shouldn't any change. - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + rr.updateMechanisms([]DiscoveryMechanism{ { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[0], }, { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[1], MaxConcurrentRequests: newUint32(100), }, @@ -457,13 +456,13 @@ func (s) TestResourceResolverNoChangeNoUpdate(t *testing.T) { func (s) TestResourceResolverChangePriority(t *testing.T) { fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + rr.updateMechanisms([]DiscoveryMechanism{ { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[0], }, { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[1], }, }) @@ -497,22 +496,22 @@ func (s) TestResourceResolverChangePriority(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName2, testEDSUpdates[1], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []priorityConfig{ { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[0], }, - EDSResp: testEDSUpdates[0], + edsResp: testEDSUpdates[0], }, { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[1], }, - EDSResp: testEDSUpdates[1], + edsResp: testEDSUpdates[1], }, - }); diff != "" { + }, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -521,13 +520,13 @@ func (s) TestResourceResolverChangePriority(t *testing.T) { // Send the same resources with different priorities, shouldn't trigger // watch, but should trigger an update with the new priorities. - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + rr.updateMechanisms([]DiscoveryMechanism{ { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[1], }, { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[0], }, }) @@ -538,22 +537,22 @@ func (s) TestResourceResolverChangePriority(t *testing.T) { } select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []priorityConfig{ { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[1], }, - EDSResp: testEDSUpdates[1], + edsResp: testEDSUpdates[1], }, { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterNames[0], }, - EDSResp: testEDSUpdates[0], + edsResp: testEDSUpdates[0], }, - }); diff != "" { + }, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -584,13 +583,13 @@ func (s) TestResourceResolverEDSAndDNS(t *testing.T) { defer cleanup() fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + rr.updateMechanisms([]DiscoveryMechanism{ { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, { - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, }) @@ -625,22 +624,22 @@ func (s) TestResourceResolverEDSAndDNS(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{ + if diff := cmp.Diff(u.priorities, []priorityConfig{ { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, - EDSResp: testEDSUpdates[0], + edsResp: testEDSUpdates[0], }, { - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, - Addresses: []string{"1.1.1.1", "2.2.2.2"}, + addresses: []string{"1.1.1.1", "2.2.2.2"}, }, - }); diff != "" { + }, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -669,8 +668,8 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { defer cleanup() fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }}) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -687,13 +686,13 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { fakeClient.InvokeWatchEDSCallback(gotEDSName1, testEDSUpdates[0], nil) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeEDS, + if diff := cmp.Diff(u.priorities, []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, - EDSResp: testEDSUpdates[0], - }}); diff != "" { + edsResp: testEDSUpdates[0], + }}, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -701,8 +700,8 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { } // Update to watch DNS instead. Should cancel EDS, and start DNS. - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }}) select { @@ -724,13 +723,13 @@ func (s) TestResourceResolverChangeFromEDSToDNS(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + if diff := cmp.Diff(u.priorities, []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, - Addresses: []string{"1.1.1.1", "2.2.2.2"}, - }}); diff != "" { + addresses: []string{"1.1.1.1", "2.2.2.2"}, + }}, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): @@ -752,13 +751,13 @@ func (s) TestResourceResolverError(t *testing.T) { defer cleanup() fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{ + rr.updateMechanisms([]DiscoveryMechanism{ { - Type: balancerconfig.DiscoveryMechanismTypeEDS, + Type: DiscoveryMechanismTypeEDS, Cluster: testClusterName, }, { - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, }) @@ -826,8 +825,8 @@ func (s) TestResourceResolverDNSResolveNow(t *testing.T) { defer cleanup() fakeClient := fakeclient.NewClient() rr := newResourceResolver(&clusterResolverBalancer{xdsClient: fakeClient}) - rr.updateMechanisms([]balancerconfig.DiscoveryMechanism{{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + rr.updateMechanisms([]DiscoveryMechanism{{ + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }}) ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -845,13 +844,13 @@ func (s) TestResourceResolverDNSResolveNow(t *testing.T) { dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "1.1.1.1"}, {Addr: "2.2.2.2"}}}) select { case u := <-rr.updateChannel: - if diff := cmp.Diff(u.priorities, []balancerconfig.PriorityConfig{{ - Mechanism: balancerconfig.DiscoveryMechanism{ - Type: balancerconfig.DiscoveryMechanismTypeLogicalDNS, + if diff := cmp.Diff(u.priorities, []priorityConfig{{ + mechanism: DiscoveryMechanism{ + Type: DiscoveryMechanismTypeLogicalDNS, DNSHostname: testDNSTarget, }, - Addresses: []string{"1.1.1.1", "2.2.2.2"}, - }}); diff != "" { + addresses: []string{"1.1.1.1", "2.2.2.2"}, + }}, cmp.AllowUnexported(priorityConfig{})); diff != "" { t.Fatalf("got unexpected resource update, diff (-got, +want): %v", diff) } case <-ctx.Done(): diff --git a/xds/internal/balancer/weightedtarget/weightedtarget.go b/xds/internal/balancer/weightedtarget/weightedtarget.go index fd9da9c59d71..eb6516af56e9 100644 --- a/xds/internal/balancer/weightedtarget/weightedtarget.go +++ b/xds/internal/balancer/weightedtarget/weightedtarget.go @@ -81,7 +81,7 @@ type weightedTargetBalancer struct { } // UpdateClientConnState takes the new targets in balancer group, -// creates/deletes sub-balancers and sends them update. Addresses are split into +// creates/deletes sub-balancers and sends them update. addresses are split into // groups based on hierarchy path. func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnState) error { b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig)) @@ -137,7 +137,7 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat } // Forwards all the update: - // - Addresses are from the map after splitting with hierarchy path, + // - addresses are from the map after splitting with hierarchy path, // - Top level service config and attributes are the same, // - Balancer config comes from the targets map. //