Skip to content

Commit

Permalink
fix: update svc failed & trigger svc update wrongly & watchable map p…
Browse files Browse the repository at this point in the history
…anics

Signed-off-by: bitliu <bitliu@tencent.com>
  • Loading branch information
Xunzhuo committed Nov 22, 2022
1 parent 6be6950 commit 2fd7044
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 25 deletions.
34 changes: 26 additions & 8 deletions internal/infrastructure/kubernetes/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"context"
_ "embed"
"fmt"
"reflect"
"strings"
"text/template"

Expand Down Expand Up @@ -289,13 +288,14 @@ func (i *Infra) createOrUpdateDeployment(ctx context.Context, infra *ir.Infra) e
deploy.Namespace, deploy.Name, err)
}
}
} else {
// Update if current value is different.
if !reflect.DeepEqual(deploy.Spec, current.Spec) {
if err := i.Client.Update(ctx, deploy); err != nil {
return fmt.Errorf("failed to update deployment %s/%s: %w",
deploy.Namespace, deploy.Name, err)
}
return err
}

// Update if current value is different.
if needsUpdateDeployment(current.Spec, *infra) {
if err := i.Client.Update(ctx, deploy); err != nil {
return fmt.Errorf("failed to update deployment %s/%s: %w",
deploy.Namespace, deploy.Name, err)
}
}

Expand All @@ -320,3 +320,21 @@ func (i *Infra) deleteDeployment(ctx context.Context, infra *ir.Infra) error {

return nil
}

func needsUpdateDeployment(deploymentSpec appsv1.DeploymentSpec, ir ir.Infra) bool {
if len(deploymentSpec.Template.ObjectMeta.Labels) != len(envoySelector(ir.GetProxyInfra().GetProxyMetadata().Labels).MatchLabels) {
return true
}
if len(deploymentSpec.Selector.MatchLabels) != len(envoySelector(ir.GetProxyInfra().GetProxyMetadata().Labels).MatchLabels) {
return true
}
for k, v := range envoySelector(ir.GetProxyInfra().GetProxyMetadata().Labels).MatchLabels {
if val, ok := deploymentSpec.Template.ObjectMeta.Labels[k]; !ok || val != v {
return true
}
if val, ok := deploymentSpec.Selector.MatchLabels[k]; !ok || val != v {
return true
}
}
return false
}
69 changes: 61 additions & 8 deletions internal/infrastructure/kubernetes/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package kubernetes
import (
"context"
"fmt"
"reflect"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -90,19 +89,73 @@ func (i *Infra) createOrUpdateService(ctx context.Context, infra *ir.Infra) erro
svc.Namespace, svc.Name, err)
}
}
} else {
// Update if current value is different.
if !reflect.DeepEqual(svc.Spec, current.Spec) {
if err := i.Client.Update(ctx, svc); err != nil {
return fmt.Errorf("failed to update service %s/%s: %w",
svc.Namespace, svc.Name, err)
}
return err
}

// Update if current value is different.
if needsUpdateService(svc.Spec, current.Spec) {
if err := i.Client.Update(ctx, constructUpdateService(svc, current)); err != nil {
return fmt.Errorf("failed to update service %s/%s: %w",
svc.Namespace, svc.Name, err)
}
}

return nil
}

func constructUpdateService(new, old *corev1.Service) *corev1.Service {
now := old.DeepCopy()

now.Namespace = new.Namespace
now.Name = new.Name
now.Labels = new.Labels
now.Spec.Type = new.Spec.Type
now.Spec.Ports = new.Spec.Ports
now.Spec.Selector = new.Spec.Selector
now.Spec.SessionAffinity = new.Spec.SessionAffinity
now.Spec.ExternalTrafficPolicy = new.Spec.ExternalTrafficPolicy

return now
}

