Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions pkg/controller/console/console_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"reflect"
"regexp"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand Down Expand Up @@ -141,9 +142,7 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ
consoleDeployment := newDeploymentForCR(instance)

// Set Console instance as the owner and controller
if err := controllerutil.SetControllerReference(instance, consoleDeployment, r.scheme); err != nil {
return reconcile.Result{}, err
}
controllerutil.SetControllerReference(instance, consoleDeployment, r.scheme)

// Check if this Pod already exists
found := &appsv1.Deployment{}
Expand All @@ -155,8 +154,8 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ
return reconcile.Result{}, err
}

// created successfully - don't requeue
return reconcile.Result{}, nil
// created successfully - requeue after cons.RequeueIntervalInSecond
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
} else if err != nil {
return reconcile.Result{}, err
}
Expand All @@ -175,11 +174,34 @@ func (r *ReconcileConsole) Reconcile(ctx context.Context, request reconcile.Requ
}
}

// TODO: update console if name server address changes
// update console if name server address changes
oldNamesrv, i := getOldNamesrvAddr(found)
if oldNamesrv != "" && oldNamesrv != share.NameServersStr {
found.Spec.Template.Spec.Containers[0].Env[i].Value = fmt.Sprintf("-Drocketmq.namesrv.addr=%s -Dcom.rocketmq.sendMessageWithVIPChannel=false", share.NameServersStr)
err = r.client.Update(context.TODO(), found)
if err != nil {
reqLogger.Error(err, "Failed to update console deploy namesrv", "Namespace", found.Namespace, "Name", found.Name)
} else {
reqLogger.Info("Successfully updated console deploy namesrv", "Namespace", found.Namespace, "Name", found.Name)
}
}

// CR already exists - don't requeue
reqLogger.Info("Skip reconcile: RocketMQ Console Deployment already exists", "Namespace", found.Namespace, "Name", found.Name)
return reconcile.Result{}, nil
return reconcile.Result{Requeue: true, RequeueAfter: time.Duration(cons.RequeueIntervalInSecond) * time.Second}, nil
}

func getOldNamesrvAddr(deploy *appsv1.Deployment) (string, int) {
for i, envVar := range deploy.Spec.Template.Spec.Containers[0].Env {
if envVar.Name == "JAVA_OPTS" {
re := regexp.MustCompile(`-Drocketmq.namesrv.addr=([^ ]+)`)
match := re.FindStringSubmatch(envVar.Value)
if len(match) > 1 {
return match[1], i
} else {
return "", -1
}
}
}
return "", -1
}

// newDeploymentForCR returns a deployment pod with modifying the ENV
Expand Down