Skip to content

Commit 2bf3b72

Browse files
jkremserwozniakjan
andauthored
migrate from v1.Endpoints to discoveryv1.EndpointSlices (kedacore#1298)
* Migration to EndpointSlices Signed-off-by: Jirka Kremser <jiri.kremser@gmail.com> * finish migration from v1.Endpoints to discoveryv1.EndpointSlices Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com> * handle multiple EndpointSlices per service Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com> * remove deletecollection rbac for endpoint slices Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com> --------- Signed-off-by: Jirka Kremser <jiri.kremser@gmail.com> Signed-off-by: Jan Wozniak <wozniak.jan@gmail.com> Co-authored-by: Jan Wozniak <wozniak.jan@gmail.com>
1 parent b2fa2b5 commit 2bf3b72

18 files changed

+285
-276
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ This changelog keeps track of work items that have been completed and are ready
2121

2222
### Breaking Changes
2323

24+
- **General**: `v1.Endpoints` -> `EndpointSlices` ([#1297](https://github.com/kedacore/http-add-on/issues/1297)) Potentially breaking for old versions of k8s.
2425
- **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO))
2526

2627
### New

config/interceptor/role.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@ rules:
77
- apiGroups:
88
- ""
99
resources:
10-
- endpoints
10+
- services
1111
verbs:
1212
- get
1313
- list
1414
- watch
1515
- apiGroups:
16-
- ""
16+
- discovery.k8s.io
1717
resources:
18-
- services
18+
- endpointslices
1919
verbs:
2020
- get
2121
- list

config/scaler/role.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ metadata:
55
name: scaler
66
rules:
77
- apiGroups:
8-
- ""
8+
- discovery.k8s.io
99
resources:
10-
- endpoints
10+
- endpointslices
1111
verbs:
1212
- get
1313
- list

interceptor/forward_wait_func.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"fmt"
66

77
"github.com/go-logr/logr"
8-
v1 "k8s.io/api/core/v1"
8+
discov1 "k8s.io/api/discovery/v1"
99

1010
"github.com/kedacore/http-add-on/pkg/k8s"
1111
)
@@ -14,10 +14,10 @@ import (
1414
// before proceeding to serve the request.
1515
type forwardWaitFunc func(context.Context, string, string) (bool, error)
1616

17-
func workloadActiveEndpoints(endpoints v1.Endpoints) int {
17+
func workloadActiveEndpoints(endpoints discov1.EndpointSlice) int {
1818
total := 0
19-
for _, subset := range endpoints.Subsets {
20-
total += len(subset.Addresses)
19+
for _, e := range endpoints.Endpoints {
20+
total += len(e.Addresses)
2121
}
2222
return total
2323
}
@@ -55,7 +55,7 @@ func newWorkloadReplicasForwardWaitFunc(
5555
for {
5656
select {
5757
case event := <-eventCh:
58-
endpoints, ok := event.Object.(*v1.Endpoints)
58+
endpoints, ok := event.Object.(*discov1.EndpointSlice)
5959
if !ok {
6060
lggr.Info(
6161
"Didn't get a endpoints back in event",

interceptor/forward_wait_func_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/go-logr/logr"
99
"github.com/stretchr/testify/require"
1010
"golang.org/x/sync/errgroup"
11-
v1 "k8s.io/api/core/v1"
11+
discov1 "k8s.io/api/discovery/v1"
1212
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1313
"k8s.io/apimachinery/pkg/watch"
1414

@@ -27,7 +27,7 @@ func TestForwardWaitFuncOneReplica(t *testing.T) {
2727
endpoints := *newEndpoint(ns, endpointsName)
2828
cache := k8s.NewFakeEndpointsCache()
2929
cache.Set(endpoints)
30-
r.NoError(cache.SetSubsets(ns, endpointsName, 1))
30+
r.NoError(cache.SetEndpoints(ns, endpointsName, 1))
3131

3232
ctx, done := context.WithTimeout(ctx, waitFuncWait)
3333
defer done()
@@ -97,10 +97,10 @@ func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
9797
watcher := cache.GetWatcher(ns, endpointsName)
9898
r.NotNil(watcher, "watcher was not found")
9999
modifiedEndpoints := endpoints.DeepCopy()
100-
modifiedEndpoints.Subsets = []v1.EndpointSubset{
100+
modifiedEndpoints.Endpoints = []discov1.Endpoint{
101101
{
102-
Addresses: []v1.EndpointAddress{
103-
{IP: "1.2.3.4"},
102+
Addresses: []string{
103+
"1.2.3.4",
104104
},
105105
},
106106
}
@@ -119,11 +119,14 @@ func TestWaitFuncWaitsUntilReplicas(t *testing.T) {
119119
func newEndpoint(
120120
namespace,
121121
name string,
122-
) *v1.Endpoints {
123-
endpoints := &v1.Endpoints{
122+
) *discov1.EndpointSlice {
123+
endpoints := &discov1.EndpointSlice{
124124
ObjectMeta: metav1.ObjectMeta{
125-
Name: name,
126-
Namespace: namespace,
125+
GenerateName: name,
126+
Namespace: namespace,
127+
Labels: map[string]string{
128+
discov1.LabelServiceName: name,
129+
},
127130
},
128131
}
129132

interceptor/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ var (
4646
)
4747

4848
// +kubebuilder:rbac:groups=http.keda.sh,resources=httpscaledobjects,verbs=get;list;watch
49-
// +kubebuilder:rbac:groups="",resources=endpoints,verbs=get;list;watch
49+
// +kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;list;watch
5050
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
5151

5252
func main() {

interceptor/proxy_handlers_integration_test.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
"github.com/stretchr/testify/assert"
1818
"github.com/stretchr/testify/require"
1919
"golang.org/x/sync/errgroup"
20-
v1 "k8s.io/api/core/v1"
20+
discov1 "k8s.io/api/discovery/v1"
2121
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2222
"k8s.io/apimachinery/pkg/util/wait"
2323

@@ -55,13 +55,16 @@ func TestIntegrationHappyPath(t *testing.T) {
5555
)
5656
h.routingTable.Memory[hostForTest(t)] = target
5757

58-
h.endpCache.Set(v1.Endpoints{
58+
h.endpCache.Set(discov1.EndpointSlice{
5959
ObjectMeta: metav1.ObjectMeta{
60-
Name: serviceName,
61-
Namespace: target.GetNamespace(),
60+
GenerateName: serviceName,
61+
Namespace: target.GetNamespace(),
62+
Labels: map[string]string{
63+
discov1.LabelServiceName: serviceName,
64+
},
6265
},
6366
})
64-
r.NoError(h.endpCache.SetSubsets(target.GetNamespace(), serviceName, 1))
67+
r.NoError(h.endpCache.SetEndpoints(target.GetNamespace(), serviceName, 1))
6568

6669
// happy path
6770
res, err := doRequest(
@@ -122,10 +125,13 @@ func TestIntegrationNoReplicas(t *testing.T) {
122125
h.routingTable.Memory[hostForTest(t)] = target
123126

124127
// 0 replicas
125-
h.endpCache.Set(v1.Endpoints{
128+
h.endpCache.Set(discov1.EndpointSlice{
126129
ObjectMeta: metav1.ObjectMeta{
127-
Name: serviceName,
128-
Namespace: target.GetNamespace(),
130+
GenerateName: serviceName,
131+
Namespace: target.GetNamespace(),
132+
Labels: map[string]string{
133+
discov1.LabelServiceName: serviceName,
134+
},
129135
},
130136
})
131137

@@ -173,10 +179,13 @@ func TestIntegrationWaitReplicas(t *testing.T) {
173179
// set up a endpoint with zero replicas and create
174180
// a watcher we can use later to fake-send a endpoint
175181
// event
176-
h.endpCache.Set(v1.Endpoints{
182+
h.endpCache.Set(discov1.EndpointSlice{
177183
ObjectMeta: metav1.ObjectMeta{
178-
Name: serviceName,
179-
Namespace: target.GetNamespace(),
184+
GenerateName: serviceName,
185+
Namespace: target.GetNamespace(),
186+
Labels: map[string]string{
187+
discov1.LabelServiceName: serviceName,
188+
},
180189
},
181190
})
182191
endpoints, _ := h.endpCache.Get(target.GetNamespace(), serviceName)
@@ -207,10 +216,10 @@ func TestIntegrationWaitReplicas(t *testing.T) {
207216
t.Logf("Woke up, setting replicas to 10")
208217

209218
modifiedEndpoints := endpoints.DeepCopy()
210-
modifiedEndpoints.Subsets = []v1.EndpointSubset{
219+
modifiedEndpoints.Endpoints = []discov1.Endpoint{
211220
{
212-
Addresses: []v1.EndpointAddress{
213-
{IP: "1.2.3.4"},
221+
Addresses: []string{
222+
"1.2.3.4",
214223
},
215224
},
216225
}

operator/controllers/http/ping_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77

88
"github.com/stretchr/testify/require"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
910
"sigs.k8s.io/controller-runtime/pkg/client/fake"
1011

1112
"github.com/kedacore/http-add-on/pkg/k8s"
@@ -30,9 +31,13 @@ func TestPingInterceptors(t *testing.T) {
3031
r.NoError(err)
3132
defer srv.Close()
3233
ctx := context.Background()
33-
endpoints, err := k8s.FakeEndpointsForURL(url, ns, svcName, 2)
34+
endpoints, err := k8s.FakeEndpointsForURL(url, ns, svcName, 1)
3435
r.NoError(err)
35-
cl := fake.NewClientBuilder().WithObjects(endpoints).Build()
36+
eps := []client.Object{}
37+
for i := range endpoints.Items {
38+
eps = append(eps, &endpoints.Items[i])
39+
}
40+
cl := fake.NewClientBuilder().WithObjects(eps...).Build()
3641
r.NoError(pingInterceptors(
3742
ctx,
3843
cl,
@@ -42,5 +47,11 @@ func TestPingInterceptors(t *testing.T) {
4247
url.Port(),
4348
))
4449
reqs := hdl.IncomingRequests()
45-
r.Equal(len(endpoints.Subsets[0].Addresses), len(reqs))
50+
var endpointsAddrs []string
51+
for _, es := range endpoints.Items {
52+
for _, e := range es.Endpoints {
53+
endpointsAddrs = append(endpointsAddrs, e.Addresses...)
54+
}
55+
}
56+
r.Equal(len(endpointsAddrs), len(reqs))
4657
}

pkg/k8s/endpoints.go

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,79 +3,88 @@ package k8s
33
import (
44
"context"
55
"fmt"
6+
"maps"
67
"net/url"
8+
"slices"
79

8-
v1 "k8s.io/api/core/v1"
10+
discov1 "k8s.io/api/discovery/v1"
911
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1012
"k8s.io/client-go/kubernetes"
1113
"sigs.k8s.io/controller-runtime/pkg/client"
1214
)
1315

16+
// Endpoints represents a set of ready and not ready addresses
17+
type Endpoints struct {
18+
ReadyAddresses []string
19+
NotReadyAddresses []string
20+
}
21+
1422
// GetEndpointsFunc is a type that represents a function that can
1523
// fetch endpoints
16-
type GetEndpointsFunc func(
17-
ctx context.Context,
18-
namespace,
19-
serviceName string,
20-
) (*v1.Endpoints, error)
24+
type GetEndpointsFunc func(ctx context.Context, namespace, serviceName string) (Endpoints, error)
2125

22-
func EndpointsForService(
23-
ctx context.Context,
24-
ns,
25-
serviceName,
26-
servicePort string,
27-
endpointsFn GetEndpointsFunc,
28-
) ([]*url.URL, error) {
26+
// EndpointsForService fetches the ready endpoints for a given service and returns them as a slice of URLs with the specified port
27+
func EndpointsForService(ctx context.Context, ns, serviceName, servicePort string, endpointsFn GetEndpointsFunc) ([]url.URL, error) {
2928
endpoints, err := endpointsFn(ctx, ns, serviceName)
3029
if err != nil {
31-
return nil, fmt.Errorf("pkg.k8s.EndpointsForService: %w", err)
30+
return []url.URL{}, fmt.Errorf("pkg.k8s.EndpointsForService: %w", err)
3231
}
33-
ret := []*url.URL{}
34-
for _, subset := range endpoints.Subsets {
35-
for _, addr := range subset.Addresses {
36-
u, err := url.Parse(
37-
fmt.Sprintf("http://%s:%s", addr.IP, servicePort),
38-
)
39-
if err != nil {
40-
return nil, err
41-
}
42-
ret = append(ret, u)
32+
ret := make([]url.URL, 0, len(endpoints.ReadyAddresses))
33+
for _, addr := range endpoints.ReadyAddresses {
34+
u := url.URL{
35+
Scheme: "http",
36+
Host: fmt.Sprintf("%s:%s", addr, servicePort),
4337
}
38+
ret = append(ret, u)
4439
}
45-
4640
return ret, nil
4741
}
4842

49-
// EndpointsFuncForControllerClient returns a new GetEndpointsFunc
50-
// that uses the controller-runtime client.Client to fetch endpoints
51-
func EndpointsFuncForControllerClient(
52-
cl client.Client,
53-
) GetEndpointsFunc {
54-
return func(
55-
ctx context.Context,
56-
namespace,
57-
serviceName string,
58-
) (*v1.Endpoints, error) {
59-
endpts := &v1.Endpoints{}
60-
if err := cl.Get(ctx, client.ObjectKey{
61-
Namespace: namespace,
62-
Name: serviceName,
63-
}, endpts); err != nil {
64-
return nil, err
43+
// EndpointsFuncForControllerClient returns a new GetEndpointsFunc that uses the controller-runtime client.Client to fetch endpoints
44+
func EndpointsFuncForControllerClient(cl client.Client) GetEndpointsFunc {
45+
return func(ctx context.Context, namespace, serviceName string) (Endpoints, error) {
46+
ess := &discov1.EndpointSliceList{}
47+
48+
if err := cl.List(ctx, ess, client.InNamespace(namespace), client.MatchingLabels{discov1.LabelServiceName: serviceName}); err != nil {
49+
return Endpoints{}, err
50+
}
51+
return extractAddresses(ess.Items), nil
52+
}
53+
}
54+
55+
// EndpointsFuncForK8sClientset returns a new GetEndpointsFunc that uses the kubernetes.Clientset to fetch endpoints
56+
// TODO: this should be eventually removed because it causes high load on the API server, there is EndpointsFuncForControllerClient instead
57+
func EndpointsFuncForK8sClientset(cl *kubernetes.Clientset) GetEndpointsFunc {
58+
return func(ctx context.Context, namespace, serviceName string) (Endpoints, error) {
59+
endpointSlCl := cl.DiscoveryV1().EndpointSlices(namespace)
60+
ess, err := endpointSlCl.List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discov1.LabelServiceName, serviceName)})
61+
if err != nil {
62+
return Endpoints{}, err
6563
}
66-
return endpts, nil
64+
return extractAddresses(ess.Items), nil
6765
}
6866
}
6967

70-
func EndpointsFuncForK8sClientset(
71-
cl *kubernetes.Clientset,
72-
) GetEndpointsFunc {
73-
return func(
74-
ctx context.Context,
75-
namespace,
76-
serviceName string,
77-
) (*v1.Endpoints, error) {
78-
endpointsCl := cl.CoreV1().Endpoints(namespace)
79-
return endpointsCl.Get(ctx, serviceName, metav1.GetOptions{})
68+
// extractAddresses extracts ready and not ready addresses from the given list of EndpointSlice
69+
func extractAddresses(eps []discov1.EndpointSlice) Endpoints {
70+
// addresses from EndpointSlices need to be deduplicated
71+
ready, notReady := make(map[string]bool), make(map[string]bool)
72+
for _, ep := range eps {
73+
for _, addr := range ep.Endpoints {
74+
// see also https://github.com/kubernetes/api/blob/v0.33.0/discovery/v1/types.go#L137-L144
75+
if addr.Conditions.Ready == nil || *addr.Conditions.Ready {
76+
for _, a := range addr.Addresses {
77+
ready[a] = true
78+
}
79+
} else {
80+
for _, a := range addr.Addresses {
81+
notReady[a] = true
82+
}
83+
}
84+
}
85+
}
86+
return Endpoints{
87+
ReadyAddresses: slices.Collect(maps.Keys(ready)),
88+
NotReadyAddresses: slices.Collect(maps.Keys(notReady)),
8089
}
8190
}

0 commit comments

Comments
 (0)