Skip to content

Commit

Permalink
dataclients/kubernetes: refactor GetEndpointAddresses
Browse files Browse the repository at this point in the history
* change GetEndpointAddresses to return host:port addresses to avoid trimming implicit scheme
* use all ports for endpointslices addresses for consistency with endpoints
* use separate cluster state cache for addresses since cache key does not contain port
* rename helpers protocol argument to scheme

For #2476

Signed-off-by: Alexander Yastrebov <alexander.yastrebov@zalando.de>
  • Loading branch information
AlexanderYastrebov committed Feb 9, 2024
1 parent cbc4e2d commit 4b0187f
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 78 deletions.
1 change: 1 addition & 0 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ func (c *clusterClient) fetchClusterState() (*clusterState, error) {
routeGroups: routeGroups,
services: services,
cachedEndpoints: make(map[endpointID][]string),
cachedAddresses: make(map[definitions.ResourceID][]string),
enableEndpointSlices: c.enableEndpointSlices,
}

Expand Down
30 changes: 14 additions & 16 deletions dataclients/kubernetes/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type clusterState struct {
endpointSlices map[definitions.ResourceID]*skipperEndpointSlice
secrets map[definitions.ResourceID]*secret
cachedEndpoints map[endpointID][]string
cachedAddresses map[definitions.ResourceID][]string
enableEndpointSlices bool
}

Expand Down Expand Up @@ -83,38 +84,35 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin
return targets
}

// GetEndpointsByName returns the skipper endpoints for kubernetes endpoints or endpointslices.
// This function works only correctly for endpointslices (and likely endpoints) with one port with the same protocol ("TCP", "UDP").
func (state *clusterState) GetEndpointsByName(namespace, name, protocol, scheme string) []string {
epID := endpointID{
ResourceID: newResourceID(namespace, name),
Protocol: protocol,
}
// getEndpointAddresses returns the list of all host:port addresses for the given service using endpoints or endpointslices.
func (state *clusterState) getEndpointAddresses(namespace, name string) []string {
rID := newResourceID(namespace, name)

state.mu.Lock()
defer state.mu.Unlock()
if cached, ok := state.cachedEndpoints[epID]; ok {
if cached, ok := state.cachedAddresses[rID]; ok {
return cached
}

var targets []string
var addresses []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
targets = eps.targets(protocol, scheme)
if eps, ok := state.endpointSlices[rID]; ok {
addresses = eps.addresses()
} else {
return nil
}
} else {
if ep, ok := state.endpoints[epID.ResourceID]; ok {
targets = ep.targets(scheme)
if ep, ok := state.endpoints[rID]; ok {
addresses = ep.addresses()
} else {
return nil
}
}

sort.Strings(targets)
state.cachedEndpoints[epID] = targets
return targets
sort.Strings(addresses)
state.cachedAddresses[rID] = addresses

return addresses
}

// GetEndpointsByTarget returns the skipper endpoints for kubernetes endpoints or endpointslices.
Expand Down
23 changes: 10 additions & 13 deletions dataclients/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ type endpointList struct {
Items []*endpoint `json:"items"`
}

func formatEndpointString(ip, protocol string, port int) string {
return protocol + "://" + net.JoinHostPort(ip, strconv.Itoa(port))
func formatEndpointString(ip, scheme string, port int) string {
return scheme + "://" + net.JoinHostPort(ip, strconv.Itoa(port))
}

func formatEndpoint(a *address, p *port, protocol string) string {
return formatEndpointString(a.IP, protocol, p.Port)
func formatEndpoint(a *address, p *port, scheme string) string {
return formatEndpointString(a.IP, scheme, p.Port)
}

func formatEndpointsForSubsetAddresses(addresses []*address, port *port, protocol string) []string {
func formatEndpointsForSubsetAddresses(addresses []*address, port *port, scheme string) []string {
result := make([]string, 0, len(addresses))
for _, address := range addresses {
result = append(result, formatEndpoint(address, port, protocol))
result = append(result, formatEndpoint(address, port, scheme))
}
return result
}
Expand All @@ -54,11 +54,10 @@ func (ep *endpoint) targetsByServicePort(protocol string, servicePort *servicePo
return formatEndpointsForSubsetAddresses(s.Addresses, p, protocol)
}
}

return nil
}

func (ep *endpoint) targetsByServiceTarget(protocol string, serviceTarget *definitions.BackendPort) []string {
func (ep *endpoint) targetsByServiceTarget(scheme string, serviceTarget *definitions.BackendPort) []string {
portName, named := serviceTarget.Value.(string)
portValue, byValue := serviceTarget.Value.(int)
for _, s := range ep.Subsets {
Expand All @@ -69,26 +68,24 @@ func (ep *endpoint) targetsByServiceTarget(protocol string, serviceTarget *defin

var result []string
for _, a := range s.Addresses {
result = append(result, formatEndpoint(a, p, protocol))
result = append(result, formatEndpoint(a, p, scheme))
}

return result
}
}

return nil
}

func (ep *endpoint) targets(protocol string) []string {
func (ep *endpoint) addresses() []string {
result := make([]string, 0)
for _, s := range ep.Subsets {
for _, p := range s.Ports {
for _, a := range s.Addresses {
result = append(result, formatEndpoint(a, p, protocol))
result = append(result, net.JoinHostPort(a.IP, strconv.Itoa(p.Port)))
}
}
}

return result
}

Expand Down
20 changes: 8 additions & 12 deletions dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kubernetes

import (
"net"
"strconv"

"github.com/zalando/skipper/dataclients/kubernetes/definitions"
)

Expand Down Expand Up @@ -74,20 +77,13 @@ func (eps *skipperEndpointSlice) targetsByServiceTarget(protocol, scheme string,
return result
}

func (eps *skipperEndpointSlice) targets(protocol, scheme string) []string {
result := make([]string, 0, len(eps.Endpoints))

var port int
for _, p := range eps.Ports {
if p.Protocol == protocol {
port = p.Port
break
}
}
func (eps *skipperEndpointSlice) addresses() []string {
result := make([]string, 0, len(eps.Endpoints)*len(eps.Ports))
for _, ep := range eps.Endpoints {
result = append(result, formatEndpointString(ep.Address, scheme, port))
for _, p := range eps.Ports {
result = append(result, net.JoinHostPort(ep.Address, strconv.Itoa(p.Port)))
}
}

return result
}

Expand Down
98 changes: 69 additions & 29 deletions dataclients/kubernetes/endpointslices_test.go
Original file line number Diff line number Diff line change
@@ -1,51 +1,91 @@
package kubernetes

import (
"net/url"
"strconv"
"testing"

"github.com/zalando/skipper/dataclients/kubernetes/definitions"
"github.com/stretchr/testify/assert"
)

func TestTargets(t *testing.T) {
want := "http://10.0.0.1:8080"
u, err := url.Parse(want)
if err != nil {
t.Fatalf("Failed to parse: %v", err)
}
port, err := strconv.Atoi(u.Port())
if err != nil {
t.Fatalf("Failed to parse: %v", err)
}
func TestAddresses(t *testing.T) {
assert.Equal(t, []string{"10.0.0.1:8080"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: "10.0.0.1",
Zone: "zone-1",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: 8080,
Protocol: "TCP",
},
},
}).addresses())

ses := &skipperEndpointSlice{
Meta: &definitions.Metadata{
Namespace: "ns1",
Name: "a-slice",
assert.Equal(t, []string{"10.0.0.1:8080", "10.0.0.1:8081"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: "10.0.0.1",
Zone: "zone-1",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: 8080,
Protocol: "TCP",
},
{
Name: "support",
Port: 8081,
Protocol: "TCP",
},
},
}).addresses())

assert.Equal(t, []string{"10.0.0.1:8080", "10.0.0.2:8080"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: u.Hostname(),
Address: "10.0.0.1",
Zone: "zone-1",
},
{
Address: "10.0.0.2",
Zone: "zone-2",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: port,
Port: 8080,
Protocol: "TCP",
},
},
}
res := ses.targets("TCP", "http")
if l := len(res); l != 1 {
t.Fatalf("Failed to get same number of results than expected %d, got: %d", 1, l)
}
}).addresses())

