Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Fix lb ingress status #740

Merged
merged 10 commits into from
Dec 2, 2021
Prev Previous commit
Next Next commit
feat: update status for ingress LB
Signed-off-by: Jintao Zhang <zhangjintao9020@gmail.com>
  • Loading branch information
tao12345666333 committed Nov 14, 2021
commit 8c1ef63646c5d2d5d1e5910a15b0fdfa0784d4b0
2 changes: 1 addition & 1 deletion pkg/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (c *ingressController) sync(ctx context.Context, ev *types.Event) error {

func (c *ingressController) handleSyncErr(obj interface{}, err error) {
ev := obj.(*types.Event)
event := ev.Object.(kube.ApisixRouteEvent)
event := ev.Object.(kube.IngressEvent)
namespace, name, errLocal := cache.SplitMetaNamespaceKey(event.Key)
if errLocal != nil {
log.Errorf("invalid resource key: %s", event.Key)
Expand Down
141 changes: 137 additions & 4 deletions pkg/ingress/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ package ingress

import (
"context"
"fmt"
"net"
"time"

"go.uber.org/zap"
apiv1 "k8s.io/api/core/v1"
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
networkingv1 "k8s.io/api/networking/v1"
networkingv1beta1 "k8s.io/api/networking/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
configv2alpha1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
Expand All @@ -34,8 +39,9 @@ import (
)

const (
_conditionType = "ResourcesAvailable"
_commonSuccessMessage = "Sync Successfully"
_conditionType = "ResourcesAvailable"
_commonSuccessMessage = "Sync Successfully"
_gatewayLBNotReadyMessage = "The LoadBalancer used by the APISIX gateway is not yet ready"
)

// verifyGeneration verify generation to decide whether to update status
Expand All @@ -62,7 +68,7 @@ func (c *Controller) recordStatus(at interface{}, reason string, err error, stat
ObservedGeneration: generation,
}
client := c.kubeClient.APISIXClient
// kubeclient := c.kubeClient.Client
kubeClient := c.kubeClient.Client

switch v := at.(type) {
case *configv1.ApisixTls:
Expand Down Expand Up @@ -169,13 +175,140 @@ func (c *Controller) recordStatus(at interface{}, reason string, err error, stat
}
case *networkingv1.Ingress:
// set to status
// update v.Status.LoadBalancer
lbips, err := c.ingressLBStatusIPs()
if err != nil {
log.Errorw("failed to get APISIX gateway external IPs",
zap.Error(err),
)

}

v.Status.LoadBalancer.Ingress = lbips
if _, errRecord := kubeClient.NetworkingV1().Ingresses(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for IngressV1",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}

case *networkingv1beta1.Ingress:
// set to status
lbips, err := c.ingressLBStatusIPs()
if err != nil {
log.Errorw("failed to get APISIX gateway external IPs",
zap.Error(err),
)

}

v.Status.LoadBalancer.Ingress = lbips
if _, errRecord := kubeClient.NetworkingV1beta1().Ingresses(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for IngressV1",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
case *extensionsv1beta1.Ingress:
// set to status
lbips, err := c.ingressLBStatusIPs()
if err != nil {
log.Errorw("failed to get APISIX gateway external IPs",
zap.Error(err),
)

}

v.Status.LoadBalancer.Ingress = lbips
if _, errRecord := kubeClient.ExtensionsV1beta1().Ingresses(v.Namespace).UpdateStatus(context.TODO(), v, metav1.UpdateOptions{}); errRecord != nil {
log.Errorw("failed to record status change for IngressV1",
zap.Error(errRecord),
zap.String("name", v.Name),
zap.String("namespace", v.Namespace),
)
}
default:
// This should not be executed
log.Errorf("unsupported resource record: %s", v)
}
}

// ingressPublishAddresses get addressed used to expose Ingress
func (c *Controller) ingressPublishAddresses() ([]string, error) {
ingressPublishService := c.cfg.IngressPublishService
ingressStatusAddress := c.cfg.IngressStatusAddress
addrs := []string{}

// if ingressStatusAddress is specified, it will be used first
if len(ingressStatusAddress) > 0 {
addrs = append(addrs, ingressStatusAddress...)
return addrs, nil
}

namespace, name, err := cache.SplitMetaNamespaceKey(ingressPublishService)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The separation can be handled in advance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't understand what you mean.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This specified service is only needed here, I don’t think it needs to be checked or processed in advance.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the namespace is passed from users, we can split it and check the validity when the program is starting, so we can avoid such an error check here and we can abort the program in time (if the format is incorrect).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But users may not configure Ingress resources.

If the user configures this option, but he only uses custom resources, do we also need to report an error? And if there is any error, it will not affect the actual use of the user.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it is not a parameter required by the real runtime, I don’t want to refuse to provide services to users in advance.

if err != nil {
log.Errorf("invalid ingressPublishService %s: %s", ingressPublishService, err)
return nil, err
}

kubeClient := c.kubeClient.Client
svc, err := kubeClient.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}

switch svc.Spec.Type {
case apiv1.ServiceTypeLoadBalancer:
if len(svc.Status.LoadBalancer.Ingress) < 1 {
return addrs, fmt.Errorf(_gatewayLBNotReadyMessage)
}

for _, ip := range svc.Status.LoadBalancer.Ingress {
if ip.IP == "" {
// typically AWS load-balancers
addrs = append(addrs, ip.Hostname)
} else {
addrs = append(addrs, ip.IP)
}
}

addrs = append(addrs, svc.Spec.ExternalIPs...)
return addrs, nil
default:
return addrs, nil
}

}

// ingressLBStatusIPs organizes the available addresses
func (c *Controller) ingressLBStatusIPs() ([]apiv1.LoadBalancerIngress, error) {
lbips := []apiv1.LoadBalancerIngress{}
var ips []string

for {
var err error
ips, err = c.ingressPublishAddresses()
if err != nil {
if err.Error() == _gatewayLBNotReadyMessage {
log.Warnf("%s. Provided service: %s", _gatewayLBNotReadyMessage, c.cfg.IngressPublishService)
time.Sleep(time.Second)
continue
}

return nil, err
}
break
}

for _, ip := range ips {
if net.ParseIP(ip) == nil {
lbips = append(lbips, apiv1.LoadBalancerIngress{Hostname: ip})
} else {
lbips = append(lbips, apiv1.LoadBalancerIngress{IP: ip})
}

}

return lbips, nil
}