Skip to content

Commit 7505a1c

Browse files
danwinshipk8s-publishing-bot
authored andcommitted
Port aggregated apiserver discovery to EndpointSlices
Co-Authored-by: Jordan Liggitt <liggitt@google.com> Kubernetes-commit: d70e7e223cf5a02d0bdf557a57da11a276ec1e8c
1 parent 2aa3afa commit 7505a1c

File tree

4 files changed

+166
-103
lines changed

4 files changed

+166
-103
lines changed

pkg/apiserver/apiserver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
334334
remote, err := remoteavailability.New(
335335
informerFactory.Apiregistration().V1().APIServices(),
336336
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
337-
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
337+
c.GenericConfig.SharedInformerFactory.Discovery().V1().EndpointSlices(),
338338
apiregistrationClient.ApiregistrationV1(),
339339
proxyTransportDial,
340340
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),

pkg/apiserver/resolvers.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,20 @@ type ServiceResolver interface {
3030

3131
// NewEndpointServiceResolver returns a ServiceResolver that chooses one of the
3232
// service's endpoints.
33-
func NewEndpointServiceResolver(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister) ServiceResolver {
33+
func NewEndpointServiceResolver(services listersv1.ServiceLister, endpointSliceGetter proxy.EndpointSliceGetter) ServiceResolver {
3434
return &aggregatorEndpointRouting{
35-
services: services,
36-
endpoints: endpoints,
35+
services: services,
36+
endpointSliceGetter: endpointSliceGetter,
3737
}
3838
}
3939

4040
type aggregatorEndpointRouting struct {
41-
services listersv1.ServiceLister
42-
endpoints listersv1.EndpointsLister
41+
services listersv1.ServiceLister
42+
endpointSliceGetter proxy.EndpointSliceGetter
4343
}
4444

4545
func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
46-
return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name, port)
46+
return proxy.ResolveEndpoint(r.services, r.endpointSliceGetter, namespace, name, port)
4747
}
4848

4949
// NewClusterIPServiceResolver returns a ServiceResolver that directly calls the

pkg/controllers/status/remote/remote_available_controller.go

