Skip to content

Commit

Permalink
feat: ingress v1
Browse files Browse the repository at this point in the history
  • Loading branch information
tokers committed Feb 8, 2021
1 parent fe0edaf commit 2fb1203
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 6 deletions.
1 change: 1 addition & 0 deletions cmd/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig, "kubeconfig", "", "Kubernetes configuration file (by default in-cluster configuration will be used)")
cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration, "resync-interval", time.Minute, "the controller resync (with Kubernetes) interval, the minimum resync interval is 30s")
cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressClass, "ingress-class", config.IngressClass, "The class of an Ingress object is set using the field IngressClassName in Kubernetes clusters version v1.18.0 or higher or the annotation \"kubernetes.io/ingress.class\" (deprecated)")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, "election-id", config.IngressAPISIXLeader, "election id used for compaign the controller leader")
cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api")
cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api")
Expand Down
4 changes: 4 additions & 0 deletions conf/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ kubernetes:
election_id: "ingress-apisix-leader" # the election id for the controller leader compaign,
# only the leader will watch and delivery resource changes,
# other instances (as candidates) stand by.
ingress_class: "apisix" # The class of an Ingress object is set using the field
# IngressClassName in Kubernetes clusters version v1.18.0
# or higher or the annotation "kubernetes.io/ingress.class"
# (deprecated).

# APISIX related configurations.
apisix:
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ const (
// IngressAPISIXLeader is the default election id for the controller
// leader election.
IngressAPISIXLeader = "ingress-apisix-leader"
// IngressClass is the default ingress class name, used for Ingress
// object's IngressClassName field in Kubernetes clusters version v1.18.0
// or higher, or the annotation "kubernetes.io/ingress.class" (deprecated).
IngressClass = "apisix"

_minimalResyncInterval = 30 * time.Second
)
Expand All @@ -54,6 +58,7 @@ type KubernetesConfig struct {
ResyncInterval types.TimeDuration `json:"resync_interval" yaml:"resync_interval"`
AppNamespaces []string `json:"app_namespaces" yaml:"app_namespaces"`
ElectionID string `json:"election_id" yaml:"election_id"`
IngressClass string `json:"ingress_class" yaml:"ingress_class"`
}

// APISIXConfig contains all APISIX related config items.
Expand All @@ -76,6 +81,7 @@ func NewDefaultConfig() *Config {
ResyncInterval: types.TimeDuration{Duration: 6 * time.Hour},
AppNamespaces: []string{v1.NamespaceAll},
ElectionID: IngressAPISIXLeader,
IngressClass: IngressClass,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestNewConfigFromFile(t *testing.T) {
Kubeconfig: "/path/to/foo/baz",
AppNamespaces: []string{""},
ElectionID: "my-election-id",
IngressClass: IngressClass,
},
APISIX: APISIXConfig{
BaseURL: "http://127.0.0.1:8080/apisix",
Expand Down Expand Up @@ -74,6 +75,7 @@ kubernetes:
kubeconfig: /path/to/foo/baz
resync_interval: 1h0m0s
election_id: my-election-id
ingress_class: apisix
apisix:
base_url: http://127.0.0.1:8080/apisix
admin_key: "123456"
Expand Down
29 changes: 23 additions & 6 deletions pkg/ingress/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
"sync"
"time"

listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listerscorev1 "k8s.io/client-go/listers/core/v1"
networkingv1 "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
Expand All @@ -39,6 +39,7 @@ import (
clientset "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
crdclientset "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions"
listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/metrics"
"github.com/apache/apisix-ingress-controller/pkg/seven/conf"
Expand Down Expand Up @@ -68,15 +69,21 @@ type Controller struct {
crdInformerFactory externalversions.SharedInformerFactory

// common informers and listers
epInformer cache.SharedIndexInformer
epLister listerscorev1.EndpointsLister
svcInformer cache.SharedIndexInformer
svcLister listerscorev1.ServiceLister
epInformer cache.SharedIndexInformer
epLister listerscorev1.EndpointsLister
svcInformer cache.SharedIndexInformer
svcLister listerscorev1.ServiceLister
ingressInformer cache.SharedIndexInformer
// TODO Be compatible with networking v1beta1
ingressLister networkingv1.IngressLister
ingressClassInformer cache.SharedIndexInformer
ingressClassLister networkingv1.IngressClassLister
apisixUpstreamInformer cache.SharedIndexInformer
apisixUpstreamLister listersv1.ApisixUpstreamLister

// resource conrollers
endpointsController *endpointsController
ingressController *ingressController
apisixUpstreamController *apisixUpstreamController
}

Expand Down Expand Up @@ -130,6 +137,10 @@ func NewController(cfg *config.Config) (*Controller, error) {
epLister: kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
svcInformer: kube.CoreSharedInformerFactory.Core().V1().Services().Informer(),
svcLister: kube.CoreSharedInformerFactory.Core().V1().Services().Lister(),
ingressInformer: kube.CoreSharedInformerFactory.Networking().V1().Ingresses().Informer(),
ingressLister: kube.CoreSharedInformerFactory.Networking().V1().Ingresses().Lister(),
ingressClassInformer: kube.CoreSharedInformerFactory.Networking().V1().IngressClasses().Informer(),
ingressClassLister: kube.CoreSharedInformerFactory.Networking().V1().IngressClasses().Lister(),
apisixUpstreamInformer: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
apisixUpstreamLister: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
}
Expand All @@ -141,6 +152,7 @@ func NewController(cfg *config.Config) (*Controller, error) {

c.endpointsController = c.newEndpointsController()
c.apisixUpstreamController = c.newApisixUpstreamController()
c.ingressController = c.newIngressController()

return c, nil
}
Expand Down Expand Up @@ -259,7 +271,12 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.svcInformer.Run(ctx.Done())
})

c.goAttach(func() {
c.ingressInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.ingressClassInformer.Run(ctx.Done())
})
c.goAttach(func() {
c.endpointsController.run(ctx)
})
Expand Down
163 changes: 163 additions & 0 deletions pkg/ingress/controller/ingress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller

import (
"context"

"go.uber.org/zap"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/types"
)

type ingressController struct {
controller *Controller
workqueue workqueue.RateLimitingInterface
workers int
}

func (c *Controller) newIngressController() *ingressController {
ctl := &ingressController{
controller: c,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingress"),
workers: 1,
}

c.ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
UpdateFunc: ctl.onUpdate,
DeleteFunc: ctl.OnDelete,
})
return ctl
}

