diff --git a/internal/infrastructure/kubernetes/deployment.go b/internal/infrastructure/kubernetes/deployment.go index f031a41a6dd4..e35c99f88e6d 100644 --- a/internal/infrastructure/kubernetes/deployment.go +++ b/internal/infrastructure/kubernetes/deployment.go @@ -9,7 +9,6 @@ import ( "context" _ "embed" "fmt" - "reflect" "strings" "text/template" @@ -288,14 +287,16 @@ func (i *Infra) createOrUpdateDeployment(ctx context.Context, infra *ir.Infra) e return fmt.Errorf("failed to create deployment %s/%s: %w", deploy.Namespace, deploy.Name, err) } + return nil } - } else { - // Update if current value is different. - if !reflect.DeepEqual(deploy.Spec, current.Spec) { - if err := i.Client.Update(ctx, deploy); err != nil { - return fmt.Errorf("failed to update deployment %s/%s: %w", - deploy.Namespace, deploy.Name, err) - } + return err + } + + // Update if current value is different. + if needsUpdateDeployment(current.Spec, *infra) { + if err := i.Client.Update(ctx, deploy); err != nil { + return fmt.Errorf("failed to update deployment %s/%s: %w", + deploy.Namespace, deploy.Name, err) } } @@ -320,3 +321,28 @@ func (i *Infra) deleteDeployment(ctx context.Context, infra *ir.Infra) error { return nil } + +func needsUpdateDeployment(deploymentSpec appsv1.DeploymentSpec, ir ir.Infra) bool { + for _, c := range deploymentSpec.Template.Spec.Containers { + if c.Name == envoyContainerName { + if c.Image != ir.Proxy.Image { + return true + } + } + } + if len(deploymentSpec.Template.ObjectMeta.Labels) != len(envoySelector(ir.GetProxyInfra().GetProxyMetadata().Labels).MatchLabels) { + return true + } + if len(deploymentSpec.Selector.MatchLabels) != len(envoySelector(ir.GetProxyInfra().GetProxyMetadata().Labels).MatchLabels) { + return true + } + for k, v := range envoySelector(ir.GetProxyInfra().GetProxyMetadata().Labels).MatchLabels { + if val, ok := deploymentSpec.Template.ObjectMeta.Labels[k]; !ok || val != v { + return true + } + if val, ok := deploymentSpec.Selector.MatchLabels[k]; !ok || val != v { + return true + } + } + return false +} diff --git a/internal/infrastructure/kubernetes/service.go b/internal/infrastructure/kubernetes/service.go index 8441aa978afe..95bda47264d6 100644 --- a/internal/infrastructure/kubernetes/service.go +++ b/internal/infrastructure/kubernetes/service.go @@ -8,7 +8,6 @@ package kubernetes import ( "context" "fmt" - "reflect" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -89,20 +88,75 @@ func (i *Infra) createOrUpdateService(ctx context.Context, infra *ir.Infra) erro return fmt.Errorf("failed to create service %s/%s: %w", svc.Namespace, svc.Name, err) } + return nil } - } else { - // Update if current value is different. - if !reflect.DeepEqual(svc.Spec, current.Spec) { - if err := i.Client.Update(ctx, svc); err != nil { - return fmt.Errorf("failed to update service %s/%s: %w", - svc.Namespace, svc.Name, err) - } + return err + } + + // Update if current value is different. + if needsUpdateService(svc.Spec, current.Spec) { + if err := i.Client.Update(ctx, constructUpdateService(svc, current)); err != nil { + return fmt.Errorf("failed to update service %s/%s: %w", + svc.Namespace, svc.Name, err) } } return nil } +func constructUpdateService(new, old *corev1.Service) *corev1.Service { + now := old.DeepCopy() + + now.Namespace = new.Namespace + now.Name = new.Name + now.Labels = new.Labels + now.Spec.Type = new.Spec.Type + now.Spec.Ports = new.Spec.Ports + now.Spec.Selector = new.Spec.Selector + now.Spec.SessionAffinity = new.Spec.SessionAffinity + now.Spec.ExternalTrafficPolicy = new.Spec.ExternalTrafficPolicy + + return now +} + +func needsUpdateService(new corev1.ServiceSpec, current corev1.ServiceSpec) bool { + for _, newPort := range new.Ports { + found := false + for _, currentPort := range current.Ports { + if newPort.Name == currentPort.Name && + newPort.Port == currentPort.Port && + newPort.Protocol == currentPort.Protocol && + newPort.TargetPort == currentPort.TargetPort { + found = true + } + } + if !found { + return true + } + } + + if new.Type != current.Type { + return true + } + if len(new.Selector) != len(current.Selector) { + return true + } + if new.SessionAffinity != current.SessionAffinity { + return true + } + if new.ExternalTrafficPolicy != current.ExternalTrafficPolicy { + return true + } + + for newK, newV := range new.Selector { + if currentV, ok := current.Selector[newK]; !ok || currentV != newV { + return true + } + } + + return false +} + // deleteService deletes the Envoy Service in the kube api server, if it exists. func (i *Infra) deleteService(ctx context.Context, infra *ir.Infra) error { svc := &corev1.Service{ diff --git a/internal/provider/kubernetes/gateway.go b/internal/provider/kubernetes/gateway.go index f7f958da0a81..27bd052249ff 100644 --- a/internal/provider/kubernetes/gateway.go +++ b/internal/provider/kubernetes/gateway.go @@ -651,6 +651,9 @@ func (r *gatewayReconciler) subscribeAndUpdateStatus(ctx context.Context) { } key := update.Key val := update.Value + if val == nil { + return + } r.statusUpdater.Send(status.Update{ NamespacedName: key, Resource: new(gwapiv1b1.Gateway), diff --git a/internal/provider/kubernetes/httproute.go b/internal/provider/kubernetes/httproute.go index e99ef92cbf64..21c33fb96495 100644 --- a/internal/provider/kubernetes/httproute.go +++ b/internal/provider/kubernetes/httproute.go @@ -357,6 +357,10 @@ func (r *httpRouteReconciler) subscribeAndUpdateStatus(ctx context.Context) { } key := update.Key val := update.Value + if val == nil { + return + } + r.statusUpdater.Send(status.Update{ NamespacedName: key, Resource: new(gwapiv1b1.HTTPRoute), diff --git a/internal/provider/kubernetes/tlsroute.go b/internal/provider/kubernetes/tlsroute.go index 9cd3db39e9c4..51e45f8d5ead 100644 --- a/internal/provider/kubernetes/tlsroute.go +++ b/internal/provider/kubernetes/tlsroute.go @@ -334,6 +334,10 @@ func (r *tlsRouteReconciler) subscribeAndUpdateStatus(ctx context.Context) { } key := update.Key val := update.Value + if val == nil { + return + } + r.statusUpdater.Send(status.Update{ NamespacedName: key, Resource: new(gwapiv1a2.TLSRoute), diff --git a/internal/xds/server/runner/runner.go b/internal/xds/server/runner/runner.go index 42b65bd44f1e..ac530e5a9e8a 100644 --- a/internal/xds/server/runner/runner.go +++ b/internal/xds/server/runner/runner.go @@ -70,20 +70,26 @@ func (r *Runner) Name() string { // Start starts the xds-server runner func (r *Runner) Start(ctx context.Context) error { r.Logger = r.Logger.WithValues("runner", r.Name()) - go r.subscribeAndTranslate(ctx) - go r.setupXdsServer(ctx) - r.Logger.Info("started") - return nil -} -func (r *Runner) setupXdsServer(ctx context.Context) { // Set up the gRPC server and register the xDS handler. + // Create SnapshotCache beforce start subscribeAndTranslate, + // prevent panics in case cache is nil. cfg := r.tlsConfig(xdsTLSCertFilename, xdsTLSKeyFilename, xdsTLSCaFilename) r.grpc = grpc.NewServer(grpc.Creds(credentials.NewTLS(cfg))) r.cache = cache.NewSnapshotCache(false, r.Logger) registerServer(controlplane_server_v3.NewServer(ctx, r.cache, r.cache), r.grpc) + // Start and listen xDS gRPC Server. + go r.serveXdsServer(ctx) + + // Start message Subscription. + go r.subscribeAndTranslate(ctx) + r.Logger.Info("started") + return nil +} + +func (r *Runner) serveXdsServer(ctx context.Context) { addr := net.JoinHostPort(XdsServerAddress, strconv.Itoa(XdsServerPort)) l, err := net.Listen("tcp", addr) if err != nil { @@ -126,9 +132,13 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) { var err error if update.Delete { err = r.cache.GenerateNewSnapshot(key, nil) - } else { - // Update snapshot cache - err = r.cache.GenerateNewSnapshot(key, val.XdsResources) + } else if val != nil && val.XdsResources != nil { + if r.cache == nil { + r.Logger.Error(err, "failed to init snapshot cache") + } else { + // Update snapshot cache + err = r.cache.GenerateNewSnapshot(key, val.XdsResources) + } } if err != nil { r.Logger.Error(err, "failed to generate a snapshot")