Skip to content

Commit

Permalink
feat: update secret referenced by ingress (#1243)
Browse files Browse the repository at this point in the history
  • Loading branch information
lingsamuel authored Sep 21, 2022
1 parent 7bd6a03 commit e51a2c7
Show file tree
Hide file tree
Showing 21 changed files with 980 additions and 454 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gin-gonic/gin v1.8.1
github.com/hashicorp/go-memdb v1.3.3
github.com/hashicorp/go-multierror v1.1.1
github.com/imdario/mergo v0.3.12
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
github.com/prometheus/client_model v0.2.0
Expand Down Expand Up @@ -49,7 +50,6 @@ require (
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
243 changes: 219 additions & 24 deletions pkg/providers/apisix/apisix_tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
configv2 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2"
configv2beta3 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2beta3"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/providers/translation"
"github.com/apache/apisix-ingress-controller/pkg/providers/utils"
"github.com/apache/apisix-ingress-controller/pkg/types"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
Expand All @@ -49,22 +50,22 @@ type apisixTlsController struct {
apisixTlsLister kube.ApisixTlsLister
apisixTlsInformer cache.SharedIndexInformer

// this map enrolls which ApisixTls objects refer to a Kubernetes
// Secret object.
// type: Map<SecretKey, Map<ApisixTlsKey, ApisixTls>>
// SecretKey is `namespace_name`, ApisixTlsKey is kube style meta key: `namespace/name`
// secretSSLMap stores reference from K8s secret to ApisixTls
// type: Map<SecretKey, Map<ApisixTlsKey, SSL object in APISIX>>
// SecretKey -> ApisixTlsKey -> SSL object in APISIX
// SecretKey and ApisixTlsKey are kube-style meta key: `namespace/name`
secretSSLMap *sync.Map
}

func newApisixTlsController(common *apisixCommon) *apisixTlsController {
func newApisixTlsController(common *apisixCommon, apisixTlsInformer cache.SharedIndexInformer, apisixTlsLister kube.ApisixTlsLister) *apisixTlsController {
c := &apisixTlsController{
apisixCommon: common,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixTls"),
workers: 1,

secretInformer: common.SecretInformer,
apisixTlsLister: common.ApisixTlsLister,
apisixTlsInformer: common.ApisixTlsInformer,
apisixTlsLister: apisixTlsLister,
apisixTlsInformer: apisixTlsInformer,

secretSSLMap: new(sync.Map),
}
Expand Down Expand Up @@ -109,10 +110,10 @@ func (c *apisixTlsController) runWorker(ctx context.Context) {

func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
event := ev.Object.(kube.ApisixTlsEvent)
key := event.Key
namespace, name, err := cache.SplitMetaNamespaceKey(key)
apisixTlsKey := event.Key
namespace, name, err := cache.SplitMetaNamespaceKey(apisixTlsKey)
if err != nil {
log.Errorf("found ApisixTls resource with invalid meta namespace key %s: %s", key, err)
log.Errorf("found ApisixTls resource with invalid meta namespace key %s: %s", apisixTlsKey, err)
return err
}

Expand All @@ -130,14 +131,14 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
if !k8serrors.IsNotFound(err) {
log.Errorw("failed to get ApisixTls",
zap.Error(err),
zap.String("key", key),
zap.String("key", apisixTlsKey),
zap.String("version", event.GroupVersion),
)
return err
}
if ev.Type != types.EventDelete {
log.Warnw("ApisixTls %s was deleted before it can be delivered",
zap.String("key", key),
zap.String("key", apisixTlsKey),
zap.String("version", event.GroupVersion),
)
// Don't need to retry.
Expand All @@ -149,7 +150,7 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
// We still find the resource while we are processing the DELETE event,
// that means object with same namespace and name was created, discarding
// this stale DELETE event.
log.Warnf("discard the stale ApisixTls delete event since the %s exists", key)
log.Warnf("discard the stale ApisixTls delete event since the %s exists", apisixTlsKey)
return nil
}
multiVersionedTls = ev.Tombstone.(kube.ApisixTls)
Expand All @@ -173,12 +174,12 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ApisixTls", tls),
)

secretKey := tls.Spec.Secret.Namespace + "_" + tls.Spec.Secret.Name
c.syncSecretSSL(secretKey, key, ssl, ev.Type)
secretKey := tls.Spec.Secret.Namespace + "/" + tls.Spec.Secret.Name
c.storeSecretCache(secretKey, apisixTlsKey, ssl, ev.Type)
if tls.Spec.Client != nil {
caSecretKey := tls.Spec.Client.CASecret.Namespace + "_" + tls.Spec.Client.CASecret.Name
caSecretKey := tls.Spec.Client.CASecret.Namespace + "/" + tls.Spec.Client.CASecret.Name
if caSecretKey != secretKey {
c.syncSecretSSL(caSecretKey, key, ssl, ev.Type)
c.storeSecretCache(caSecretKey, apisixTlsKey, ssl, ev.Type)
}
}

Expand Down Expand Up @@ -211,12 +212,12 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
zap.Any("ApisixTls", tls),
)

secretKey := tls.Spec.Secret.Namespace + "_" + tls.Spec.Secret.Name
c.syncSecretSSL(secretKey, key, ssl, ev.Type)
secretKey := tls.Spec.Secret.Namespace + "/" + tls.Spec.Secret.Name
c.storeSecretCache(secretKey, apisixTlsKey, ssl, ev.Type)
if tls.Spec.Client != nil {
caSecretKey := tls.Spec.Client.CASecret.Namespace + "_" + tls.Spec.Client.CASecret.Name
caSecretKey := tls.Spec.Client.CASecret.Namespace + "/" + tls.Spec.Client.CASecret.Name
if caSecretKey != secretKey {
c.syncSecretSSL(caSecretKey, key, ssl, ev.Type)
c.storeSecretCache(caSecretKey, apisixTlsKey, ssl, ev.Type)
}
}

Expand All @@ -237,18 +238,18 @@ func (c *apisixTlsController) sync(ctx context.Context, ev *types.Event) error {
}
}

func (c *apisixTlsController) syncSecretSSL(secretKey string, apisixTlsKey string, ssl *v1.Ssl, event types.EventType) {
func (c *apisixTlsController) storeSecretCache(secretKey string, apisixTlsKey string, ssl *v1.Ssl, evType types.EventType) {
if ssls, ok := c.secretSSLMap.Load(secretKey); ok {
sslMap := ssls.(*sync.Map)
switch event {
switch evType {
case types.EventDelete:
sslMap.Delete(apisixTlsKey)
c.secretSSLMap.Store(secretKey, sslMap)
default:
sslMap.Store(apisixTlsKey, ssl)
c.secretSSLMap.Store(secretKey, sslMap)
}
} else if event != types.EventDelete {
} else if evType != types.EventDelete {
sslMap := new(sync.Map)
sslMap.Store(apisixTlsKey, ssl)
c.secretSSLMap.Store(secretKey, sslMap)
Expand Down Expand Up @@ -469,3 +470,197 @@ func (c *apisixTlsController) recordStatus(at interface{}, reason string, err er
log.Errorf("unsupported resource record: %s", v)
}
}

func (c *apisixTlsController) SyncSecretChange(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretKey string) {
ssls, ok := c.secretSSLMap.Load(secretKey)
if !ok {
// This secret is not concerned.
return
}

sslMap, ok := ssls.(*sync.Map) // apisix tls key -> SSLs
if !ok {
return
}

switch c.Config.Kubernetes.APIVersion {
case config.ApisixV2beta3:
sslMap.Range(c.syncSSLsAndUpdateStatusV2beta3(ctx, ev, secret, secretKey))
case config.ApisixV2:
sslMap.Range(c.syncSSLsAndUpdateStatusV2(ctx, ev, secret, secretKey))
}
}

func (c *apisixTlsController) syncSSLsAndUpdateStatusV2beta3(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretKey string) func(k, v interface{}) bool {
return func(k, v interface{}) bool {
ssl := v.(*v1.Ssl)
tlsMetaKey := k.(string)
tlsNamespace, tlsName, err := cache.SplitMetaNamespaceKey(tlsMetaKey)
if err != nil {
log.Errorf("invalid cached ApisixTls key: %s", tlsMetaKey)
return true
}

multiVersioned, err := c.apisixTlsLister.V2beta3(tlsNamespace, tlsName)
if err != nil {
log.Warnw("secret related ApisixTls resource not found, skip",
zap.String("ApisixTls", tlsMetaKey),
)
return true
}
tls := multiVersioned.V2beta3()

// We don't expect a secret to be used as both SSL and mTLS in ApisixTls
if tls.Spec.Secret.Namespace == secret.Namespace && tls.Spec.Secret.Name == secret.Name {
cert, pkey, err := translation.ExtractKeyPair(secret, true)
if err != nil {
log.Errorw("secret required by ApisixTls invalid",
zap.String("ApisixTls", tlsMetaKey),
zap.String("secret", secretKey),
zap.Error(err),
)
go func(tls *configv2beta3.ApisixTls) {
c.RecordEventS(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
}(tls)
return true
}
// sync ssl
ssl.Cert = string(cert)
ssl.Key = string(pkey)
} else if tls.Spec.Client != nil &&
tls.Spec.Client.CASecret.Namespace == secret.Namespace && tls.Spec.Client.CASecret.Name == secret.Name {
ca, _, err := translation.ExtractKeyPair(secret, false)
if err != nil {
log.Errorw("ca secret required by ApisixTls invalid",
zap.String("ApisixTls", tlsMetaKey),
zap.String("secret", secretKey),
zap.Error(err),
)
go func(tls *configv2beta3.ApisixTls) {
c.RecordEventS(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from ca secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
}(tls)
return true
}
ssl.Client = &v1.MutualTLSClientConfig{
CA: string(ca),
}
} else {
log.Warnw("stale secret cache, ApisixTls doesn't requires target secret",
zap.String("ApisixTls", tlsMetaKey),
zap.String("secret", secretKey),
)
return true
}
// Use another goroutine to send requests, to avoid
// long time lock occupying.
go func(ssl *v1.Ssl, tls *configv2beta3.ApisixTls) {
err := c.SyncSSL(ctx, ssl, ev.Type)
if err != nil {
log.Errorw("failed to sync ssl to APISIX",
zap.Error(err),
zap.Any("ssl", ssl),
zap.Any("secret", secret),
)
c.RecordEventS(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
} else {
c.RecordEventS(tls, corev1.EventTypeNormal, utils.ResourceSynced,
fmt.Sprintf("sync from secret %s changes", secretKey))
c.recordStatus(tls, utils.ResourceSynced, nil, metav1.ConditionTrue, tls.GetGeneration())
}
}(ssl, tls)
return true
}
}

func (c *apisixTlsController) syncSSLsAndUpdateStatusV2(ctx context.Context, ev *types.Event, secret *corev1.Secret, secretKey string) func(k, v interface{}) bool {
return func(k, v interface{}) bool {
ssl := v.(*v1.Ssl)
tlsMetaKey := k.(string)
tlsNamespace, tlsName, err := cache.SplitMetaNamespaceKey(tlsMetaKey)
if err != nil {
log.Errorf("invalid cached ApisixTls key: %s", tlsMetaKey)
return true
}

multiVersioned, err := c.apisixTlsLister.V2(tlsNamespace, tlsName)
if err != nil {
log.Warnw("secret related ApisixTls resource not found, skip",
zap.String("ApisixTls", tlsMetaKey),
)
return true
}
tls := multiVersioned.V2()

// We don't expect a secret to be used as both SSL and mTLS in ApisixTls
if tls.Spec.Secret.Namespace == secret.Namespace && tls.Spec.Secret.Name == secret.Name {
cert, pkey, err := translation.ExtractKeyPair(secret, true)
if err != nil {
log.Errorw("secret required by ApisixTls invalid",
zap.String("ApisixTls", tlsMetaKey),
zap.String("secret", secretKey),
zap.Error(err),
)
go func(tls *configv2.ApisixTls) {
c.RecordEventS(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
}(tls)
return true
}
// sync ssl
ssl.Cert = string(cert)
ssl.Key = string(pkey)
} else if tls.Spec.Client != nil &&
tls.Spec.Client.CASecret.Namespace == secret.Namespace && tls.Spec.Client.CASecret.Name == secret.Name {
ca, _, err := translation.ExtractKeyPair(secret, false)
if err != nil {
log.Errorw("ca secret required by ApisixTls invalid",
zap.String("ApisixTls", tlsMetaKey),
zap.String("secret", secretKey),
zap.Error(err),
)
go func(tls *configv2.ApisixTls) {
c.RecordEventS(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from ca secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
}(tls)
return true
}
ssl.Client = &v1.MutualTLSClientConfig{
CA: string(ca),
}
} else {
log.Warnw("stale secret cache, ApisixTls doesn't requires target secret",
zap.String("ApisixTls", tlsMetaKey),
zap.String("secret", secretKey),
)
return true
}
// Use another goroutine to send requests, to avoid
// long time lock occupying.
go func(ssl *v1.Ssl, tls *configv2.ApisixTls) {
err := c.SyncSSL(ctx, ssl, ev.Type)
if err != nil {
log.Errorw("failed to sync ssl to APISIX",
zap.Error(err),
zap.Any("ssl", ssl),
zap.Any("secret", secret),
)
c.RecordEventS(tls, corev1.EventTypeWarning, utils.ResourceSyncAborted,
fmt.Sprintf("sync from secret %s changes failed, error: %s", secretKey, err.Error()))
c.recordStatus(tls, utils.ResourceSyncAborted, err, metav1.ConditionFalse, tls.GetGeneration())
} else {
c.RecordEventS(tls, corev1.EventTypeNormal, utils.ResourceSynced,
fmt.Sprintf("sync from secret %s changes", secretKey))
c.recordStatus(tls, utils.ResourceSynced, nil, metav1.ConditionTrue, tls.GetGeneration())
}
}(ssl, tls)
return true
}
}
Loading

0 comments on commit e51a2c7

Please sign in to comment.