func needsUpdateService(new corev1.ServiceSpec, current corev1.ServiceSpec) bool {
for _, newPort := range new.Ports {
found := false
for _, currentPort := range current.Ports {
if newPort.Name == currentPort.Name &&
newPort.Port == currentPort.Port &&
newPort.Protocol == currentPort.Protocol &&
newPort.TargetPort == currentPort.TargetPort {
found = true
}
}
if !found {
return true
}
}

if new.Type != current.Type {
return true
}
if len(new.Selector) != len(current.Selector) {
return true
}
if new.SessionAffinity != current.SessionAffinity {
return true
}
if new.ExternalTrafficPolicy != current.ExternalTrafficPolicy {
return true
}

for newK, newV := range new.Selector {
if currentV, ok := current.Selector[newK]; !ok || currentV != newV {
return true
}
}

return false
}

// deleteService deletes the Envoy Service in the kube api server, if it exists.
func (i *Infra) deleteService(ctx context.Context, infra *ir.Infra) error {
svc := &corev1.Service{
Expand Down
3 changes: 3 additions & 0 deletions internal/provider/kubernetes/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,9 @@ func (r *gatewayReconciler) subscribeAndUpdateStatus(ctx context.Context) {
}
key := update.Key
val := update.Value
if val == nil {
return
}
r.statusUpdater.Send(status.Update{
NamespacedName: key,
Resource: new(gwapiv1b1.Gateway),
Expand Down
4 changes: 4 additions & 0 deletions internal/provider/kubernetes/httproute.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ func (r *httpRouteReconciler) subscribeAndUpdateStatus(ctx context.Context) {
}
key := update.Key
val := update.Value
if val == nil {
return
}

r.statusUpdater.Send(status.Update{
NamespacedName: key,
Resource: new(gwapiv1b1.HTTPRoute),
Expand Down
4 changes: 4 additions & 0 deletions internal/provider/kubernetes/tlsroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ func (r *tlsRouteReconciler) subscribeAndUpdateStatus(ctx context.Context) {
}
key := update.Key
val := update.Value
if val == nil {
return
}

r.statusUpdater.Send(status.Update{
NamespacedName: key,
Resource: new(gwapiv1a2.TLSRoute),
Expand Down
28 changes: 19 additions & 9 deletions internal/xds/server/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,26 @@ func (r *Runner) Name() string {
// Start starts the xds-server runner
func (r *Runner) Start(ctx context.Context) error {
r.Logger = r.Logger.WithValues("runner", r.Name())
go r.subscribeAndTranslate(ctx)
go r.setupXdsServer(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) setupXdsServer(ctx context.Context) {
// Set up the gRPC server and register the xDS handler.
// Create SnapshotCache beforce start subscribeAndTranslate,
// prevent panics in case cache is nil.
cfg := r.tlsConfig(xdsTLSCertFilename, xdsTLSKeyFilename, xdsTLSCaFilename)
r.grpc = grpc.NewServer(grpc.Creds(credentials.NewTLS(cfg)))

r.cache = cache.NewSnapshotCache(false, r.Logger)
registerServer(controlplane_server_v3.NewServer(ctx, r.cache, r.cache), r.grpc)

// Start and listen xDS gRPC Server.
go r.serveXdsServer(ctx)

// Start message Subscription.
go r.subscribeAndTranslate(ctx)
r.Logger.Info("started")
return nil
}

func (r *Runner) serveXdsServer(ctx context.Context) {
addr := net.JoinHostPort(XdsServerAddress, strconv.Itoa(XdsServerPort))
l, err := net.Listen("tcp", addr)
if err != nil {
Expand Down Expand Up @@ -126,9 +132,13 @@ func (r *Runner) subscribeAndTranslate(ctx context.Context) {
var err error
if update.Delete {
err = r.cache.GenerateNewSnapshot(key, nil)
} else {
// Update snapshot cache
err = r.cache.GenerateNewSnapshot(key, val.XdsResources)
} else if val != nil && val.XdsResources != nil {
if r.cache == nil {
r.Logger.Error(err, "failed to init snapshot cache")
} else {
// Update snapshot cache
err = r.cache.GenerateNewSnapshot(key, val.XdsResources)
}
}
if err != nil {
r.Logger.Error(err, "failed to generate a snapshot")
Expand Down

0 comments on commit 2fd7044

Please sign in to comment.