Lines changed: 74 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,16 @@ import (
2626
"time"
2727

2828
v1 "k8s.io/api/core/v1"
29+
discoveryv1 "k8s.io/api/discovery/v1"
2930
"k8s.io/apimachinery/pkg/api/equality"
3031
apierrors "k8s.io/apimachinery/pkg/api/errors"
31-
"k8s.io/apimachinery/pkg/api/meta"
3232
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3333
"k8s.io/apimachinery/pkg/labels"
34-
"k8s.io/apimachinery/pkg/runtime"
3534
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3635
"k8s.io/apimachinery/pkg/util/wait"
36+
"k8s.io/apiserver/pkg/util/proxy"
3737
v1informers "k8s.io/client-go/informers/core/v1"
38+
discoveryv1informers "k8s.io/client-go/informers/discovery/v1"
3839
v1listers "k8s.io/client-go/listers/core/v1"
3940
"k8s.io/client-go/tools/cache"
4041
"k8s.io/client-go/transport"
@@ -67,8 +68,8 @@ type AvailableConditionController struct {
6768
serviceLister v1listers.ServiceLister
6869
servicesSynced cache.InformerSynced
6970

70-
endpointsLister v1listers.EndpointsLister
71-
endpointsSynced cache.InformerSynced
71+
endpointSliceGetter proxy.EndpointSliceGetter
72+
endpointSlicesSynced cache.InformerSynced
7273

7374
// proxyTransportDial specifies the dial function for creating unencrypted TCP connections.
7475
proxyTransportDial *transport.DialHolder
@@ -92,19 +93,25 @@ type AvailableConditionController struct {
9293
func New(
9394
apiServiceInformer informers.APIServiceInformer,
9495
serviceInformer v1informers.ServiceInformer,
95-
endpointsInformer v1informers.EndpointsInformer,
96+
endpointSliceInformer discoveryv1informers.EndpointSliceInformer,
9697
apiServiceClient apiregistrationclient.APIServicesGetter,
9798
proxyTransportDial *transport.DialHolder,
9899
proxyCurrentCertKeyContent certKeyFunc,
99100
serviceResolver ServiceResolver,
100101
metrics *availabilitymetrics.Metrics,
101102
) (*AvailableConditionController, error) {
103+
104+
endpointSliceGetter, err := proxy.NewEndpointSliceIndexerGetter(endpointSliceInformer)
105+
if err != nil {
106+
return nil, err
107+
}
108+
102109
c := &AvailableConditionController{
103-
apiServiceClient: apiServiceClient,
104-
apiServiceLister: apiServiceInformer.Lister(),
105-
serviceLister: serviceInformer.Lister(),
106-
endpointsLister: endpointsInformer.Lister(),
107-
serviceResolver: serviceResolver,
110+
apiServiceClient: apiServiceClient,
111+
apiServiceLister: apiServiceInformer.Lister(),
112+
serviceLister: serviceInformer.Lister(),
113+
endpointSliceGetter: endpointSliceGetter,
114+
serviceResolver: serviceResolver,
108115
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
109116
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
110117
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
@@ -137,12 +144,12 @@ func New(
137144
})
138145
c.servicesSynced = serviceHandler.HasSynced
139146

140-
endpointsHandler, _ := endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
141-
AddFunc: c.addEndpoints,
142-
UpdateFunc: c.updateEndpoints,
143-
DeleteFunc: c.deleteEndpoints,
147+
endpointSliceHandler, _ := endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
148+
AddFunc: c.addEndpointSlice,
149+
UpdateFunc: c.updateEndpointSlice,
150+
DeleteFunc: c.deleteEndpointSlice,
144151
})
145-
c.endpointsSynced = endpointsHandler.HasSynced
152+
c.endpointSlicesSynced = endpointSliceHandler.HasSynced
146153

147154
c.syncFn = c.sync
148155

@@ -239,30 +246,37 @@ func (c *AvailableConditionController) sync(key string) error {
239246
return err
240247
}
241248

242-
endpoints, err := c.endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
243-
if apierrors.IsNotFound(err) {
244-
availableCondition.Status = apiregistrationv1.ConditionFalse
245-
availableCondition.Reason = "EndpointsNotFound"
246-
availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
247-
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
248-
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
249-
return err
250-
} else if err != nil {
249+
endpointSlices, err := c.endpointSliceGetter.GetEndpointSlices(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name)
250+
if err != nil {
251251
availableCondition.Status = apiregistrationv1.ConditionUnknown
252252
availableCondition.Reason = "EndpointsAccessError"
253253
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
254254
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
255255
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
256256
return err
257+
} else if len(endpointSlices) == 0 {
258+
availableCondition.Status = apiregistrationv1.ConditionFalse
259+
availableCondition.Reason = "EndpointsNotFound"
260+
availableCondition.Message = fmt.Sprintf("cannot find endpointslices for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
261+
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
262+
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
263+
return err
257264
}
258265
hasActiveEndpoints := false
259266
outer:
260-
for _, subset := range endpoints.Subsets {
261-
if len(subset.Addresses) == 0 {
267+
for _, slice := range endpointSlices {
268+
ready := false
269+
for _, endpoint := range slice.Endpoints {
270+
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready {
271+
ready = true
272+
break
273+
}
274+
}
275+
if !ready {
262276
continue
263277
}
264-
for _, endpointPort := range subset.Ports {
265-
if endpointPort.Name == portName {
278+
for _, endpointPort := range slice.Ports {
279+
if endpointPort.Name != nil && *endpointPort.Name == portName && endpointPort.Port != nil {
266280
hasActiveEndpoints = true
267281
break outer
268282
}
@@ -271,7 +285,7 @@ func (c *AvailableConditionController) sync(key string) error {
271285
if !hasActiveEndpoints {
272286
availableCondition.Status = apiregistrationv1.ConditionFalse
273287
availableCondition.Reason = "MissingEndpoints"
274-
availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
288+
availableCondition.Message = fmt.Sprintf("endpointslices for service/%s in %q have no addresses with port name %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, portName)
275289
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, availableCondition)
276290
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
277291
return err
@@ -415,7 +429,7 @@ func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{})
415429
// to be called; since the handlers are three different ways of
416430
// enqueueing the same thing, waiting for this permits the queue to
417431
// maximally de-duplicate the entries.
418-
if !controllers.WaitForCacheSync("RemoteAvailability", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
432+
if !controllers.WaitForCacheSync("RemoteAvailability", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointSlicesSynced) {
419433
return
420434
}
421435

@@ -491,18 +505,13 @@ func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
491505
c.queue.Add(castObj.Name)
492506
}
493507

494-
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []string {
495-
metadata, err := meta.Accessor(obj)
496-
if err != nil {
497-
utilruntime.HandleError(err)
498-
return nil
499-
}
508+
func (c *AvailableConditionController) getAPIServicesFor(serviceNamespace, serviceName string) []string {
500509
c.cacheLock.RLock()
501510
defer c.cacheLock.RUnlock()
502-
return c.cache[metadata.GetNamespace()][metadata.GetName()]
511+
return c.cache[serviceNamespace][serviceName]
503512
}
504513

505-
// if the service/endpoint handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
514+
// if the service/endpointslice handler wins the race against the cache rebuilding, it may queue a no-longer-relevant apiservice
506515
// (which will get processed an extra time - this doesn't matter),
507516
// and miss a newly relevant apiservice (which will get queued by the apiservice handler)
508517
func (c *AvailableConditionController) rebuildAPIServiceCache() {
@@ -526,13 +535,15 @@ func (c *AvailableConditionController) rebuildAPIServiceCache() {
526535
// TODO, think of a way to avoid checking on every service manipulation
527536

528537
func (c *AvailableConditionController) addService(obj interface{}) {
529-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
538+
service := obj.(*v1.Service)
539+
for _, apiService := range c.getAPIServicesFor(service.Namespace, service.Name) {
530540
c.queue.Add(apiService)
531541
}
532542
}
533543

534544
func (c *AvailableConditionController) updateService(obj, _ interface{}) {
535-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
545+
service := obj.(*v1.Service)
546+
for _, apiService := range c.getAPIServicesFor(service.Namespace, service.Name) {
536547
c.queue.Add(apiService)
537548
}
538549
}
@@ -551,38 +562,52 @@ func (c *AvailableConditionController) deleteService(obj interface{}) {
551562
return
552563
}
553564
}
554-
for _, apiService := range c.getAPIServicesFor(castObj) {
565+
for _, apiService := range c.getAPIServicesFor(castObj.Namespace, castObj.Name) {
555566
c.queue.Add(apiService)
556567
}
557568
}
558569

559-
func (c *AvailableConditionController) addEndpoints(obj interface{}) {
560-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
570+
func (c *AvailableConditionController) addEndpointSlice(obj interface{}) {
571+
slice := obj.(*discoveryv1.EndpointSlice)
572+
serviceName := slice.Labels[discoveryv1.LabelServiceName]
573+
if serviceName == "" {
574+
return
575+
}
576+
for _, apiService := range c.getAPIServicesFor(slice.Namespace, serviceName) {
561577
c.queue.Add(apiService)
562578
}
563579
}
564580

565-
func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
566-
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
581+
func (c *AvailableConditionController) updateEndpointSlice(obj, _ interface{}) {
582+
slice := obj.(*discoveryv1.EndpointSlice)
583+
serviceName := slice.Labels[discoveryv1.LabelServiceName]
584+
if serviceName == "" {
585+
return
586+
}
587+
for _, apiService := range c.getAPIServicesFor(slice.Namespace, serviceName) {
567588
c.queue.Add(apiService)
568589
}
569590
}
570591

571-
func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
572-
castObj, ok := obj.(*v1.Endpoints)
592+
func (c *AvailableConditionController) deleteEndpointSlice(obj interface{}) {
593+
castObj, ok := obj.(*discoveryv1.EndpointSlice)
573594
if !ok {
574595
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
575596
if !ok {
576597
klog.Errorf("Couldn't get object from tombstone %#v", obj)
577598
return
578599
}
579-
castObj, ok = tombstone.Obj.(*v1.Endpoints)
600+
castObj, ok = tombstone.Obj.(*discoveryv1.EndpointSlice)
580601
if !ok {
581602
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
582603
return
583604
}
584605
}
585-
for _, apiService := range c.getAPIServicesFor(castObj) {
606+
serviceName := castObj.Labels[discoveryv1.LabelServiceName]
607+
if serviceName == "" {
608+
return
609+
}
610+
for _, apiService := range c.getAPIServicesFor(castObj.Namespace, serviceName) {
586611
c.queue.Add(apiService)
587612
}
588613
}

0 commit comments

Comments
 (0)