Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions cmd/module-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func main() {

zlogger.Info("start to start manager")
ctrl.SetLogger(zlogger)
mgr, err := manager.New(kubeConfig, manager.Options{
k8sControllerManager, err := manager.New(kubeConfig, manager.Options{
Cache: cache.Options{},
HealthProbeBindAddress: ":8081",
Metrics: server.Options{
Expand All @@ -120,7 +120,7 @@ func main() {
tracker.SetTracker(&tracker.DefaultTracker{})

// Configure and create VNode controller
rcc := vkModel.BuildVNodeControllerConfig{
vNodeControllerConfig := vkModel.BuildVNodeControllerConfig{
ClientID: clientID,
Env: env,
VPodType: model.ComponentModule,
Expand All @@ -129,43 +129,43 @@ func main() {
VNodeWorkerNum: vnodeWorkerNum,
}

mdc, err := module_deployment_controller.NewModuleDeploymentController(env)
moduleDeploymentController, err := module_deployment_controller.NewModuleDeploymentController(env)
if err != nil {
zlogger.Error(err, "unable to set up module_deployment_controller")
return
}

err = mdc.SetupWithManager(ctx, mgr)
err = moduleDeploymentController.SetupWithManager(ctx, k8sControllerManager)
if err != nil {
zlogger.Error(err, "unable to setup module_deployment_controller")
return
}

tunnel := startTunnels(ctx, clientID, env, mgr, mdc)
tunnel := startTunnels(ctx, clientID, env, k8sControllerManager, moduleDeploymentController)

vc, err := vnode_controller.NewVNodeController(&rcc, tunnel)
vNodeController, err := vnode_controller.NewVNodeController(&vNodeControllerConfig, tunnel)
if err != nil {
zlogger.Error(err, "unable to set up VNodeController")
return
}

err = vc.SetupWithManager(ctx, mgr)
err = vNodeController.SetupWithManager(ctx, k8sControllerManager)
if err != nil {
zlogger.Error(err, "unable to setup vnode controller")
return
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
if err := k8sControllerManager.AddHealthzCheck("healthz", healthz.Ping); err != nil {
zlogger.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
if err := k8sControllerManager.AddReadyzCheck("readyz", healthz.Ping); err != nil {
zlogger.Error(err, "unable to set up ready check")
os.Exit(1)
}

zlogger.Info("Module controller running")
err = mgr.Start(signals.SetupSignalHandler())
err = k8sControllerManager.Start(signals.SetupSignalHandler())
if err != nil {
log.G(ctx).WithError(err).Error("failed to start manager")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ModuleDeploymentController struct {
}

// Reconcile is the main reconciliation function for the controller.
func (mdc *ModuleDeploymentController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
func (moduleDeploymentController *ModuleDeploymentController) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
zaplogger.FromContext(ctx).Info("Reconciling module deployment", "request", request)
// This function is a placeholder for actual reconciliation logic.
return reconcile.Result{}, nil
Expand All @@ -58,23 +58,23 @@ func NewModuleDeploymentController(env string) (*ModuleDeploymentController, err
}

// SetupWithManager sets up the controller with a manager.
func (mdc *ModuleDeploymentController) SetupWithManager(ctx context.Context, mgr manager.Manager) (err error) {
func (moduleDeploymentController *ModuleDeploymentController) SetupWithManager(ctx context.Context, mgr manager.Manager) (err error) {
logger := zaplogger.FromContext(ctx)
mdc.updateToken <- nil
mdc.client = mgr.GetClient()
mdc.cache = mgr.GetCache()
moduleDeploymentController.updateToken <- nil
moduleDeploymentController.client = mgr.GetClient()
moduleDeploymentController.cache = mgr.GetCache()

logger.Info("Setting up module deployment controller")

c, err := controller.New("module-deployment-controller", mgr, controller.Options{
Reconciler: mdc,
customController, err := controller.New("module-deployment-controller", mgr, controller.Options{
Reconciler: moduleDeploymentController,
})
if err != nil {
logger.Error(err, "unable to set up module-deployment controller")
return err
}

envRequirement, _ := labels.NewRequirement(vkModel.LabelKeyOfEnv, selection.In, []string{mdc.env})
envRequirement, _ := labels.NewRequirement(vkModel.LabelKeyOfEnv, selection.In, []string{moduleDeploymentController.env})

// first sync node cache
nodeRequirement, _ := labels.NewRequirement(vkModel.LabelKeyOfComponent, selection.In, []string{vkModel.ComponentVNode})
Expand All @@ -85,14 +85,14 @@ func (mdc *ModuleDeploymentController) SetupWithManager(ctx context.Context, mgr
deploymentSelector := labels.NewSelector().Add(*deploymentRequirement, *envRequirement)

go func() {
syncd := mdc.cache.WaitForCacheSync(ctx)
syncd := moduleDeploymentController.cache.WaitForCacheSync(ctx)
if !syncd {
logger.Error(nil, "failed to wait for cache sync")
return
}
// init
vnodeList := &corev1.NodeList{}
err = mdc.cache.List(ctx, vnodeList, &client.ListOptions{
err = moduleDeploymentController.cache.List(ctx, vnodeList, &client.ListOptions{
LabelSelector: vnodeSelector,
})
if err != nil {
Expand All @@ -102,12 +102,12 @@ func (mdc *ModuleDeploymentController) SetupWithManager(ctx context.Context, mgr

for _, vnode := range vnodeList.Items {
// no deployment, just add
mdc.runtimeStorage.PutNode(vnode.DeepCopy())
moduleDeploymentController.runtimeStorage.PutNode(vnode.DeepCopy())
}

// init deployments
depList := &appsv1.DeploymentList{}
err = mdc.cache.List(ctx, depList, &client.ListOptions{
err = moduleDeploymentController.cache.List(ctx, depList, &client.ListOptions{
LabelSelector: deploymentSelector,
})
if err != nil {
Expand All @@ -116,38 +116,38 @@ func (mdc *ModuleDeploymentController) SetupWithManager(ctx context.Context, mgr
}

for _, deployment := range depList.Items {
mdc.runtimeStorage.PutDeployment(deployment)
moduleDeploymentController.runtimeStorage.PutDeployment(deployment)
}

mdc.updateDeploymentReplicas(ctx, depList.Items)
moduleDeploymentController.updateDeploymentReplicas(ctx, depList.Items)
}()

var vnodeEventHandler = handler.TypedFuncs[*corev1.Node, reconcile.Request]{
CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*corev1.Node], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
mdc.vnodeCreateHandler(ctx, e.Object)
moduleDeploymentController.vnodeCreateHandler(ctx, e.Object)
},
UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*corev1.Node], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
mdc.vnodeUpdateHandler(ctx, e.ObjectOld, e.ObjectNew)
moduleDeploymentController.vnodeUpdateHandler(ctx, e.ObjectOld, e.ObjectNew)
},
DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[*corev1.Node], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
mdc.vnodeDeleteHandler(ctx, e.Object)
moduleDeploymentController.vnodeDeleteHandler(ctx, e.Object)
},
GenericFunc: func(ctx context.Context, e event.TypedGenericEvent[*corev1.Node], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
logger.WithValues("node_name", e.Object.Name).Info("Generic func call")
},
}

if err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Node{}, vnodeEventHandler, &VNodePredicates{LabelSelector: vnodeSelector})); err != nil {
if err = customController.Watch(source.Kind(mgr.GetCache(), &corev1.Node{}, vnodeEventHandler, &VNodePredicates{LabelSelector: vnodeSelector})); err != nil {
logger.Error(err, "unable to watch nodes")
return err
}

var deploymentEventHandler = handler.TypedFuncs[*appsv1.Deployment, reconcile.Request]{
CreateFunc: func(ctx context.Context, e event.TypedCreateEvent[*appsv1.Deployment], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
mdc.deploymentAddHandler(ctx, e.Object)
moduleDeploymentController.deploymentAddHandler(ctx, e.Object)
},
UpdateFunc: func(ctx context.Context, e event.TypedUpdateEvent[*appsv1.Deployment], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
mdc.deploymentUpdateHandler(ctx, e.ObjectOld, e.ObjectNew)
moduleDeploymentController.deploymentUpdateHandler(ctx, e.ObjectOld, e.ObjectNew)
},
DeleteFunc: func(ctx context.Context, e event.TypedDeleteEvent[*appsv1.Deployment], w workqueue.TypedRateLimitingInterface[reconcile.Request]) {
},
Expand All @@ -156,7 +156,7 @@ func (mdc *ModuleDeploymentController) SetupWithManager(ctx context.Context, mgr
},
}

if err = c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}, &deploymentEventHandler, &ModuleDeploymentPredicates{LabelSelector: deploymentSelector})); err != nil {
if err = customController.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}, &deploymentEventHandler, &ModuleDeploymentPredicates{LabelSelector: deploymentSelector})); err != nil {
logger.Error(err, "unable to watch module Deployments")
return err
}
Expand All @@ -166,14 +166,14 @@ func (mdc *ModuleDeploymentController) SetupWithManager(ctx context.Context, mgr
}

// QueryContainerBaseline queries the baseline for a given container.
func (mdc *ModuleDeploymentController) QueryContainerBaseline(ctx context.Context, req model.QueryBaselineRequest) []corev1.Container {
func (moduleDeploymentController *ModuleDeploymentController) QueryContainerBaseline(ctx context.Context, req model.QueryBaselineRequest) []corev1.Container {
logger := zaplogger.FromContext(ctx)
labelMap := map[string]string{
// TODO: should add those label to deployments by module controller
vkModel.LabelKeyOfEnv: mdc.env,
vkModel.LabelKeyOfEnv: moduleDeploymentController.env,
}
allDeploymentList := appsv1.DeploymentList{}
err := mdc.cache.List(context.Background(), &allDeploymentList, &client.ListOptions{
err := moduleDeploymentController.cache.List(context.Background(), &allDeploymentList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(labelMap),
})
if err != nil {
Expand All @@ -190,7 +190,7 @@ func (mdc *ModuleDeploymentController) QueryContainerBaseline(ctx context.Contex
containerNames := []string{}
for _, deployment := range allDeploymentList.Items {
clusterName := getClusterNameFromDeployment(&deployment)
if clusterName != "" && clusterName != req.ClusterName {
if clusterName != "" && clusterName == req.ClusterName {
for _, container := range deployment.Spec.Template.Spec.Containers {
containers = append(containers, container)
containerNames = append(containerNames, utils.GetBizUniqueKey(&container))
Expand All @@ -202,25 +202,25 @@ func (mdc *ModuleDeploymentController) QueryContainerBaseline(ctx context.Contex
}

// vnodeCreateHandler handles the creation of a new vnode.
func (mdc *ModuleDeploymentController) vnodeCreateHandler(ctx context.Context, vnode *corev1.Node) {
relatedDeploymentsByNode := mdc.GetRelatedDeploymentsByNode(ctx, vnode)
mdc.updateDeploymentReplicas(ctx, relatedDeploymentsByNode)
func (moduleDeploymentController *ModuleDeploymentController) vnodeCreateHandler(ctx context.Context, vnode *corev1.Node) {
relatedDeploymentsByNode := moduleDeploymentController.GetRelatedDeploymentsByNode(ctx, vnode)
moduleDeploymentController.updateDeploymentReplicas(ctx, relatedDeploymentsByNode)
}

// vnodeUpdateHandler handles the update of an existing vnode.
func (mdc *ModuleDeploymentController) vnodeUpdateHandler(ctx context.Context, _, vnode *corev1.Node) {
relatedDeploymentsByNode := mdc.GetRelatedDeploymentsByNode(ctx, vnode)
mdc.updateDeploymentReplicas(ctx, relatedDeploymentsByNode)
func (moduleDeploymentController *ModuleDeploymentController) vnodeUpdateHandler(ctx context.Context, _, vnode *corev1.Node) {
relatedDeploymentsByNode := moduleDeploymentController.GetRelatedDeploymentsByNode(ctx, vnode)
moduleDeploymentController.updateDeploymentReplicas(ctx, relatedDeploymentsByNode)
}

// vnodeDeleteHandler handles the deletion of a vnode.
func (mdc *ModuleDeploymentController) vnodeDeleteHandler(ctx context.Context, vnode *corev1.Node) {
func (moduleDeploymentController *ModuleDeploymentController) vnodeDeleteHandler(ctx context.Context, vnode *corev1.Node) {
vnodeCopy := vnode.DeepCopy()
relatedDeploymentsByNode := mdc.GetRelatedDeploymentsByNode(ctx, vnodeCopy)
mdc.updateDeploymentReplicas(ctx, relatedDeploymentsByNode)
relatedDeploymentsByNode := moduleDeploymentController.GetRelatedDeploymentsByNode(ctx, vnodeCopy)
moduleDeploymentController.updateDeploymentReplicas(ctx, relatedDeploymentsByNode)
}

func (mdc *ModuleDeploymentController) GetRelatedDeploymentsByNode(ctx context.Context, node *corev1.Node) []appsv1.Deployment {
func (moduleDeploymentController *ModuleDeploymentController) GetRelatedDeploymentsByNode(ctx context.Context, node *corev1.Node) []appsv1.Deployment {
logger := zaplogger.FromContext(ctx)
matchedDeployments := make([]appsv1.Deployment, 0)

Expand All @@ -231,7 +231,7 @@ func (mdc *ModuleDeploymentController) GetRelatedDeploymentsByNode(ctx context.C
}

deploymentList := appsv1.DeploymentList{}
err := mdc.cache.List(ctx, &deploymentList, &client.ListOptions{
err := moduleDeploymentController.cache.List(ctx, &deploymentList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
model.LabelKeyOfVPodDeploymentStrategy: string(model.VPodDeploymentStrategyPeer),
}),
Expand All @@ -253,31 +253,31 @@ func (mdc *ModuleDeploymentController) GetRelatedDeploymentsByNode(ctx context.C
}

// deploymentAddHandler handles the addition of a new deployment.
func (mdc *ModuleDeploymentController) deploymentAddHandler(ctx context.Context, dep *appsv1.Deployment) {
func (moduleDeploymentController *ModuleDeploymentController) deploymentAddHandler(ctx context.Context, dep *appsv1.Deployment) {
if dep == nil {
return
}

mdc.updateDeploymentReplicas(ctx, []appsv1.Deployment{*dep})
moduleDeploymentController.updateDeploymentReplicas(ctx, []appsv1.Deployment{*dep})
}

// deploymentUpdateHandler handles the update of an existing deployment.
func (mdc *ModuleDeploymentController) deploymentUpdateHandler(ctx context.Context, _, newDep *appsv1.Deployment) {
func (moduleDeploymentController *ModuleDeploymentController) deploymentUpdateHandler(ctx context.Context, _, newDep *appsv1.Deployment) {
if newDep == nil {
return
}

mdc.updateDeploymentReplicas(ctx, []appsv1.Deployment{*newDep})
moduleDeploymentController.updateDeploymentReplicas(ctx, []appsv1.Deployment{*newDep})
}

// updateDeploymentReplicas updates the replicas of deployments based on node count.
func (mdc *ModuleDeploymentController) updateDeploymentReplicas(ctx context.Context, deployments []appsv1.Deployment) {
func (moduleDeploymentController *ModuleDeploymentController) updateDeploymentReplicas(ctx context.Context, deployments []appsv1.Deployment) {
logger := zaplogger.FromContext(ctx)

// TODO Implement this function.
<-mdc.updateToken
<-moduleDeploymentController.updateToken
defer func() {
mdc.updateToken <- nil
moduleDeploymentController.updateToken <- nil
}()

enableModuleReplicasSameWithBase := utils.GetEnv("ENABLE_MODULE_REPLICAS_SYNC_WITH_BASE", "false")
Expand All @@ -296,15 +296,15 @@ func (mdc *ModuleDeploymentController) updateDeploymentReplicas(ctx context.Cont
continue
}

sameClusterNodeCount, err := mdc.getReadyNodeCount(ctx, clusterName)
sameClusterNodeCount, err := moduleDeploymentController.getReadyNodeCount(ctx, clusterName)
if err != nil {
logger.Error(err, fmt.Sprintf("failed to get nodes of cluster %s, skip to update relicas", clusterName))
continue
}

if int32(sameClusterNodeCount) != *deployment.Spec.Replicas {
err := tracker.G().FuncTrack(deployment.Labels[vkModel.LabelKeyOfTraceID], vkModel.TrackSceneVPodDeploy, model.TrackEventVPodPeerDeploymentReplicaModify, deployment.Labels, func() (error, vkModel.ErrorCode) {
return mdc.updateDeploymentReplicasOfKubernetes(ctx, sameClusterNodeCount, deployment)
return moduleDeploymentController.updateDeploymentReplicasOfKubernetes(ctx, sameClusterNodeCount, deployment)
})
if err != nil {
logger.Error(err, fmt.Sprintf("failed to update deployment replicas of %s", deployment.Name))
Expand Down Expand Up @@ -348,10 +348,10 @@ func getClusterNameFromNode(node *corev1.Node) string {
return ""
}

func (mdc *ModuleDeploymentController) getReadyNodeCount(ctx context.Context, clusterName string) (int, error) {
func (moduleDeploymentController *ModuleDeploymentController) getReadyNodeCount(ctx context.Context, clusterName string) (int, error) {
logger := zaplogger.FromContext(ctx)
nodeList := &corev1.NodeList{}
err := mdc.cache.List(ctx, nodeList, &client.ListOptions{
err := moduleDeploymentController.cache.List(ctx, nodeList, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{vkModel.LabelKeyOfBaseClusterName: clusterName}),
})

Expand All @@ -378,12 +378,12 @@ func (mdc *ModuleDeploymentController) getReadyNodeCount(ctx context.Context, cl
}

// updateDeploymentReplicasOfKubernetes updates the replicas of a deployment in Kubernetes.
func (mdc *ModuleDeploymentController) updateDeploymentReplicasOfKubernetes(ctx context.Context, replicas int, deployment appsv1.Deployment) (error, vkModel.ErrorCode) {
func (moduleDeploymentController *ModuleDeploymentController) updateDeploymentReplicasOfKubernetes(ctx context.Context, replicas int, deployment appsv1.Deployment) (error, vkModel.ErrorCode) {
old := deployment.DeepCopy()
patch := client.MergeFrom(old)

deployment.Spec.Replicas = ptr.To[int32](int32(replicas))
err := mdc.client.Patch(ctx, &deployment, patch)
err := moduleDeploymentController.client.Patch(ctx, &deployment, patch)
if err != nil && !errors2.IsNotFound(err) {
return err, model.CodeKubernetesOperationFailed
}
Expand Down
Loading