diff --git a/pilot/pkg/proxy/envoy/v2/eds_test.go b/pilot/pkg/proxy/envoy/v2/eds_test.go index a2cae7554a4e..a2c4f4168133 100644 --- a/pilot/pkg/proxy/envoy/v2/eds_test.go +++ b/pilot/pkg/proxy/envoy/v2/eds_test.go @@ -104,10 +104,11 @@ func TestEds(t *testing.T) { }) t.Run("CDSSave", func(t *testing.T) { // Moved from cds_test, using new client - if len(adscConn.Clusters) == 0 { + clusters := adscConn.GetClusters() + if len(clusters) == 0 { t.Error("No clusters in ADS response") } - strResponse, _ := json.MarshalIndent(adscConn.Clusters, " ", " ") + strResponse, _ := json.MarshalIndent(clusters, " ", " ") _ = ioutil.WriteFile(env.IstioOut+"/cdsv2_sidecar.json", strResponse, 0644) }) @@ -117,7 +118,8 @@ func TestEds(t *testing.T) { adscConn := adsConnectAndWait(t, 0x0a0a0a0a) defer adscConn.Close() - lbe, f := adscConn.EDS["outbound|80||weighted.static.svc.cluster.local"] + endpoints := adscConn.GetEndpoints() + lbe, f := endpoints["outbound|80||weighted.static.svc.cluster.local"] if !f || len(lbe.Endpoints) == 0 { t.Fatalf("No lb endpoints for %v, %v", "outbound|80||weighted.static.svc.cluster.local", adscConn.EndpointsJSON()) } @@ -167,7 +169,7 @@ func adsConnectAndWait(t *testing.T, ip int) *adsc.ADSC { t.Fatal("Error getting initial config ", err) } - if len(adscConn.EDS) == 0 { + if len(adscConn.GetEndpoints()) == 0 { t.Fatal("No endpoints") } return adscConn @@ -222,7 +224,7 @@ func testTCPEndpoints(expected string, adsc *adsc.ADSC, t *testing.T) { // address. func testEndpoints(expected string, cluster string, adsc *adsc.ADSC, t *testing.T) { t.Helper() - lbe, f := adsc.EDS[cluster] + lbe, f := adsc.GetEndpoints()[cluster] if !f || len(lbe.Endpoints) == 0 { t.Fatalf("No lb endpoints for %v, %v", cluster, adsc.EndpointsJSON()) } @@ -243,12 +245,15 @@ func testEndpoints(expected string, cluster string, adsc *adsc.ADSC, t *testing. } func testLocalityPrioritizedEndpoints(adsc *adsc.ADSC, adsc2 *adsc.ADSC, t *testing.T) { - verifyLocalityPriorities(asdcLocality, adsc.EDS["outbound|80||locality.cluster.local"].GetEndpoints(), t) - verifyLocalityPriorities(asdc2Locality, adsc2.EDS["outbound|80||locality.cluster.local"].GetEndpoints(), t) + endpoints1 := adsc.GetEndpoints() + endpoints2 := adsc2.GetEndpoints() + + verifyLocalityPriorities(asdcLocality, endpoints1["outbound|80||locality.cluster.local"].GetEndpoints(), t) + verifyLocalityPriorities(asdc2Locality, endpoints2["outbound|80||locality.cluster.local"].GetEndpoints(), t) // No outlier detection specified for this cluster, so we shouldn't apply priority. - verifyNoLocalityPriorities(adsc.EDS["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t) - verifyNoLocalityPriorities(adsc2.EDS["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t) + verifyNoLocalityPriorities(endpoints1["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t) + verifyNoLocalityPriorities(endpoints2["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t) } // Tests that Services with multiple ports sharing the same port number are properly sent endpoints. @@ -303,9 +308,9 @@ func verifyLocalityPriorities(proxyLocality string, eps []*endpoint.LocalityLbEn // Verify server sends UDS endpoints func testUdsEndpoints(_ *bootstrap.Server, adsc *adsc.ADSC, t *testing.T) { // Check the UDS endpoint ( used to be separate test - but using old unused GRPC method) - // The new test also verifies CDS is pusing the UDS cluster, since adsc.EDS is + // The new test also verifies CDS is pusing the UDS cluster, since adsc.eds is // populated using CDS response - lbe, f := adsc.EDS["outbound|0||localuds.cluster.local"] + lbe, f := adsc.GetEndpoints()["outbound|0||localuds.cluster.local"] if !f || len(lbe.Endpoints) == 0 { t.Error("No UDS lb endpoints") } else { @@ -478,7 +483,7 @@ func multipleRequest(server *bootstrap.Server, inc bool, nclients, return } - if len(adscConn.EDS) == 0 { + if len(adscConn.GetEndpoints()) == 0 { errChan <- errors.New("no endpoints") wgConnect.Done() return diff --git a/pilot/pkg/proxy/envoy/v2/lds_test.go b/pilot/pkg/proxy/envoy/v2/lds_test.go index f35d841e8d25..001a11e75a2f 100644 --- a/pilot/pkg/proxy/envoy/v2/lds_test.go +++ b/pilot/pkg/proxy/envoy/v2/lds_test.go @@ -78,10 +78,10 @@ func TestLDSIsolated(t *testing.T) { // 7071 (inbound), 2001 (service - also as http proxy), 15002 (http-proxy) // We dont get mixer on 9091 or 15004 because there are no services defined in istio-system namespace // in the none.yaml setup - if len(ldsr.HTTPListeners) != 3 { + if len(ldsr.GetHTTPListeners()) != 3 { // TODO: we are still debating if for HTTP services we have any use case to create a 127.0.0.1:port outbound // for the service (the http proxy is already covering this) - t.Error("HTTP listeners, expecting 5 got ", len(ldsr.HTTPListeners), ldsr.HTTPListeners) + t.Error("HTTP listeners, expecting 5 got ", len(ldsr.GetHTTPListeners()), ldsr.GetHTTPListeners()) } // s1tcp:2000 outbound, bind=true (to reach other instances of the service) @@ -89,7 +89,7 @@ func TestLDSIsolated(t *testing.T) { // :443 - https external, bind=false // 10.11.0.1_7070, bind=true -> inbound|2000|s1 - on port 7070, fwd to 37070 // virtual - if len(ldsr.TCPListeners) == 0 { + if len(ldsr.GetTCPListeners()) == 0 { t.Fatal("No response") } @@ -241,22 +241,22 @@ func TestLDSWithDefaultSidecar(t *testing.T) { } // Expect 7 listeners : 2 orig_dst, 1 http inbound + 4 outbound (http, tcp1, istio-policy and istio-telemetry) - if (len(adsResponse.HTTPListeners) + len(adsResponse.TCPListeners)) != 7 { - t.Fatalf("Expected 7 listeners, got %d\n", len(adsResponse.HTTPListeners)+len(adsResponse.TCPListeners)) + if (len(adsResponse.GetHTTPListeners()) + len(adsResponse.GetTCPListeners())) != 7 { + t.Fatalf("Expected 7 listeners, got %d\n", len(adsResponse.GetHTTPListeners())+len(adsResponse.GetTCPListeners())) } // Expect 12 CDS clusters: // 3 inbound(http, inbound passthroughipv4 and inbound passthroughipv6) // 9 outbound (2 http services, 1 tcp service, 2 istio-system services, // and 2 subsets of http1, 1 blackhole, 1 passthrough) - if (len(adsResponse.Clusters) + len(adsResponse.EDSClusters)) != 12 { - t.Fatalf("Expected 12 Clusters in CDS output. Got %d", len(adsResponse.Clusters)+len(adsResponse.EDSClusters)) + if (len(adsResponse.GetClusters()) + len(adsResponse.GetEdsClusters())) != 12 { + t.Fatalf("Expected 12 clusters in CDS output. Got %d", len(adsResponse.GetClusters())+len(adsResponse.GetEdsClusters())) } // Expect two vhost blocks in RDS output for 8080 (one for http1, another for http2) // plus one extra due to mem registry - if len(adsResponse.Routes["8080"].VirtualHosts) != 3 { - t.Fatalf("Expected 3 VirtualHosts in RDS output. Got %d", len(adsResponse.Routes["8080"].VirtualHosts)) + if len(adsResponse.GetRoutes()["8080"].VirtualHosts) != 3 { + t.Fatalf("Expected 3 VirtualHosts in RDS output. Got %d", len(adsResponse.GetRoutes()["8080"].VirtualHosts)) } } @@ -306,13 +306,13 @@ func TestLDSWithIngressGateway(t *testing.T) { // Expect 2 listeners : 1 for 80, 1 for 443 // where 443 listener has 3 filter chains - if (len(adsResponse.HTTPListeners) + len(adsResponse.TCPListeners)) != 2 { - t.Fatalf("Expected 2 listeners, got %d\n", len(adsResponse.HTTPListeners)+len(adsResponse.TCPListeners)) + if (len(adsResponse.GetHTTPListeners()) + len(adsResponse.GetTCPListeners())) != 2 { + t.Fatalf("Expected 2 listeners, got %d\n", len(adsResponse.GetHTTPListeners())+len(adsResponse.GetTCPListeners())) } // TODO: This is flimsy. The ADSC code treats any listener with http connection manager as a HTTP listener // instead of looking at it as a listener with multiple filter chains - l := adsResponse.HTTPListeners["0.0.0.0_443"] + l := adsResponse.GetHTTPListeners()["0.0.0.0_443"] if l != nil { if len(l.FilterChains) != 3 { @@ -431,13 +431,13 @@ func TestLDSWithSidecarForWorkloadWithoutService(t *testing.T) { } // Expect 1 HTTP listeners for 8081 - if len(adsResponse.HTTPListeners) != 1 { - t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.HTTPListeners)) + if len(adsResponse.GetHTTPListeners()) != 1 { + t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.GetHTTPListeners())) } // TODO: This is flimsy. The ADSC code treats any listener with http connection manager as a HTTP listener // instead of looking at it as a listener with multiple filter chains - if l := adsResponse.HTTPListeners["0.0.0.0_8081"]; l != nil { + if l := adsResponse.GetHTTPListeners()["0.0.0.0_8081"]; l != nil { expected := 1 if features.RestrictPodIPTrafficLoops.Get() { expected = 2 @@ -449,12 +449,12 @@ func TestLDSWithSidecarForWorkloadWithoutService(t *testing.T) { t.Fatal("Expected listener for 0.0.0.0_8081") } - // Expect only one EDS cluster for http1.ns1.svc.cluster.local - if len(adsResponse.EDSClusters) != 1 { - t.Fatalf("Expected 1 eds cluster, got %d", len(adsResponse.EDSClusters)) + // Expect only one eds cluster for http1.ns1.svc.cluster.local + if len(adsResponse.GetEdsClusters()) != 1 { + t.Fatalf("Expected 1 eds cluster, got %d", len(adsResponse.GetEdsClusters())) } - if cluster, ok := adsResponse.EDSClusters["outbound|8081||http1.ns1.svc.cluster.local"]; !ok { - t.Fatalf("Expected EDS cluster outbound|8081||http1.ns1.svc.cluster.local, got %v", cluster.Name) + if cluster, ok := adsResponse.GetEdsClusters()["outbound|8081||http1.ns1.svc.cluster.local"]; !ok { + t.Fatalf("Expected eds cluster outbound|8081||http1.ns1.svc.cluster.local, got %v", cluster.Name) } } @@ -532,12 +532,12 @@ func TestLDSEnvoyFilterWithWorkloadSelector(t *testing.T) { } // Expect 1 HTTP listeners for 8081 - if len(adsResponse.HTTPListeners) != 1 { - t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.HTTPListeners)) + if len(adsResponse.GetHTTPListeners()) != 1 { + t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.GetHTTPListeners())) } // TODO: This is flimsy. The ADSC code treats any listener with http connection manager as a HTTP listener // instead of looking at it as a listener with multiple filter chains - l := adsResponse.HTTPListeners["0.0.0.0_8081"] + l := adsResponse.GetHTTPListeners()["0.0.0.0_8081"] expectLuaFilter(t, l, test.expectLuaFilter) }) diff --git a/pkg/adsc/adsc.go b/pkg/adsc/adsc.go index 1436ebeca6ee..a869af840b90 100644 --- a/pkg/adsc/adsc.go +++ b/pkg/adsc/adsc.go @@ -79,23 +79,23 @@ type ADSC struct { // InitialLoad tracks the time to receive the initial configuration. InitialLoad time.Duration - // HTTPListeners contains received listeners with a http_connection_manager filter. - HTTPListeners map[string]*xdsapi.Listener + // httpListeners contains received listeners with a http_connection_manager filter. + httpListeners map[string]*xdsapi.Listener - // TCPListeners contains all listeners of type TCP (not-HTTP) - TCPListeners map[string]*xdsapi.Listener + // tcpListeners contains all listeners of type TCP (not-HTTP) + tcpListeners map[string]*xdsapi.Listener - // All received clusters of type EDS, keyed by name - EDSClusters map[string]*xdsapi.Cluster + // All received clusters of type eds, keyed by name + edsClusters map[string]*xdsapi.Cluster - // All received clusters of no-EDS type, keyed by name - Clusters map[string]*xdsapi.Cluster + // All received clusters of no-eds type, keyed by name + clusters map[string]*xdsapi.Cluster // All received routes, keyed by route name - Routes map[string]*xdsapi.RouteConfiguration + routes map[string]*xdsapi.RouteConfiguration // All received endpoints, keyed by cluster name - EDS map[string]*xdsapi.ClusterLoadAssignment + eds map[string]*xdsapi.ClusterLoadAssignment // DumpCfg will print all received config DumpCfg bool @@ -364,8 +364,8 @@ func (a *ADSC) handleLDS(ll []*xdsapi.Listener) { if len(routes) > 0 { a.sendRsc(routeType, routes) } - a.HTTPListeners = lh - a.TCPListeners = lt + a.httpListeners = lh + a.tcpListeners = lt select { case a.Updates <- "lds": @@ -392,18 +392,18 @@ type Target struct { // Address is a go address, extracted from the mangled cluster name. Address string - // Endpoints are the resolved endpoints from EDS or cluster static. + // Endpoints are the resolved endpoints from eds or cluster static. Endpoints map[string]Endpoint } type Endpoint struct { - // Weight extracted from EDS + // Weight extracted from eds Weight int } // Save will save the json configs to files, using the base directory func (a *ADSC) Save(base string) error { - strResponse, err := json.MarshalIndent(a.TCPListeners, " ", " ") + strResponse, err := json.MarshalIndent(a.tcpListeners, " ", " ") if err != nil { return err } @@ -411,7 +411,7 @@ func (a *ADSC) Save(base string) error { if err != nil { return err } - strResponse, err = json.MarshalIndent(a.HTTPListeners, " ", " ") + strResponse, err = json.MarshalIndent(a.httpListeners, " ", " ") if err != nil { return err } @@ -419,7 +419,7 @@ func (a *ADSC) Save(base string) error { if err != nil { return err } - strResponse, err = json.MarshalIndent(a.Routes, " ", " ") + strResponse, err = json.MarshalIndent(a.routes, " ", " ") if err != nil { return err } @@ -427,7 +427,7 @@ func (a *ADSC) Save(base string) error { if err != nil { return err } - strResponse, err = json.MarshalIndent(a.EDSClusters, " ", " ") + strResponse, err = json.MarshalIndent(a.edsClusters, " ", " ") if err != nil { return err } @@ -435,7 +435,7 @@ func (a *ADSC) Save(base string) error { if err != nil { return err } - strResponse, err = json.MarshalIndent(a.Clusters, " ", " ") + strResponse, err = json.MarshalIndent(a.clusters, " ", " ") if err != nil { return err } @@ -443,7 +443,7 @@ func (a *ADSC) Save(base string) error { if err != nil { return err } - strResponse, err = json.MarshalIndent(a.EDS, " ", " ") + strResponse, err = json.MarshalIndent(a.eds, " ", " ") if err != nil { return err } @@ -486,8 +486,8 @@ func (a *ADSC) handleCDS(ll []*xdsapi.Cluster) { a.mutex.Lock() defer a.mutex.Unlock() - a.EDSClusters = edscds - a.Clusters = cds + a.edsClusters = edscds + a.clusters = cds select { case a.Updates <- "cds": @@ -533,7 +533,7 @@ func (a *ADSC) handleEDS(eds []*xdsapi.ClusterLoadAssignment) { ep += len(cla.Endpoints) } - log.Println("EDS: ", len(eds), "size=", edsSize, "ep=", ep) + log.Println("eds: ", len(eds), "size=", edsSize, "ep=", ep) if a.DumpCfg { b, _ := json.MarshalIndent(eds, " ", " ") log.Println(string(b)) @@ -549,7 +549,7 @@ func (a *ADSC) handleEDS(eds []*xdsapi.ClusterLoadAssignment) { a.mutex.Lock() defer a.mutex.Unlock() - a.EDS = la + a.eds = la select { case a.Updates <- "eds": @@ -590,7 +590,7 @@ func (a *ADSC) handleRDS(configurations []*xdsapi.RouteConfiguration) { } a.mutex.Lock() - a.Routes = rds + a.routes = rds a.mutex.Unlock() select { @@ -630,7 +630,7 @@ func (a *ADSC) Wait(update string, to time.Duration) (string, error) { // EndpointsJSON returns the endpoints, formatted as JSON, for debugging. func (a *ADSC) EndpointsJSON() string { - out, _ := json.MarshalIndent(a.EDS, " ", " ") + out, _ := json.MarshalIndent(a.eds, " ", " ") return string(out) } @@ -662,3 +662,45 @@ func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) { VersionInfo: msg.VersionInfo, }) } + +// GetHTTPListeners returns all the http listeners. +func (a *ADSC) GetHTTPListeners() map[string]*xdsapi.Listener { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.httpListeners +} + +// GetTCPListeners returns all the tcp listeners. +func (a *ADSC) GetTCPListeners() map[string]*xdsapi.Listener { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.tcpListeners +} + +// GetEdsClusters returns all the eds type clusters. +func (a *ADSC) GetEdsClusters() map[string]*xdsapi.Cluster { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.edsClusters +} + +// GetClusters returns all the non-eds type clusters. +func (a *ADSC) GetClusters() map[string]*xdsapi.Cluster { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.clusters +} + +// GetRoutes returns all the routes. +func (a *ADSC) GetRoutes() map[string]*xdsapi.RouteConfiguration { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.routes +} + +// GetEndpoints returns all the routes. +func (a *ADSC) GetEndpoints() map[string]*xdsapi.ClusterLoadAssignment { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.eds +} diff --git a/tests/e2e/tests/pilot/performance/serviceentry_test.go b/tests/e2e/tests/pilot/performance/serviceentry_test.go index 0314b2c7b58d..e909c3b26aca 100644 --- a/tests/e2e/tests/pilot/performance/serviceentry_test.go +++ b/tests/e2e/tests/pilot/performance/serviceentry_test.go @@ -183,7 +183,7 @@ func adsConnectAndWait(n int, pilotAddr string, t *testing.T) (adscs []*adsc.ADS t.Fatal(err) } - if len(c.EDS) == 0 { + if len(c.GetEndpoints()) == 0 { t.Fatalf("No endpoints") } adscs = append(adscs, c)