Skip to content

Commit

Permalink
feat: add pod controller and pod cache (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokers authored May 28, 2021
1 parent 23e5ebd commit cddcd29
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pkg/ingress/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Controller struct {
wg sync.WaitGroup
watchingNamespace map[string]struct{}
apisix apisix.APISIX
podCache types.PodCache
translator translation.Translator
apiServer *api.Server
metricsCollector metrics.Collector
Expand All @@ -84,6 +85,8 @@ type Controller struct {
leaderContextCancelFunc context.CancelFunc

// common informers and listers
podInformer cache.SharedIndexInformer
podLister listerscorev1.PodLister
epInformer cache.SharedIndexInformer
epLister listerscorev1.EndpointsLister
svcInformer cache.SharedIndexInformer
Expand All @@ -102,6 +105,7 @@ type Controller struct {
apisixClusterConfigInformer cache.SharedIndexInformer

// resource controllers
podController *podController
endpointsController *endpointsController
ingressController *ingressController
secretController *secretController
Expand Down Expand Up @@ -160,6 +164,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
watchingNamespace: watchingNamespace,
secretSSLMap: new(sync.Map),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
podCache: types.NewPodCache(),
}
return c, nil
}
Expand All @@ -173,6 +178,7 @@ func (c *Controller) initWhenStartLeading() {
kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()

c.podLister = kubeFactory.Core().V1().Pods().Lister()
c.epLister = kubeFactory.Core().V1().Endpoints().Lister()
c.svcLister = kubeFactory.Core().V1().Services().Lister()
c.ingressLister = kube.NewIngressLister(
Expand Down Expand Up @@ -209,6 +215,7 @@ func (c *Controller) initWhenStartLeading() {
apisixRouteInformer = apisixFactory.Apisix().V1().ApisixRoutes().Informer()
}

c.podInformer = kubeFactory.Core().V1().Pods().Informer()
c.epInformer = kubeFactory.Core().V1().Endpoints().Informer()
c.svcInformer = kubeFactory.Core().V1().Services().Informer()
c.ingressInformer = ingressInformer
Expand All @@ -218,6 +225,7 @@ func (c *Controller) initWhenStartLeading() {
c.secretInformer = kubeFactory.Core().V1().Secrets().Informer()
c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()

c.podController = c.newPodController()
c.endpointsController = c.newEndpointsController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()
Expand Down Expand Up @@ -369,6 +377,9 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.checkClusterHealth(ctx, cancelFunc)
})
c.goAttach(func() {
c.podInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.epInformer.Run(ctx.Done())
})
Expand All @@ -393,6 +404,9 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.apisixTlsInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.podController.run(ctx)
})
c.goAttach(func() {
c.endpointsController.run(ctx)
})
Expand Down
100 changes: 100 additions & 0 deletions pkg/ingress/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"context"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

"github.com/apache/apisix-ingress-controller/pkg/log"
)

type podController struct {
controller *Controller
}

func (c *Controller) newPodController() *podController {
ctl := &podController{
controller: c,
}
ctl.controller.podInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
DeleteFunc: ctl.onDelete,
},
)
return ctl
}

func (c *podController) run(ctx context.Context) {
log.Info("pod controller started")
defer log.Info("pod controller exited")

if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.podInformer.HasSynced); !ok {
log.Error("informers sync failed")
return
}

<-ctx.Done()
}

func (c *podController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found pod with bad namespace/name: %s, ignore it", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
log.Debugw("pod add event arrived",
zap.Any("object", obj),
)
pod := obj.(*corev1.Pod)
if err := c.controller.podCache.Add(pod); err != nil {
log.Errorw("failed to add pod to cache",
zap.Error(err),
zap.Any("pod", pod),
)
}
}

func (c *podController) onDelete(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Errorf("found pod: %+v in bad tombstone state", obj)
return
}
pod = tombstone.Obj.(*corev1.Pod)
}

