1 change: 1 addition & 0 deletions Dockerfile
@@ -34,5 +34,6 @@ COPY --from=builder /workspace/module_controller .

EXPOSE 9090
EXPOSE 8080
EXPOSE 7777

ENTRYPOINT ["./module_controller"]
64 changes: 49 additions & 15 deletions cmd/module-controller/main.go
@@ -20,6 +20,7 @@ import (
"github.com/koupleless/module_controller/common/model"
"github.com/koupleless/module_controller/controller/module_deployment_controller"
"github.com/koupleless/module_controller/module_tunnels"
"github.com/koupleless/module_controller/module_tunnels/koupleless_http_tunnel"
"github.com/koupleless/module_controller/module_tunnels/koupleless_mqtt_tunnel"
"github.com/koupleless/module_controller/report_server"
"github.com/koupleless/virtual-kubelet/common/log"
@@ -71,13 +72,20 @@ func main() {

if err != nil {
log.G(ctx).WithError(err).Error("failed to parse WORKLOAD_MAX_LEVEL, will be set to 3 default")
workloadMaxLevel = 3
}

vnodeWorkerNum, err := strconv.Atoi(utils.GetEnv("VNODE_WORKER_NUM", "8"))
if err != nil {
log.G(ctx).WithError(err).Error("failed to parse VNODE_WORKER_NUM, will be set to 8 default")
vnodeWorkerNum = 8
Comment on lines +75 to +81

🛠️ Refactor suggestion

Refactor Environment Variable Parsing into Utility Function

The code for parsing integer environment variables and handling errors is repeated for WORKLOAD_MAX_LEVEL and VNODE_WORKER_NUM. Consider refactoring this logic into a utility function to reduce code duplication and improve maintainability.

Example utility function in the utils package:

func GetEnvAsInt(key string, defaultValue int) int {
	valStr := GetEnv(key, strconv.Itoa(defaultValue))
	val, err := strconv.Atoi(valStr)
	if err != nil {
		// ctx is not in scope inside a package-level helper, so log via logrus,
		// which the utils package already imports
		logrus.WithError(err).Errorf("failed to parse %s, defaulting to %d", key, defaultValue)
		return defaultValue
	}
	return val
}

Update the code to use the utility function:

-	workloadMaxLevel, err := strconv.Atoi(utils.GetEnv("WORKLOAD_MAX_LEVEL", "3"))
-	if err != nil {
-		log.G(ctx).WithError(err).Error("failed to parse WORKLOAD_MAX_LEVEL, will be set to 3 default")
-		workloadMaxLevel = 3
-	}
+	workloadMaxLevel := utils.GetEnvAsInt("WORKLOAD_MAX_LEVEL", 3)

-	vnodeWorkerNum, err := strconv.Atoi(utils.GetEnv("VNODE_WORKER_NUM", "8"))
-	if err != nil {
-		log.G(ctx).WithError(err).Error("failed to parse VNODE_WORKER_NUM, will be set to 8 default")
-		vnodeWorkerNum = 8
-	}
+	vnodeWorkerNum := utils.GetEnvAsInt("VNODE_WORKER_NUM", 8)

}

kubeConfig := config.GetConfigOrDie()
mgr, err := manager.New(kubeConfig, manager.Options{
Cache: cache.Options{},
Metrics: server.Options{
BindAddress: "0",
BindAddress: ":9090",
},
})

@@ -88,9 +96,36 @@ func main()

tracker.SetTracker(&tracker.DefaultTracker{})

tl := &koupleless_mqtt_tunnel.MqttTunnel{
Cache: mgr.GetCache(),
Client: mgr.GetClient(),
tunnels := make([]tunnel.Tunnel, 0)
moduleTunnels := make([]module_tunnels.ModuleTunnel, 0)

mqttTunnelEnable := utils.GetEnv("ENABLE_MQTT_TUNNEL", "false")
if mqttTunnelEnable == "true" {
Comment on lines +102 to +103

🛠️ Refactor suggestion

Use Boolean Parsing for Environment Variables

Currently, environment variables ENABLE_MQTT_TUNNEL and ENABLE_HTTP_TUNNEL are compared as strings to "true". This approach can be error-prone if unexpected values are provided. Consider parsing these variables into boolean values using strconv.ParseBool to improve robustness.

Example using strconv.ParseBool:

mqttTunnelEnable, err := strconv.ParseBool(utils.GetEnv("ENABLE_MQTT_TUNNEL", "false"))
if err != nil {
	log.G(ctx).WithError(err).Error("failed to parse ENABLE_MQTT_TUNNEL, defaulting to false")
	mqttTunnelEnable = false
}
if mqttTunnelEnable {
	// ...
}

httpTunnelEnable, err := strconv.ParseBool(utils.GetEnv("ENABLE_HTTP_TUNNEL", "false"))
if err != nil {
	log.G(ctx).WithError(err).Error("failed to parse ENABLE_HTTP_TUNNEL, defaulting to false")
	httpTunnelEnable = false
}
if httpTunnelEnable {
	// ...
}

Alternatively, create a utility function GetEnvAsBool in the utils package to encapsulate this logic.
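A minimal sketch of such a helper, assuming the utils package's existing GetEnv helper and its logrus logger (the name GetEnvAsBool is illustrative, not part of the PR):

// GetEnvAsBool reads a boolean environment variable and falls back to
// defaultValue when the variable is unset or not a valid boolean.
func GetEnvAsBool(key string, defaultValue bool) bool {
	valStr := GetEnv(key, strconv.FormatBool(defaultValue))
	val, err := strconv.ParseBool(valStr)
	if err != nil {
		logrus.WithError(err).Errorf("failed to parse %s, defaulting to %t", key, defaultValue)
		return defaultValue
	}
	return val
}

The call sites then collapse to mqttTunnelEnable := utils.GetEnvAsBool("ENABLE_MQTT_TUNNEL", false), and strconv.ParseBool still accepts the usual spellings ("1", "t", "true", "TRUE", and their negatives).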

Also applies to: 113-114

mqttTl := &koupleless_mqtt_tunnel.MqttTunnel{
Cache: mgr.GetCache(),
Client: mgr.GetClient(),
}

tunnels = append(tunnels, mqttTl)
moduleTunnels = append(moduleTunnels, mqttTl)
}

httpTunnelEnable := utils.GetEnv("ENABLE_HTTP_TUNNEL", "false")
if httpTunnelEnable == "true" {
httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))

if err != nil {
log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
httpTunnelListenPort = 7777
}
Comment on lines +115 to +120

⚠️ Potential issue

Add port range validation for HTTP tunnel.

The HTTP tunnel port parsing should include validation to ensure it's within a valid range (1-65535).

 httpTunnelListenPort, err := strconv.Atoi(utils.GetEnv("HTTP_TUNNEL_LISTEN_PORT", "7777"))
 if err != nil {
     log.G(ctx).WithError(err).Error("failed to parse HTTP_TUNNEL_LISTEN_PORT, set default port 7777")
     httpTunnelListenPort = 7777
+} else if httpTunnelListenPort < 1 || httpTunnelListenPort > 65535 {
+    log.G(ctx).Error("HTTP_TUNNEL_LISTEN_PORT out of valid range (1-65535), set default port 7777")
+    httpTunnelListenPort = 7777
 }
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 113-120: cmd/module-controller/main.go#L113-L120
Added lines #L113 - L120 were not covered by tests


httpTl := &koupleless_http_tunnel.HttpTunnel{
Cache: mgr.GetCache(),
Client: mgr.GetClient(),
Port: httpTunnelListenPort,
}
tunnels = append(tunnels, httpTl)
moduleTunnels = append(moduleTunnels, httpTl)
}

rcc := vkModel.BuildVNodeControllerConfig{
Expand All @@ -99,11 +134,10 @@ func main() {
VPodIdentity: model.ComponentModule,
IsCluster: isCluster,
WorkloadMaxLevel: workloadMaxLevel,
VNodeWorkerNum: vnodeWorkerNum,
}

vc, err := vnode_controller.NewVNodeController(&rcc, []tunnel.Tunnel{
tl,
})
vc, err := vnode_controller.NewVNodeController(&rcc, tunnels)
if err != nil {
log.G(ctx).Error(err, "unable to set up VNodeController")
return
Expand All @@ -118,9 +152,7 @@ func main() {
enableModuleDeploymentController := utils.GetEnv("ENABLE_MODULE_DEPLOYMENT_CONTROLLER", "false")

if enableModuleDeploymentController == "true" {
mdc, err := module_deployment_controller.NewModuleDeploymentController(env, []module_tunnels.ModuleTunnel{
tl,
})
mdc, err := module_deployment_controller.NewModuleDeploymentController(env, moduleTunnels)
if err != nil {
log.G(ctx).Error(err, "unable to set up module_deployment_controller")
return
Expand All @@ -133,11 +165,13 @@ func main() {
}
}

err = tl.Start(ctx, clientID, env)
if err != nil {
log.G(ctx).WithError(err).Error("failed to start tunnel", tl.Key())
} else {
log.G(ctx).Info("Tunnel started: ", tl.Key())
for _, t := range tunnels {
err = t.Start(ctx, clientID, env)
if err != nil {
log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
} else {
log.G(ctx).Info("Tunnel started: ", t.Key())
}
Comment on lines +168 to +174

🛠️ Refactor suggestion

Improve tunnel startup coordination.

The current implementation starts tunnels sequentially without proper coordination. Consider:

  1. Adding a startup timeout for each tunnel
  2. Implementing graceful shutdown
  3. Adding health checks for tunnel readiness
+const tunnelStartTimeout = 30 * time.Second

 for _, t := range tunnels {
-    err = t.Start(ctx, clientID, env)
+    startCtx, cancel := context.WithTimeout(ctx, tunnelStartTimeout)
+    err = t.Start(startCtx, clientID, env)
+    cancel()
     if err != nil {
         log.G(ctx).WithError(err).Error("failed to start tunnel", t.Key())
     } else {
         log.G(ctx).Info("Tunnel started: ", t.Key())
     }
+    
+    // Add health check
+    if err := waitForTunnelReady(ctx, t); err != nil {
+        log.G(ctx).WithError(err).Error("tunnel failed health check", t.Key())
+    }
 }

Consider adding a helper function for tunnel health checks:

func waitForTunnelReady(ctx context.Context, t tunnel.Tunnel) error {
    // Implementation depends on tunnel interface capabilities
    // Could ping the tunnel or check its internal state
    return nil
}
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 168-174: cmd/module-controller/main.go#L168-L174
Added lines #L168 - L174 were not covered by tests

}

log.G(ctx).Info("Module controller running")
3 changes: 2 additions & 1 deletion common/model/consts.go
@@ -44,5 +44,6 @@ (
)

const (
LabelKeyOfTechStack = "base.koupleless.io/stack"
LabelKeyOfTechStack = "base.koupleless.io/stack"
LabelKeyOfArkletPort = "base.koupleless.io/arklet-port"
)
11 changes: 6 additions & 5 deletions common/model/model.go
@@ -1,7 +1,7 @@
package model

import (
"github.com/koupleless/arkctl/v1/service/ark"
"github.com/koupleless/module_controller/module_tunnels/koupleless_http_tunnel/ark_service"
)

// ArkMqttMsg is the response of mqtt message payload.
@@ -17,6 +17,7 @@ type Metadata struct {

// HeartBeatData is the data of base heart beat.
type HeartBeatData struct {
BaseID string `json:"baseID"`
State string `json:"state"`
MasterBizInfo Metadata `json:"masterBizInfo"`
NetworkInfo NetworkInfo `json:"networkInfo"`
@@ -29,10 +30,10 @@ type NetworkInfo struct {
}

type BizOperationResponse struct {
Command string `json:"command"`
BizName string `json:"bizName"`
BizVersion string `json:"bizVersion"`
Response ark.ArkResponseBase `json:"response"`
Command string `json:"command"`
BizName string `json:"bizName"`
BizVersion string `json:"bizVersion"`
Response ark_service.ArkResponse `json:"response"`
}

// QueryBaselineRequest is the request parameters of query baseline func
64 changes: 57 additions & 7 deletions common/utils/utils.go
@@ -1,14 +1,19 @@
package utils

import (
"context"
"fmt"
"github.com/koupleless/arkctl/common/fileutil"
"github.com/koupleless/arkctl/v1/service/ark"
"github.com/koupleless/module_controller/common/model"
"github.com/koupleless/virtual-kubelet/common/log"
"github.com/koupleless/virtual-kubelet/common/utils"
vkModel "github.com/koupleless/virtual-kubelet/model"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"strings"
"time"
)
@@ -61,6 +66,10 @@ func TranslateHeartBeatDataToNodeInfo(data model.HeartBeatData) vkModel.NodeInfo
if data.State == "ACTIVATED" {
state = vkModel.NodeStatusActivated
}
labels := map[string]string{}
if data.NetworkInfo.ArkletPort != 0 {
labels[model.LabelKeyOfArkletPort] = strconv.Itoa(data.NetworkInfo.ArkletPort)
}
return vkModel.NodeInfo{
Metadata: vkModel.NodeMetadata{
Name: data.MasterBizInfo.Name,
Expand All @@ -71,6 +80,7 @@ func TranslateHeartBeatDataToNodeInfo(data model.HeartBeatData) vkModel.NodeInfo
NodeIP: data.NetworkInfo.LocalIP,
HostName: data.NetworkInfo.LocalHostName,
},
CustomLabels: labels,
}
}

@@ -150,12 +160,14 @@ func TranslateSimpleBizDataToArkBizInfo(data model.ArkSimpleBizInfoData) *ark.Ar
}

func GetContainerStateFromBizState(bizStateIndex string) vkModel.ContainerState {
switch bizStateIndex {
case "RESOLVED":
switch strings.ToLower(bizStateIndex) {
case "resolved":
return vkModel.ContainerStateResolved
case "ACTIVATED":
case "activated":
return vkModel.ContainerStateActivated
case "DEACTIVATED":
case "deactivated":
return vkModel.ContainerStateDeactivated
case "broken":
return vkModel.ContainerStateDeactivated
}
return vkModel.ContainerStateWaiting
@@ -164,11 +176,13 @@ func GetArkBizStateFromSimpleBizState(bizStateIndex string) string {
func GetArkBizStateFromSimpleBizState(bizStateIndex string) string {
switch bizStateIndex {
case "2":
return "RESOLVED"
return "resolved"
case "3":
return "ACTIVATED"
return "activated"
case "4":
return "DEACTIVATED"
return "deactivated"
case "5":
return "broken"
}
return ""
}
@@ -197,3 +211,39 @@ func GetLatestState(state string, records []ark.ArkBizStateRecord) (time.Time, s
}
return latestStateTime, reason, message
}

func OnBaseUnreachable(ctx context.Context, info vkModel.UnreachableNodeInfo, env string, k8sClient client.Client) {
// base not ready, delete from api server
node := corev1.Node{}
nodeName := utils.FormatNodeName(info.NodeID, env)
err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName}, &node)
logger := log.G(ctx).WithField("nodeID", info.NodeID).WithField("func", "OnNodeNotReady")
if err == nil {
// delete node from api server
logger.Info("DeleteBaseNode")
deleteErr := k8sClient.Delete(ctx, &node)
if deleteErr != nil && !apiErrors.IsNotFound(err) {
logger.WithError(deleteErr).Info("delete base node failed")
}
} else if apiErrors.IsNotFound(err) {
logger.Info("Node not found, skipping delete operation")
} else {
logger.WithError(err).Error("Failed to get node, cannot delete")
}
}
Comment on lines +215 to +233

🛠️ Refactor suggestion

Add retry mechanism for transient errors

While the error handling is good, consider adding a retry mechanism for transient errors when getting the node:

 func OnBaseUnreachable(ctx context.Context, info vkModel.UnreachableNodeInfo, env string, k8sClient client.Client) {
     node := corev1.Node{}
     nodeName := utils.FormatNodeName(info.NodeID, env)
-    err := k8sClient.Get(ctx, client.ObjectKey{Name: nodeName}, &node)
     logger := log.G(ctx).WithField("nodeID", info.NodeID).WithField("func", "OnNodeNotReady")
+    
+    var err error
+    maxRetries := 3
+    for i := 0; i < maxRetries; i++ {
+        err = k8sClient.Get(ctx, client.ObjectKey{Name: nodeName}, &node)
+        if err == nil || apiErrors.IsNotFound(err) {
+            break
+        }
+        logger.WithError(err).WithField("retry", i+1).Error("Failed to get node")
+        time.Sleep(time.Duration(i+1) * time.Second)
+    }
+
     if err == nil {
         // delete node from api server
         logger.Info("DeleteBaseNode")
         deleteErr := k8sClient.Delete(ctx, &node)
         if deleteErr != nil && !apiErrors.IsNotFound(err) {
             logger.WithError(deleteErr).Info("delete base node failed")
         }
     } else if apiErrors.IsNotFound(err) {
         logger.Info("Node not found, skipping delete operation")
     } else {
         logger.WithError(err).Error("Failed to get node, cannot delete")
     }
 }
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 226-227: common/utils/utils.go#L226-L227
Added lines #L226 - L227 were not covered by tests


[warning] 231-232: common/utils/utils.go#L231-L232
Added lines #L231 - L232 were not covered by tests


func ExtractNetworkInfoFromNodeInfoData(initData vkModel.NodeInfo) model.NetworkInfo {
portStr := initData.CustomLabels[model.LabelKeyOfArkletPort]

port, err := strconv.Atoi(portStr)
if err != nil {
logrus.Errorf("failed to parse port %s from node info", portStr)
port = 1238
}

return model.NetworkInfo{
LocalIP: initData.NetworkInfo.NodeIP,
LocalHostName: initData.NetworkInfo.HostName,
ArkletPort: port,
}
}
Comment on lines +235 to +249

🛠️ Refactor suggestion

Improve network info extraction robustness

Several improvements can be made to make this function more robust:

+const (
+    DefaultArkletPort = 1238
+    MinPort          = 1
+    MaxPort          = 65535
+)
+
 func ExtractNetworkInfoFromNodeInfoData(initData vkModel.NodeInfo) model.NetworkInfo {
     portStr := initData.CustomLabels[model.LabelKeyOfArkletPort]
     
     port, err := strconv.Atoi(portStr)
     if err != nil {
         logrus.Errorf("failed to parse port %s from node info", portStr)
-        port = 1238
+        port = DefaultArkletPort
+    } else if port < MinPort || port > MaxPort {
+        logrus.Warnf("port %d out of valid range, using default", port)
+        port = DefaultArkletPort
     }
     
+    if initData.NetworkInfo.NodeIP == "" {
+        logrus.Warn("empty NodeIP in node info")
+    }
+    if initData.NetworkInfo.HostName == "" {
+        logrus.Warn("empty HostName in node info")
+    }
+
     return model.NetworkInfo{
         LocalIP:       initData.NetworkInfo.NodeIP,
         LocalHostName: initData.NetworkInfo.HostName,
         ArkletPort:    port,
     }
 }

26 changes: 23 additions & 3 deletions common/utils/utils_test.go
@@ -1,6 +1,7 @@
package utils

import (
"context"
"fmt"
"github.com/koupleless/arkctl/v1/service/ark"
"github.com/koupleless/module_controller/common/model"
@@ -9,6 +10,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"testing"
"time"
)
@@ -277,6 +279,7 @@ func TestTranslateHeartBeatDataToNodeInfo(t *testing.T) {
NetworkInfo: model.NetworkInfo{
LocalIP: "192.168.1.1",
LocalHostName: "host1",
ArkletPort: 1238,
},
},
expected: vkModel.NodeInfo{
@@ -289,6 +292,9 @@ func TestTranslateHeartBeatDataToNodeInfo(t *testing.T) {
NodeIP: "192.168.1.1",
HostName: "host1",
},
CustomLabels: map[string]string{
model.LabelKeyOfArkletPort: "1238",
},
},
},
{
@@ -313,6 +319,7 @@ func TestTranslateHeartBeatDataToNodeInfo(t *testing.T) {
NodeIP: "192.168.1.2",
HostName: "host2",
},
CustomLabels: map[string]string{},
},
},
}
@@ -625,9 +632,9 @@ func TestTranslateSimpleBizDataToArkBizInfo(t *testing.T) {

func TestGetArkBizStateFromSimpleBizState(t *testing.T) {
testCases := map[string]string{
"2": "RESOLVED",
"3": "ACTIVATED",
"4": "DEACTIVATED",
"2": "resolved",
"3": "activated",
"4": "deactivated",
"123": "",
}
for input, expected := range testCases {
@@ -647,3 +654,16 @@ func TestGetLatestState_ChangeTimeLenLt3(t *testing.T) {
assert.Empty(t, reason)
assert.Empty(t, message)
}

func TestExtractNetworkInfoFromNodeInfoData(t *testing.T) {
data := ExtractNetworkInfoFromNodeInfoData(vkModel.NodeInfo{
CustomLabels: map[string]string{
model.LabelKeyOfArkletPort: ";",
},
})
assert.Equal(t, data.ArkletPort, 1238)
}

func TestOnBaseUnreachable(t *testing.T) {
OnBaseUnreachable(context.Background(), vkModel.UnreachableNodeInfo{}, "test", fake.NewFakeClient())
}
20 changes: 5 additions & 15 deletions example/quick-start/base.yaml
@@ -7,23 +7,13 @@ metadata:
spec:
containers:
- name: base
image: serverless-registry.cn-shanghai.cr.aliyuncs.com/opensource/test/base-web:latest # pre-built image
image: serverless-registry.cn-shanghai.cr.aliyuncs.com/opensource/test/base-web:1.1.1 # pre-built image
imagePullPolicy: Always
ports:
- name: base
containerPort: 8080
- name: arklet
containerPort: 1238
Comment on lines +15 to +16

💡 Codebase verification

Port 1238 needs to be exposed through a Service definition

The verification reveals that while port 1238 is defined in the Pod spec for the arklet container, there is no corresponding Service definition to expose this port. If this port needs to be accessible by other components in the cluster, you should:

  • Add a Service definition in example/quick-start/base.yaml that exposes port 1238 (a minimal sketch follows after this list)
  • Ensure the Service selector matches the Pod labels
  • Use the same port name "arklet" in the Service definition for consistency
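
A minimal Service sketch under those assumptions (the metadata name and selector label below are illustrative, since the PR does not show the Pod's labels, and must be aligned with the real ones):

apiVersion: v1
kind: Service
metadata:
  name: base-web-arklet    # illustrative name, not taken from the PR
spec:
  selector:
    app: base-web          # assumed Pod label; must match the Pod's actual labels
  ports:
    - name: arklet         # same port name as the Pod spec for consistency
      port: 1238
      targetPort: arklet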
🔗 Analysis chain

Verify service exposure for arklet port

While the arklet port is correctly configured in the Pod spec, ensure that this port is properly exposed through a Kubernetes Service if it needs to be accessible by other components.


The previous search might have been too specific. Let's broaden our search to find Service definitions and any arklet-related configurations to verify if the port needs to be exposed through a Service.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there's a corresponding Service definition for the arklet port
rg -t yaml "ports.*name:\s*arklet" 

Length of output: 36


Script:

#!/bin/bash
# Let's search more broadly for service definitions and ports in yaml files
fd -e yaml -x cat {} | grep -A 10 -B 2 "kind:\s*Service"

# Also search for any arklet related configurations
rg -t yaml "arklet" -C 5

Length of output: 1765

env:
- name: KUPLELESS_ARKLET_MQTT_BROKER
value: 10.244.0.27
- name: KUPLELESS_ARKLET_MQTT_PORT
value: "1883"
- name: KUPLELESS_ARKLET_MQTT_USERNAME
value: koupleless_base
- name: KUPLELESS_ARKLET_MQTT_PASSWORD
value: public
- name: KUPLELESS_ARKLET_MQTT_CLIENT_PREFIX
value: koupleless
- name: KUPLELESS_ARKLET_CUSTOM_TUNNEL_CLASSNAME
value: com.alipay.sofa.koupleless.arklet.tunnel.mqtt.MqttTunnel
- name: KUPLELESS_ARKLET_CUSTOM_BASE_METADATA_CLASSNAME
value: com.alipay.sofa.web.base.metadata.MetadataHook
- name: MODULE_CONTROLLER_ADDRESS
value: {YOUR_MODULE_CONTROLLER_IP}
Comment on lines +18 to +19
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Replace hardcoded placeholder with proper K8s configuration

The {YOUR_MODULE_CONTROLLER_IP} placeholder could cause deployment issues. Consider:

  1. Using Kubernetes Service discovery
  2. Using ConfigMap/Secret for the address
  3. Using environment variables with Kubernetes' dependent-variable substitution syntax ($(VAR_NAME))

Example improvement using ConfigMap:

apiVersion: v1
kind: ConfigMap
metadata:
  name: module-controller-config
data:
  address: "your-module-controller-service"
---
# In the Pod spec:
env:
  - name: MODULE_CONTROLLER_ADDRESS
    valueFrom:
      configMapKeyRef:
        name: module-controller-config
        key: address
🧰 Tools
🪛 yamllint

[error] 19-19: no new line character at the end of file

(new-line-at-end-of-file)