func (c *ingressController) run(ctx context.Context) {
log.Info("ingress controller started")
defer log.Infof("ingress controller exited")

if !cache.WaitForCacheSync(ctx.Done(), c.controller.ingressInformer.HasSynced, c.controller.ingressClassInformer.HasSynced) {
log.Errorf("cache sync failed")
return
}
for i := 0; i < c.workers; i++ {
go c.runWorker(ctx)
}
<-ctx.Done()
c.workqueue.ShutDown()
}

func (c *ingressController) runWorker(ctx context.Context) {
for {
obj, quit := c.workqueue.Get()
if quit {
return
}
err := c.sync(ctx, obj.(*types.Event))
c.workqueue.Done(obj)
c.handleSyncErr(obj, err)
}
}

func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
return nil
}

func (c *ingressController) handleSyncErr(obj interface{}, err error) {
if err == nil {
c.workqueue.Forget(obj)
return
}
if c.workqueue.NumRequeues(obj) < _maxRetries {
log.Infow("sync ingress failed, will retry",
zap.Any("object", obj),
)
c.workqueue.AddRateLimited(obj)
} else {
c.workqueue.Forget(obj)
log.Warnf("drop ingress %+v out of the queue", obj)
}
}

func (c *ingressController) onAdd(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found ingress resource with bad meta namesapce key: %s", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
log.Debugw("ingress add event arrived",
zap.Any("object", obj))

c.workqueue.AddRateLimited(&types.Event{
Type: types.EventAdd,
Object: key,
})
}

func (c *ingressController) onUpdate(oldObj, newObj interface{}) {
prev := oldObj.(*networkingv1.Ingress)
curr := newObj.(*networkingv1.Ingress)
if prev.ResourceVersion >= curr.ResourceVersion {
return
}
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
log.Errorf("found ingress resource with bad meta namespace key: %s", err)
return
}
log.Debugw("ingress update event arrived",
zap.Any("new object", curr),
zap.Any("old object", prev),
)

c.workqueue.AddRateLimited(&types.Event{
Type: types.EventUpdate,
Object: key,
})
}

func (c *ingressController) OnDelete(obj interface{}) {
au, ok := obj.(*networkingv1.Ingress)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
au = tombstone.Obj.(*networkingv1.Ingress)
}

key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
log.Errorf("found ingress resource with bad meta namespace key: %s", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
log.Debugw("ingress delete event arrived",
zap.Any("final state", au),
)
c.workqueue.AddRateLimited(&types.Event{
Type: types.EventDelete,
Object: key,
Tombstone: au,
})
}

0 comments on commit 2fb1203

Please sign in to comment.