Skip to content

Commit 5ecba08

Browse files
Merge pull request #129837 from danwinship/aggregated-apiserver-endpointslices
Port aggregated apiserver discovery to EndpointSlices Kubernetes-commit: c41cc0a144a8205b63289c440b813892eed790ae
2 parents 2aa3afa + 7505a1c commit 5ecba08

File tree

6 files changed

+181
-118
lines changed

6 files changed

+181
-118
lines changed

go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ require (
1717
go.opentelemetry.io/otel/sdk v1.34.0
1818
go.opentelemetry.io/otel/trace v1.35.0
1919
golang.org/x/net v0.38.0
20-
k8s.io/api v0.0.0-20250718010530-b3927ff69476
21-
k8s.io/apimachinery v0.0.0-20250717210244-b92abb2d8139
22-
k8s.io/apiserver v0.0.0-20250718012906-8cb2ab9b0d2e
23-
k8s.io/client-go v0.0.0-20250718010928-be36413bbca7
24-
k8s.io/code-generator v0.0.0-20250717211120-962847addb95
20+
k8s.io/api v0.0.0-20250722204042-c68fbbed1649
21+
k8s.io/apimachinery v0.0.0-20250723005633-58c4eb072ebf
22+
k8s.io/apiserver v0.0.0-20250723185939-b6a8cfb34902
23+
k8s.io/client-go v0.0.0-20250723062849-06fcc8a0753e
24+
k8s.io/code-generator v0.0.0-20250722051953-bd6c0b14fb10
2525
k8s.io/component-base v0.0.0-20250717172125-4e07767df717
2626
k8s.io/klog/v2 v2.130.1
2727
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b

go.sum

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -276,16 +276,16 @@ gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYs
276276
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
277277
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
278278
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
279-
k8s.io/api v0.0.0-20250718010530-b3927ff69476 h1:Aujs/kJa+rquu3rzDmTZmy8qul/yc4fTK1LEAK5QGpg=
280-
k8s.io/api v0.0.0-20250718010530-b3927ff69476/go.mod h1:K8dwhtttsRR0RHeSRF8XQ77gfMgyAj3q78/TkxEXhoc=
281-
k8s.io/apimachinery v0.0.0-20250717210244-b92abb2d8139 h1:jWBClrBPuk+GEA9pJzMa9IvxncSBbw7fmvey15nVm0w=
282-
k8s.io/apimachinery v0.0.0-20250717210244-b92abb2d8139/go.mod h1:v1p1Jsze3IHLy5gU17yVqR2qLO7jgYeX6mw3HZy2AEU=
283-
k8s.io/apiserver v0.0.0-20250718012906-8cb2ab9b0d2e h1:3JuleQxeQLW6SNcelMdlA/7BQzrJA5vRhfOCS/xkTw4=
284-
k8s.io/apiserver v0.0.0-20250718012906-8cb2ab9b0d2e/go.mod h1:VH4/X8jQWTpsTz1+jYSohEUkJDZYVlHDFbuas1QWws4=
285-
k8s.io/client-go v0.0.0-20250718010928-be36413bbca7 h1:LNOJkn+3JlAEzdZzYheQM97gq6kKQfkrBN0GikI5nbc=
286-
k8s.io/client-go v0.0.0-20250718010928-be36413bbca7/go.mod h1:a14VvgYhux7oUSE9mWdzBuFKDZSGtperboMjQ1JtVgc=
287-
k8s.io/code-generator v0.0.0-20250717211120-962847addb95 h1:/5GFPiY14PhB6ejRyo9a/Dhja+NDHeEuEjW5fe23WdI=
288-
k8s.io/code-generator v0.0.0-20250717211120-962847addb95/go.mod h1:s7klsGtPPWCJGFm5LcWBfmVJmPffzce9XE+GGzNHXds=
279+
k8s.io/api v0.0.0-20250722204042-c68fbbed1649 h1:HLNbFkZn1/O3wPdTUDOFXtPPQxM/h89c5whVWROHdX0=
280+
k8s.io/api v0.0.0-20250722204042-c68fbbed1649/go.mod h1:FvioxCEMxTNUCXlpYjmaOHhxfVFA8yAGncDC2nhYf7w=
281+
k8s.io/apimachinery v0.0.0-20250723005633-58c4eb072ebf h1:R1l0xAevbhH2Bg0iJuabo8/i9m31D1ehh2ZJPFKh9bc=
282+
k8s.io/apimachinery v0.0.0-20250723005633-58c4eb072ebf/go.mod h1:v1p1Jsze3IHLy5gU17yVqR2qLO7jgYeX6mw3HZy2AEU=
283+
k8s.io/apiserver v0.0.0-20250723185939-b6a8cfb34902 h1:XJwc7NE8YDDeONFiZjfdBWP3b4TH6FV2ipldGMk7Z9Q=
284+
k8s.io/apiserver v0.0.0-20250723185939-b6a8cfb34902/go.mod h1:kI5+6k1Dm7a5KwFpxbz2uNQciSwZye/iiYER/2Dh0k4=
285+
k8s.io/client-go v0.0.0-20250723062849-06fcc8a0753e h1:lDD8O87D4jXxiua49rdGq2ZH5JBpM92y1OO7SXO81WA=
286+
k8s.io/client-go v0.0.0-20250723062849-06fcc8a0753e/go.mod h1:o0hSfvIl7oJ/btIHtj8nT9SVN7Kmp4W0PEUfsPxLN3c=
287+
k8s.io/code-generator v0.0.0-20250722051953-bd6c0b14fb10 h1:8SZLOLTVVJLxu7isECJ0bo292PtbBP8IALNTVHhsJA0=
288+
k8s.io/code-generator v0.0.0-20250722051953-bd6c0b14fb10/go.mod h1:hdreRWFaDb97nBS1gq8on9c6y+oF2OaGePwl6ENj3xA=
289289
k8s.io/component-base v0.0.0-20250717172125-4e07767df717 h1:07oqkM0FzuGUw/bJw2rJubzccG7ShpGcTJ7SBDGp5Fc=
290290
k8s.io/component-base v0.0.0-20250717172125-4e07767df717/go.mod h1:/ehREU84M2OxVgU8WfxuUIi4/c5XsT6rIsEGQfhgxEQ=
291291
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f h1:SLb+kxmzfA87x4E4brQzB33VBbT2+x7Zq9ROIHmGn9Q=

pkg/apiserver/apiserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
334334
remote, err := remoteavailability.New(
335335
informerFactory.Apiregistration().V1().APIServices(),
336336
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
337-
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
337+
c.GenericConfig.SharedInformerFactory.Discovery().V1().EndpointSlices(),
338338
apiregistrationClient.ApiregistrationV1(),
339339
proxyTransportDial,
340340
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),

pkg/apiserver/resolvers.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,20 @@ type ServiceResolver interface {
3030

3131
// NewEndpointServiceResolver returns a ServiceResolver that chooses one of the
3232
// service's endpoints.
33-
func NewEndpointServiceResolver(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister) ServiceResolver {
33+
func NewEndpointServiceResolver(services listersv1.ServiceLister, endpointSliceGetter proxy.EndpointSliceGetter) ServiceResolver {
3434
return &aggregatorEndpointRouting{
35-
services: services,
36-
endpoints: endpoints,
35+
services: services,
36+
endpointSliceGetter: endpointSliceGetter,
3737
}
3838
}
3939

4040
type aggregatorEndpointRouting struct {
41-
services listersv1.ServiceLister
42-
endpoints listersv1.EndpointsLister
41+
services listersv1.ServiceLister
42+
endpointSliceGetter proxy.EndpointSliceGetter
4343
}
4444

4545
func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
46-
return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name, port)
46+
return proxy.ResolveEndpoint(r.services, r.endpointSliceGetter, namespace, name, port)
4747
}
4848

4949
// NewClusterIPServiceResolver returns a ServiceResolver that directly calls the

pkg/controllers/status/remote/remote_available_controller.go

Lines changed: 74 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,16 @@ import (
2626
"time"
2727

2828
v1 "k8s.io/api/core/v1"
29+
discoveryv1 "k8s.io/api/discovery/v1"
2930
"k8s.io/apimachinery/pkg/api/equality"
3031
apierrors "k8s.io/apimachinery/pkg/api/errors"
31-
"k8s.io/apimachinery/pkg/api/meta"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/labels"
34-
"k8s.io/apimachinery/pkg/runtime"
3534
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3635
"k8s.io/apimachinery/pkg/util/wait"
36+
"k8s.io/apiserver/pkg/util/proxy"
3737
v1informers "k8s.io/client-go/informers/core/v1"
38+
discoveryv1informers "k8s.io/client-go/informers/discovery/v1"
3839
v1listers "k8s.io/client-go/listers/core/v1"
3940
"k8s.io/client-go/tools/cache"
4041
"k8s.io/client-go/transport"
@@ -67,8 +68,8 @@ type AvailableConditionController struct {
6768
serviceLister v1listers.ServiceLister
6869
servicesSynced cache.InformerSynced
6970

70-
endpointsLister v1listers.EndpointsLister
71-
endpointsSynced cache.InformerSynced
71+
endpointSliceGetter proxy.EndpointSliceGetter
72+
endpointSlicesSynced cache.InformerSynced
7273

7374
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
7475
proxyTransportDial *transport.DialHolder
@@ -92,19 +93,25 @@ type AvailableConditionController struct {
9293
func New(
9394
apiServiceInformer informers.APIServiceInformer,
9495
serviceInformer v1informers.ServiceInformer,
95-
endpointsInformer v1informers.EndpointsInformer,
96+
endpointSliceInformer discoveryv1informers.EndpointSliceInformer,
9697
apiServiceClient apiregistrationclient.APIServicesGetter,
9798
proxyTransportDial *transport.DialHolder,
9899
proxyCurrentCertKeyContent certKeyFunc,
99100
serviceResolver ServiceResolver,
100101
metrics *availabilitymetrics.Metrics,
101102
) (*AvailableConditionController, error) {
103+
104+
endpointSliceGetter, err := proxy.NewEndpointSliceIndexerGetter(endpointSliceInformer)
105+
if err != nil {
106+
return nil, err
107+
}
108+
102109
c := &AvailableConditionController{
103-
apiServiceClient: apiServiceClient,
104-
apiServiceLister: apiServiceInformer.Lister(),
105-
serviceLister: serviceInformer.Lister(),
106-
endpointsLister: endpointsInformer.Lister(),
107-
serviceResolver: serviceResolver,
110+
apiServiceClient: apiServiceClient,
111+
apiServiceLister: apiServiceInformer.Lister(),
112+
serviceLister: serviceInformer.Lister(),
113+
endpointSliceGetter: endpointSliceGetter,
114+
serviceResolver: serviceResolver,
108115
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
109116
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
110117
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
@@ -137,12 +144,12 @@ func New(
137144
})
138145
c.servicesSynced = serviceHandler.HasSynced
139146

140-
endpointsHandler, _ := endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
141-
AddFunc: c.addEndpoints,
142-
UpdateFunc: c.updateEndpoints,
143-
DeleteFunc: c.deleteEndpoints,
147+
endpointSliceHandler, _ := endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
148+
AddFunc: c.addEndpointSlice,
149+
UpdateFunc: c.updateEndpointSlice,
150+
DeleteFunc: c.deleteEndpointSlice,
144151
})
145-
c.endpointsSynced = endpointsHandler.HasSynced
152+
c.endpointSlicesSynced = endpointSliceHandler.HasSynced
146153

147154
c.syncFn = c.sync
148155

@@ -239,30 +246,37 @@ func (c *AvailableConditionController) sync(key string) error {
239246
return err
240247
}
241248

242-
endpoints, err := c.endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
243-
if apierrors.IsNotFound(err) {
244-
availableCondition.Status = apiregistrationv1.ConditionFalse
245-
availableCondition.Reason = "EndpointsNotFound"
246-
availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
247-
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
248-
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
249-
return err
250-
} else if err != nil {
249+
endpointSlices, err := c.endpointSliceGetter.GetEndpointSlices(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name)
250+
if err != nil {
251251
availableCondition.Status = apiregistrationv1.ConditionUnknown
252252
availableCondition.Reason = "EndpointsAccessError"
253253
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
254254
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
255255
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
256256
return err
257+
} else if len(endpointSlices) == 0 {
258+
availableCondition.Status = apiregistrationv1.ConditionFalse
259+
availableCondition.Reason = "EndpointsNotFound"
260+
availableCondition.Message = fmt.Sprintf("cannot find endpointslices for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
261+
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
262+
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
263+
return err
257264
}
258265
hasActiveEndpoints := false
259266
outer:
260-
for _, subset := range endpoints.Subsets {
261-
if len(subset.Addresses) == 0 {
267+
for _, slice := range endpointSlices {
268+
ready := false
269+
for _, endpoint := range slice.Endpoints {
270+
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready {
271+
ready = true
272+
break
273+
}
274+
}
275+
if !ready {
262276
continue
263277
}
264-
for _, endpointPort := range subset.Ports {
265-
if endpointPort.Name == portName {
278+
for _, endpointPort := range slice.Ports {
279+
if endpointPort.Name != nil && *endpointPort.Name == portName && endpointPort.Port != nil {
266280
hasActiveEndpoints = true
267281
break outer
268282
}
@@ -271,7 +285,7 @@ func (c *AvailableConditionController) sync(key string) error {
271285
if !hasActiveEndpoints {
272286
availableCondition.Status = apiregistrationv1.ConditionFalse
273287
availableCondition.Reason = "MissingEndpoints"
274-
availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
288+
availableCondition.Message = fmt.Sprintf("endpointslices for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
275289
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
276290
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
277291
return err
@@ -415,7 +429,7 @@ func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{})
415429
// to be called; since the handlers are three different ways of
416430
// enqueueing the same thing, waiting for this permits the queue to
417431
// maximally de-duplicate the entries.
418-
if !controllers.WaitForCacheSync("RemoteAvailability", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
432+
if !controllers.WaitForCacheSync("RemoteAvailability", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointSlicesSynced) {
419433
return
420434
}
421435

@@ -491,18 +505,13 @@ func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
491505
c.queue.Add(castObj.Name)
492506
}
493507

494-
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string {
495-
metadata, err := meta.Accessor(obj)
496-
if err != nil {
497-
utilruntime.HandleError(err)
498-
return nil
499-
}
508+
func (c *AvailableConditionController) getAPIServicesFor(serviceNamespace, serviceName string) []string {
500509
c.cacheLock.RLock()
501510
defer c.cacheLock.RUnlock()
502-
return c.cache[metadata.GetNamespace()][metadata.GetName()]
511+
return c.cache[serviceNamespace][serviceName]
503512
}
504513

505-
// if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
514+
// if the service/endpointslice handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
506515
// (which will get processed an extra time - this doesn't matter),
507516
// and miss a newly relevant apiservice (which will get queued by the apiservice handler)
508517
func (c *AvailableConditionController) rebuildAPIServiceCache() {
@@ -526,13 +535,15 @@ func (c *AvailableConditionController) rebuildAPIServiceCache() {
526535
// TODO, think of a way to avoid checking on every service manipulation
527536

528537
func (c *AvailableConditionController) addService(obj interface{}) {
529-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
538+
service := obj.(*v1.Service)
539+
for _, apiService := range c.getAPIServicesFor(service.Namespace, service.Name) {
530540
c.queue.Add(apiService)
531541
}
532542
}
533543

534544
func (c *AvailableConditionController) updateService(obj, _ interface{}) {
535-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
545+
service := obj.(*v1.Service)
546+
for _, apiService := range c.getAPIServicesFor(service.Namespace, service.Name) {
536547
c.queue.Add(apiService)
537548
}
538549
}
@@ -551,38 +562,52 @@ func (c *AvailableConditionController) deleteService(obj interface{}) {
551562
return
552563
}
553564
}
554-
for _, apiService := range c.getAPIServicesFor(castObj) {
565+
for _, apiService := range c.getAPIServicesFor(castObj.Namespace, castObj.Name) {
555566
c.queue.Add(apiService)
556567
}
557568
}
558569

559-
func (c *AvailableConditionController) addEndpoints(obj interface{}) {
560-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
570+
func (c *AvailableConditionController) addEndpointSlice(obj interface{}) {
571+
slice := obj.(*discoveryv1.EndpointSlice)
572+
serviceName := slice.Labels[discoveryv1.LabelServiceName]
573+
if serviceName == "" {
574+
return
575+
}
576+
for _, apiService := range c.getAPIServicesFor(slice.Namespace, serviceName) {
561577
c.queue.Add(apiService)
562578
}
563579
}
564580

565-
func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
566-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
581+
func (c *AvailableConditionController) updateEndpointSlice(obj, _ interface{}) {
582+
slice := obj.(*discoveryv1.EndpointSlice)
583+
serviceName := slice.Labels[discoveryv1.LabelServiceName]
584+
if serviceName == "" {
585+
return
586+
}
587+
for _, apiService := range c.getAPIServicesFor(slice.Namespace, serviceName) {
567588
c.queue.Add(apiService)
568589
}
569590
}
570591

571-
func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
572-
castObj, ok := obj.(*v1.Endpoints)
592+
func (c *AvailableConditionController) deleteEndpointSlice(obj interface{}) {
593+
castObj, ok := obj.(*discoveryv1.EndpointSlice)
573594
if !ok {
574595
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
575596
if !ok {
576597
klog.Errorf("Couldn't get object from tombstone %#v", obj)
577598
return
578599
}
579-
castObj, ok = tombstone.Obj.(*v1.Endpoints)
600+
castObj, ok = tombstone.Obj.(*discoveryv1.EndpointSlice)
580601
if !ok {
581602
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
582603
return
583604
}
584605
}
585-
for _, apiService := range c.getAPIServicesFor(castObj) {
606+
serviceName := castObj.Labels[discoveryv1.LabelServiceName]
607+
if serviceName == "" {
608+
return
609+
}
610+
for _, apiService := range c.getAPIServicesFor(castObj.Namespace, serviceName) {
586611
c.queue.Add(apiService)
587612
}
588613
}

0 commit comments

Comments
 (0)