Skip to content

Commit

Permalink
fix ingress
Browse files Browse the repository at this point in the history
Signed-off-by: Ling Samuel <lingsamuelgrace@gmail.com>
  • Loading branch information
lingsamuel committed Apr 13, 2023
1 parent 5e8dd3f commit edeef70
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 109 deletions.
179 changes: 70 additions & 109 deletions pkg/providers/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/apache/apisix-ingress-controller/pkg/providers/translation"
"github.com/apache/apisix-ingress-controller/pkg/providers/utils"
"github.com/apache/apisix-ingress-controller/pkg/types"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)

const (
Expand All @@ -55,12 +54,12 @@ type ingressController struct {

pool pool.Pool

// secretSSLMap stores reference from K8s secret to Ingress
// type: Map<SecretKey, Map<IngressVersionKey, SSL in APISIX>>
// SecretKey -> IngressVersionKey -> []string
// secretRefMap stores reference from K8s secret to Ingress
// type: Map<SecretKey, Map<IngressVersionKey, empty struct>>
// SecretKey -> IngressVersionKey -> empty struct
// Secret key is kube-style meta key: `namespace/name`
// Ingress Version Key is: `namespace/name_groupVersion`
secretSSLMap *sync.Map
secretRefMap *sync.Map
}

func newIngressController(common *ingressCommon) *ingressController {
Expand All @@ -71,7 +70,7 @@ func newIngressController(common *ingressCommon) *ingressController {
workers: 1,
pool: pool.NewLimited(2),

secretSSLMap: new(sync.Map),
secretRefMap: new(sync.Map),
}

c.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -131,7 +130,7 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
}

if err != nil {
if !k8serrors.IsNotFound(err) {
if !k8serrors.IsNotFound(err) || (k8serrors.IsNotFound(err) && ev.Type != types.EventDelete) {
log.Errorf("failed to get ingress %s (group version: %s): %s", ingEv.Key, ingEv.GroupVersion, err)
return err
}
Expand All @@ -142,6 +141,7 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
return nil
}
}

if ev.Type == types.EventDelete {
if ing != nil {
// We still find the resource while we are processing the DELETE event,
Expand All @@ -152,6 +152,36 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
}
ing = ev.Tombstone.(kube.Ingress)
}
if ing == nil {
log.Errorw("failed to get ingress",
zap.Any("event", ev),
zap.Error(err),
)
return fmt.Errorf("failed to get ingress")
}

var secrets []string
switch ingEv.GroupVersion {
case kube.IngressV1:
for _, tls := range ing.V1().Spec.TLS {
secrets = append(secrets, tls.SecretName)
}
case kube.IngressV1beta1:
for _, tls := range ing.V1beta1().Spec.TLS {
secrets = append(secrets, tls.SecretName)
}
case kube.IngressExtensionsV1beta1:
for _, tls := range ing.ExtensionsV1beta1().Spec.TLS {
secrets = append(secrets, tls.SecretName)
}
}

for _, secret := range secrets {
// We don't support annotation in Ingress
// _caAnnotation = "nginx.ingress.kubernetes.io/auth-tls-secret"
c.storeSecretReference(namespace+"/"+secret, ingEv.Key+"_"+ingEv.GroupVersion, ev.Type)
}

{
if ev.Type == types.EventDelete {
tctx, err = c.translator.TranslateIngressDeleteEvent(ing)
Expand All @@ -166,16 +196,6 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {
goto updateStatus
}

for _, ssl := range tctx.SSL {
ns, ok1 := ssl.Labels[translation.MetaSecretNamespace]
sec, ok2 := ssl.Labels[translation.MetaSecretName]
if ok1 && ok2 {
// We don't support annotation in Ingress
// _caAnnotation = "nginx.ingress.kubernetes.io/auth-tls-secret"
c.storeSecretReference(ns+"/"+sec, ingEv.Key, ev.Type, ssl)
}
}

log.Debugw("translated ingress resource to a couple of routes, upstreams and pluginConfigs",
zap.Any("ingress", ing),
zap.Any("routes", tctx.Routes),
Expand Down Expand Up @@ -573,118 +593,59 @@ func (c *ingressController) ingressLBStatusIPs() ([]corev1.LoadBalancerIngress,
return utils.IngressLBStatusIPs(c.IngressPublishService, c.IngressStatusAddress, c.SvcLister)
}

func (c *ingressController) storeSecretReference(secretKey string, ingressKey string, evType types.EventType, ssl *v1.Ssl) {
if ssls, ok := c.secretSSLMap.Load(secretKey); ok {
sslMap := ssls.(*sync.Map)
func (c *ingressController) storeSecretReference(secretKey string, ingressKey string, evType types.EventType) {
if refs, ok := c.secretRefMap.Load(secretKey); ok {
refMap := refs.(*sync.Map)
switch evType {
case types.EventDelete:
sslMap.Delete(ingressKey)
c.secretSSLMap.Store(secretKey, sslMap)
refMap.Delete(ingressKey)
c.secretRefMap.Store(secretKey, refMap)
default:
sslMap.Store(ingressKey, ssl)
c.secretSSLMap.Store(secretKey, sslMap)
refMap.Store(ingressKey, struct{}{})
c.secretRefMap.Store(secretKey, refMap)
}
} else if evType != types.EventDelete {
sslMap := new(sync.Map)
sslMap.Store(ingressKey, ssl)
c.secretSSLMap.Store(secretKey, sslMap)
refMap := new(sync.Map)
refMap.Store(ingressKey, struct{}{})
c.secretRefMap.Store(secretKey, refMap)
}
}

func (c *ingressController) SyncSecretChange(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretKey string) {
ssls, ok := c.secretSSLMap.Load(secretKey)
refs, ok := c.secretRefMap.Load(secretKey)
if !ok {
log.Debugw("Ingress: sync secret change, not concerned", zap.String("key", secretKey))
// This secret is not concerned.
return
}

sslMap, ok := ssls.(*sync.Map) // ingress version key -> SSL
refMap, ok := refs.(*sync.Map) // ingress version key -> empty struct
if !ok {
log.Debugw("Ingress: sync secret change, not such key map", zap.String("key", secretKey))
return
}

sslMap.Range(func(k, v interface{}) bool {
refMap.Range(func(k, v interface{}) bool {
ingressVersionKey := k.(string)
ssl := v.(*v1.Ssl)
return c.syncSSLs(ctx, ev.Type, secret, secretKey, ingressVersionKey, ssl)
})
}

func (c *ingressController) syncSSLs(ctx context.Context, evType types.EventType, secret *corev1.Secret, secretKey, ingressVersionKey string, ssl *v1.Ssl) bool {
vals := strings.Split(ingressVersionKey, "_")
if len(vals) != 2 {
log.Errorw("cache recorded invalid ingress version key",
zap.String("key", ingressVersionKey),
)
}
ingressKey := vals[0]
ingressVersion := vals[1]

ingressNamespace, ingressName, err := cache.SplitMetaNamespaceKey(ingressKey)
if err != nil {
log.Errorf("invalid cached ApisixTls key: %s", ingressKey)
return true
}
vals := strings.Split(ingressVersionKey, "_")
if len(vals) != 2 {
log.Errorw("cache recorded invalid ingress version key",
zap.String("key", ingressVersionKey),
)
return true
}
ingressKey := vals[0]
ingressVersion := vals[1]

var (
obj metav1.Object
ing kube.Ingress
)
switch ingressVersion {
case kube.IngressV1:
ing, err = c.IngressLister.V1(ingressNamespace, ingressName)
obj = ing.V1()
case kube.IngressV1beta1:
ing, err = c.IngressLister.V1(ingressNamespace, ingressName)
obj = ing.V1beta1()
case kube.IngressExtensionsV1beta1:
ing, err = c.IngressLister.V1(ingressNamespace, ingressName)
obj = ing.ExtensionsV1beta1()
}
if err != nil {
log.Warnw("secret related ingress resource not found, skip",
zap.String("ingress", ingressKey),
)
return true
}
c.workqueue.Add(&types.Event{
Type: types.EventSync,
Object: kube.IngressEvent{
Key: ingressKey,
GroupVersion: ingressVersion,
},
})

cert, pkey, err := translation.ExtractKeyPair(secret, true)
if err != nil {
log.Errorw("secret required by Ingress invalid",
zap.String("ingress", ingressKey),
zap.String("secret", secretKey),
zap.Error(err),
)
go func(obj metav1.Object) {
runtimeObj := obj.(runtime.Object)
c.RecordEventS(runtimeObj, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(runtimeObj)
}(obj)
return true
}

// update ssl
ssl.Cert = string(cert)
ssl.Key = string(pkey)

go func(ssl *v1.Ssl, obj metav1.Object) {
runtimeObj := obj.(runtime.Object)

err := c.SyncSSL(ctx, ssl, evType)
if err != nil {
log.Errorw("failed to sync ssl to APISIX",
zap.Error(err),
zap.Any("ssl", ssl),
zap.Any("secret", secret),
)
c.RecordEventS(runtimeObj, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(runtimeObj)
} else {
c.RecordEventS(runtimeObj, corev1.EventTypeNormal, utils.ResourceSynced,
fmt.Sprintf("sync from secret %s changes", secretKey))
c.recordStatus(runtimeObj)
}
}(ssl, obj)
return true
})
}
3 changes: 3 additions & 0 deletions pkg/providers/k8s/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,17 @@ func (c *namespaceController) sync(ctx context.Context, ev *types.Event) error {

// if labels of namespace contains the watchingLabels, the namespace should be set to controller.watchingNamespaces
if c.controller.watchingLabels.IsSubsetOf(namespace.Labels) {
log.Infow("watching namespace", zap.String("name", namespace.Name))
c.controller.watchingNamespaces.Store(namespace.Name, struct{}{})
} else {
log.Infow("un-watching namespace", zap.String("name", namespace.Name))
c.controller.watchingNamespaces.Delete(namespace.Name)
}

} else { // type == types.EventDelete
namespace := ev.Tombstone.(*corev1.Namespace)
if _, ok := c.controller.watchingNamespaces.Load(namespace.Name); ok {
log.Infow("un-watching namespace", zap.String("name", namespace.Name))
c.controller.watchingNamespaces.Delete(namespace.Name)
}
// do nothing, if the namespace did not in controller.watchingNamespaces
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/suite-ingress/suite-ingress-features/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ spec:
_ = s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK).Body().Raw()

assert.Nil(ginkgo.GinkgoT(), s.DeleteResourceFromStringWithNamespace(route1, namespace1), "deleting ingress")

time.Sleep(6 * time.Second)

// un-label
_, err = client.CoreV1().Namespaces().Update(
context.Background(),
Expand Down

0 comments on commit edeef70

Please sign in to comment.