Skip to content

Commit

Permalink
skipper: use kubernetes cluster client to discover redis shards
Browse files Browse the repository at this point in the history
Add LoadEndpointAddresses to kubernetes dataclient that is
similar to caching GetEndpointAddresses but does not rely on
previous call to Load or LoadAll.

Fixes #2476

Signed-off-by: Alexander Yastrebov <alexander.yastrebov@zalando.de>
  • Loading branch information
AlexanderYastrebov committed Feb 13, 2024
1 parent 79c14f4 commit 106e764
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 20 deletions.
38 changes: 38 additions & 0 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
}
log.Debugf("all endpointslices received: %d", len(endpointSlices.Items))

return mapEndpointSlices(&endpointSlices), nil
}

func mapEndpointSlices(endpointSlices *endpointSliceList) map[definitions.ResourceID]*skipperEndpointSlice {
mapSlices := make(map[definitions.ResourceID][]*endpointSlice)
for _, endpointSlice := range endpointSlices.Items {
resID := endpointSlice.ToResourceID() // service resource ID
Expand Down Expand Up @@ -550,6 +554,40 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
result[resID].Endpoints = append(result[resID].Endpoints, o)
}
}
return result
}

// LoadEndpointAddresses returns the list of all addresses for the given service using endpoints or endpointslices API.
func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, error) {
var result []string
if c.enableEndpointSlices {
url := fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace) +
toLabelSelectorQuery(map[string]string{endpointSliceServiceNameLabel: name})

var endpointSlices endpointSliceList
if err := c.getJSON(url, &endpointSlices); err != nil {
return nil, fmt.Errorf("requesting endpointslices for %s/%s failed: %v", namespace, name, err)
}

mapped := mapEndpointSlices(&endpointSlices)
if len(mapped) != 1 {
return nil, fmt.Errorf("unexpected number of endpoint slices for %s/%s: %d", namespace, name, len(mapped))
}

for _, eps := range mapped {
result = eps.addresses()
break
}
} else {
url := fmt.Sprintf(EndpointsNamespaceFmt, namespace) + "/" + name

var ep endpoint
if err := c.getJSON(url, &ep); err != nil {
return nil, fmt.Errorf("requesting endpoints for %s/%s failed: %v", namespace, name, err)
}
result = ep.addresses()
}
sort.Strings(result)

return result, nil
}
Expand Down
4 changes: 3 additions & 1 deletion dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/zalando/skipper/dataclients/kubernetes/definitions"
)

const endpointSliceServiceNameLabel = "kubernetes.io/service-name"

// There are [1..N] Kubernetes endpointslices created for a single Kubernetes service.
// Kubernetes endpointslices of a given service can have duplicates with different states.
// Therefore Kubernetes endpointslices need to be de-duplicated before usage.
Expand Down Expand Up @@ -97,7 +99,7 @@ type endpointSlice struct {

// ToResourceID returns the same string for a group endpointlisces created for the same svc
func (eps *endpointSlice) ToResourceID() definitions.ResourceID {
svcName := eps.Meta.Labels["kubernetes.io/service-name"]
svcName := eps.Meta.Labels[endpointSliceServiceNameLabel]
namespace := eps.Meta.Namespace
return newResourceID(namespace, svcName)
}
Expand Down
8 changes: 7 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {
return filters
}

// GetEndpointAddresses returns the list of all addresses for the given service.
// GetEndpointAddresses returns the list of all addresses for the given service
// fetched by previous call to LoadAll or LoadUpdate.
func (c *Client) GetEndpointAddresses(ns, name string) []string {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -584,6 +585,11 @@ func (c *Client) GetEndpointAddresses(ns, name string) []string {
return c.state.getEndpointAddresses(ns, name)
}

// LoadEndpointAddresses returns the list of all addresses for the given service.
func (c *Client) LoadEndpointAddresses(namespace, name string) ([]string, error) {
return c.ClusterClient.loadEndpointAddresses(namespace, name)
}

func compareStringList(a, b []string) []string {
c := make([]string, 0)
for i := len(a) - 1; i >= 0; i-- {
Expand Down
55 changes: 37 additions & 18 deletions skipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,22 +1357,35 @@ func findKubernetesDataclient(dataClients []routing.DataClient) *kubernetes.Clie
return kdc
}

func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client) func() ([]string, error) {
// TODO(sszuecs): make sure kubernetes dataclient is already initialized and
// has polled the data once or kdc.GetEndpointAdresses should be blocking
// call to kubernetes API
return func() ([]string, error) {
a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("Redis updater called and found %d redis endpoints", len(a))
func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client, cached bool) func() ([]string, error) {
if cached {
// TODO(sszuecs): make sure kubernetes dataclient is already initialized and
// has polled the data once or kdc.GetEndpointAdresses should be blocking
// call to kubernetes API
return func() ([]string, error) {
a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("GetEndpointAddresses found %d redis endpoints", len(a))

return joinPort(a, opts.KubernetesRedisServicePort), nil
}
} else {
return func() ([]string, error) {
a, err := kdc.LoadEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
log.Debugf("LoadEndpointAddresses found %d redis endpoints, err: %v", len(a), err)

port := strconv.Itoa(opts.KubernetesRedisServicePort)
for i := 0; i < len(a); i++ {
a[i] = net.JoinHostPort(a[i], port)
return joinPort(a, opts.KubernetesRedisServicePort), err
}
return a, nil
}
}

func joinPort(addrs []string, port int) []string {
p := strconv.Itoa(port)
for i := 0; i < len(addrs); i++ {
addrs[i] = net.JoinHostPort(addrs[i], p)
}
return addrs
}

type RedisEndpoint struct {
Address string `json:"address"`
}
Expand Down Expand Up @@ -1688,25 +1701,31 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {

kdc := findKubernetesDataclient(dataClients)
if kdc != nil {
redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc)
_, err = redisOptions.AddrUpdater()
redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc, true)
} else {
kdc, err := kubernetes.New(o.KubernetesDataClientOptions())
if err != nil {
log.Errorf("Failed to update redis address %v", err)
return err
}
} else {
log.Errorf("Failed to find kubernetes dataclient, but redis shards should be get by kubernetes svc %s/%s", o.KubernetesRedisServiceNamespace, o.KubernetesRedisServiceName)
defer kdc.Close()

redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc, false)
}

_, err = redisOptions.AddrUpdater()
if err != nil {
log.Errorf("Failed to update redis addresses: %v", err)
return err
}
} else if redisOptions != nil && o.SwarmRedisEndpointsRemoteURL != "" {
log.Infof("Use remote address %s to fetch updates redis shards", o.SwarmRedisEndpointsRemoteURL)
redisOptions.AddrUpdater = updateEndpointsFromURL(o.SwarmRedisEndpointsRemoteURL)
_, err = redisOptions.AddrUpdater()
if err != nil {
log.Errorf("Failed to update redis endpoints from URL %v", err)
log.Errorf("Failed to update redis endpoints from URL: %v", err)
return err
}
}

}

var ratelimitRegistry *ratelimit.Registry
Expand Down

0 comments on commit 106e764

Please sign in to comment.