for i := 0; i < len(res); i++ {
if want != res[i] {
t.Fatalf("Failed to get the right target: %s != %s", want, res[i])
}
}
assert.Equal(t, []string{"10.0.0.1:8080", "10.0.0.1:8081", "10.0.0.2:8080", "10.0.0.2:8081"}, (&skipperEndpointSlice{
Endpoints: []*skipperEndpoint{
{
Address: "10.0.0.1",
Zone: "zone-1",
},
{
Address: "10.0.0.2",
Zone: "zone-2",
},
},
Ports: []*endpointSlicePort{
{
Name: "main",
Port: 8080,
Protocol: "TCP",
},
{
Name: "support",
Port: 8081,
Protocol: "TCP",
},
},
}).addresses())
}
3 changes: 2 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,14 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {
return filters
}

// GetEndpointAddresses returns the list of all host:port addresses for the given service.
func (c *Client) GetEndpointAddresses(ns, name string) []string {
c.mu.Lock()
defer c.mu.Unlock()
if c.state == nil {
return nil
}
return c.state.GetEndpointsByName(ns, name, "TCP", "http")
return c.state.getEndpointAddresses(ns, name)
}

func compareStringList(a, b []string) []string {
Expand Down
4 changes: 2 additions & 2 deletions dataclients/kubernetes/kube_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func TestGetEndpointAddresses(t *testing.T) {
ns := "namespace1"
name := "service1"
got := client.GetEndpointAddresses(ns, name)
expected := []string{"http://1.1.1.0:8080"}
expected := []string{"1.1.1.0:8080"}
if len(got) != len(expected) {
t.Fatalf("Failed to get same size: %d != %d", len(expected), len(got))
}
Expand Down Expand Up @@ -646,7 +646,7 @@ func TestGetEndpointAddresses(t *testing.T) {
ns := "namespace1"
name := "service1"
got := client.GetEndpointAddresses(ns, name)
expected := []string{"http://1.1.1.0:8080"}
expected := []string{"1.1.1.0:8080"}
if len(got) != len(expected) {
t.Fatalf("Failed to get same size: %d != %d", len(expected), len(got))
}
Expand Down
2 changes: 0 additions & 2 deletions routesrv/redishandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"

log "github.com/sirupsen/logrus"
"github.com/zalando/skipper/dataclients/kubernetes"
Expand Down Expand Up @@ -46,7 +45,6 @@ func getRedisAddresses(namespace, name string, kdc *kubernetes.Client, m metrics

result := RedisEndpoints{}
for i := 0; i < len(a); i++ {
a[i] = strings.TrimPrefix(a[i], "http://")
result.Endpoints = append(result.Endpoints, RedisEndpoint{Address: a[i]})
}

Expand Down
3 changes: 0 additions & 3 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1364,9 +1364,6 @@ func getRedisUpdaterFunc(namespace, name string, kdc *kubernetes.Client) func()
return func() ([]string, error) {
a := kdc.GetEndpointAddresses(namespace, name)
log.Debugf("Redis updater called and found %d redis endpoints", len(a))
for i := 0; i < len(a); i++ {
a[i] = strings.TrimPrefix(a[i], "http://")
}
return a, nil
}
}
Expand Down

0 comments on commit 4b0187f

Please sign in to comment.