if !c.controller.namespaceWatching(pod.Namespace + "/" + pod.Name) {
return
}
log.Debugw("pod delete event arrived",
zap.Any("final state", pod),
)
if err := c.controller.podCache.Delete(pod); err != nil {
log.Errorw("failed to delete pod from cache",
zap.Error(err),
zap.Any("pod", pod),
)
}
}
111 changes: 111 additions & 0 deletions pkg/ingress/pod_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ingress

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/apache/apisix-ingress-controller/pkg/types"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodOnAdd(t *testing.T) {
ctl := &podController{
controller: &Controller{
watchingNamespace: map[string]struct{}{
"default": {},
},
podCache: types.NewPodCache(),
},
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "nginx",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "10.0.5.12",
},
}
ctl.onAdd(pod)
name, err := ctl.controller.podCache.GetNameByIP("10.0.5.12")
assert.Nil(t, err)
assert.Equal(t, name, "nginx")

pod2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "public",
Name: "abc",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "10.0.5.13",
},
}
ctl.onAdd(pod2)
name, err = ctl.controller.podCache.GetNameByIP("10.0.5.13")
assert.Empty(t, name)
assert.Equal(t, err, types.ErrPodNotFound)
}

func TestPodOnDelete(t *testing.T) {
ctl := &podController{
controller: &Controller{
watchingNamespace: map[string]struct{}{
"default": {},
},
podCache: types.NewPodCache(),
},
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "nginx",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "10.0.5.12",
},
}
assert.Nil(t, ctl.controller.podCache.Add(pod), "adding pod")

ctl.onDelete(pod)
name, err := ctl.controller.podCache.GetNameByIP("10.0.5.12")
assert.Empty(t, name)
assert.Equal(t, err, types.ErrPodNotFound)

pod2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "public",
Name: "abc",
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
PodIP: "10.0.5.13",
},
}
assert.Nil(t, ctl.controller.podCache.Add(pod2), "adding pod")
ctl.onDelete(pod2)
name, err = ctl.controller.podCache.GetNameByIP("10.0.5.13")
assert.Equal(t, name, "abc")
assert.Nil(t, err)
}
1 change: 1 addition & 0 deletions pkg/kube/translation/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Translator interface {
// TranslatorOptions contains options to help Translator
// work well.
type TranslatorOptions struct {
PodLister listerscorev1.PodLister
EndpointsLister listerscorev1.EndpointsLister
ServiceLister listerscorev1.ServiceLister
ApisixUpstreamLister listersv1.ApisixUpstreamLister
Expand Down
84 changes: 84 additions & 0 deletions pkg/types/pod.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package types

import (
"errors"
"sync"

corev1 "k8s.io/api/core/v1"
)

var (
// ErrPodNotRunning represents that PodCache operation is failed due to the
// target Pod is not in Running phase.
ErrPodNotRunning = errors.New("pod not running")
// ErrPodNotFound represents that the target pod not found from the PodCache.
ErrPodNotFound = errors.New("pod not found")
)

// PodCache caches pod. Currently it doesn't cache the pod object but only its
// name.
type PodCache interface {
// Add adds a pod to cache, only pod which state is RUNNING will be
// accepted.
Add(*corev1.Pod) error
// Delete deletes a pod from the cache
Delete(*corev1.Pod) error
// GetNameByIP returns the pod name according to the given pod IP.
GetNameByIP(string) (string, error)
}

type podCache struct {
sync.RWMutex
nameByIP map[string]string
}

// NewPodCache creates a PodCache object.
func NewPodCache() PodCache {
return &podCache{
nameByIP: make(map[string]string),
}
}

func (p *podCache) Add(pod *corev1.Pod) error {
if pod.Status.Phase != corev1.PodRunning {
return ErrPodNotRunning
}
p.Lock()
defer p.Unlock()
p.nameByIP[pod.Status.PodIP] = pod.Name
return nil
}

func (p *podCache) Delete(pod *corev1.Pod) error {
p.Lock()
defer p.Unlock()
if _, ok := p.nameByIP[pod.Status.PodIP]; !ok {
return ErrPodNotFound
}
delete(p.nameByIP, pod.Status.PodIP)
return nil
}

func (p *podCache) GetNameByIP(ip string) (name string, err error) {
p.RLock()
defer p.RUnlock()
name, ok := p.nameByIP[ip]
if !ok {
err = ErrPodNotFound
}
return
}
Loading

0 comments on commit cddcd29

Please sign in to comment.