Skip to content

Commit

Permalink
Adapt scheduler report
Browse files Browse the repository at this point in the history
Signed-off-by: ii2day <ji.li@daocloud.io>
  • Loading branch information
ii2day authored and Icarus9913 committed Aug 14, 2023
1 parent be513ee commit 87becb5
Show file tree
Hide file tree
Showing 25 changed files with 440 additions and 190 deletions.
20 changes: 17 additions & 3 deletions charts/templates/configmap-app-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ data:
targetPort: metrics
protocol: TCP
{{- end }}
{{- if .Values.feature.enableIPv4 }}
- name: http
port: {{ .Values.kdoctorAgent.httpServer.appHttpPort }}
targetPort: http
Expand All @@ -305,9 +304,24 @@ data:
port: {{ .Values.kdoctorAgent.httpServer.appHttpsPort }}
targetPort: https
protocol: TCP
{{- end }}
ipFamilyPolicy: SingleStack
ipFamilies:
- IPv4
selector:
{{- include "project.kdoctorAgent.selectorLabels" . | nindent 8 }}
{{- include "project.kdoctorAgent.selectorLabels" . | nindent 8 }}
ingress.yml: |
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
namespace: {{ .Release.Namespace | quote }}
spec:
rules:
- http:
paths:
- path: {{ .Values.kdoctorAgent.ingress.route | quote }}
pathType: Exact
backend:
service:
name: kdoctor
port:
number: {{ .Values.kdoctorAgent.httpServer.appHttpPort }}
1 change: 1 addition & 0 deletions charts/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ spec:
- --configmap-daemonset-template=/tmp/configmap-app-template/daemonset.yml
- --configmap-pod-template=/tmp/configmap-app-template/pod.yml
- --configmap-service-template=/tmp/configmap-app-template/service.yml
- --configmap-ingress-template=/tmp/configmap-app-template/ingress.yml
- --tls-ca-cert=/etc/tls/ca.crt
- --tls-server-cert=/etc/tls/tls.crt
- --tls-server-key=/etc/tls/tls.key
Expand Down
25 changes: 0 additions & 25 deletions charts/templates/ingress.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions cmd/agent/cmd/cert.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func GenServerCert(logger *zap.Logger) {
if types.AgentConfig.Configmap.EnableIPv6 {
serviceIPv6, err := k8sObjManager.GetK8sObjManager().GetServiceAccessUrl(context.Background(), types.AgentConfig.ServiceV6Name, types.AgentConfig.PodNamespace, servicePortName)
if err != nil {
logger.Sugar().Fatalf("failed to get kdoctor ipv4 service %s/%s, reason=%v ", types.AgentConfig.PodNamespace, types.AgentConfig.ServiceV6Name, err)
logger.Sugar().Fatalf("failed to get kdoctor ipv6 service %s/%s, reason=%v ", types.AgentConfig.PodNamespace, types.AgentConfig.ServiceV6Name, err)
}
// ipv6 ip
logger.Sugar().Debugf("get ipv6 serviceAccessurl %v", serviceIPv6)
Expand Down Expand Up @@ -129,7 +129,7 @@ func checkServiceReady(logger *zap.Logger) {

if types.AgentConfig.Configmap.EnableIPv6 {
for {
_, err := k8sObjManager.GetK8sObjManager().GetService(ctx, types.AgentConfig.ServiceV4Name, types.AgentConfig.PodNamespace)
_, err := k8sObjManager.GetK8sObjManager().GetService(ctx, types.AgentConfig.ServiceV6Name, types.AgentConfig.PodNamespace)
if nil != err {
if errors.IsNotFound(err) {
logger.Sugar().Errorf("agent runtime IPv6 service %s/%s not exists, wait for controller to create it", types.AgentConfig.PodNamespace, types.AgentConfig.ServiceV6Name)
Expand Down
6 changes: 0 additions & 6 deletions cmd/agent/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ func init() {
globalFlag.StringVar(&types.AgentConfig.TaskName, "task-name", "", "task name")
globalFlag.StringVar(&types.AgentConfig.ServiceV4Name, "service-ipv4-name", "", "agent IPv4 service name")
globalFlag.StringVar(&types.AgentConfig.ServiceV6Name, "service-ipv6-name", "", "agent IPv6 service name")
if err := rootCmd.MarkPersistentFlagRequired("task-kind"); nil != err {
logger.Sugar().Fatalf("failed to mark persistentFlag 'task-kind' as required, error: %v", err)
}
if err := rootCmd.MarkPersistentFlagRequired("task-name"); nil != err {
logger.Sugar().Fatalf("failed to mark persistentFlag 'task-name' as required, error: %v", err)
}

globalFlag.BoolVarP(&types.AgentConfig.AppMode, "app-mode", "A", false, "app mode")
globalFlag.BoolVarP(&types.AgentConfig.TlsInsecure, "tls-insecure", "K", true, "skip verify tls")
Expand Down
3 changes: 3 additions & 0 deletions cmd/controller/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func init() {
globalFlag.StringVar(&types.ControllerConfig.ConfigMapDaemonsetPath, "configmap-daemonset-template", "", "configmap daemonset template file path")
globalFlag.StringVar(&types.ControllerConfig.ConfigMapPodPath, "configmap-pod-template", "", "configmap pod template file path")
globalFlag.StringVar(&types.ControllerConfig.ConfigMapServicePath, "configmap-service-template", "", "configmap service template file path")
globalFlag.StringVar(&types.ControllerConfig.ConfigMapIngressPath, "configmap-ingress-template", "", "configmap ingress template file path")
globalFlag.StringVarP(&types.ControllerConfig.TlsCaCertPath, "tls-ca-cert", "R", "", "ca file path")
globalFlag.StringVarP(&types.ControllerConfig.TlsServerCertPath, "tls-server-cert", "T", "", "server cert file path")
globalFlag.StringVarP(&types.ControllerConfig.TlsServerKeyPath, "tls-server-key", "Y", "", "server key file path")
Expand All @@ -90,6 +91,7 @@ func init() {
logger.Info("configmap-daemonset-template = " + types.ControllerConfig.ConfigMapDaemonsetPath)
logger.Info("configmap-pod-template = " + types.ControllerConfig.ConfigMapPodPath)
logger.Info("configmap-service-template = " + types.ControllerConfig.ConfigMapServicePath)
logger.Info("configmap-ingress-template = " + types.ControllerConfig.ConfigMapIngressPath)
logger.Info("tls-ca-cert = " + types.ControllerConfig.TlsCaCertPath)
logger.Info("tls-server-cert = " + types.ControllerConfig.TlsServerCertPath)
logger.Info("tls-server-key = " + types.ControllerConfig.TlsServerKeyPath)
Expand Down Expand Up @@ -122,6 +124,7 @@ func init() {
fn(types.ControllerConfig.ConfigMapDaemonsetPath, types.DaemonsetTempl)
fn(types.ControllerConfig.ConfigMapPodPath, types.PodTempl)
fn(types.ControllerConfig.ConfigMapServicePath, types.ServiceTempl)
fn(types.ControllerConfig.ConfigMapIngressPath, types.IngressTempl)
}
cobra.OnInitialize(printFlag)

Expand Down
4 changes: 2 additions & 2 deletions pkg/grpcManager/server_implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (
"context"
"fmt"
"github.com/kdoctor-io/kdoctor/api/v1/agentGrpc"
"github.com/kdoctor-io/kdoctor/pkg/utils"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"time"

"github.com/kdoctor-io/kdoctor/pkg/utils"
)

// ------ implement
Expand All @@ -31,6 +30,7 @@ func (s *myGrpcServer) ExecRemoteCmd(stream agentGrpc.CmdService_ExecRemoteCmdSe
logger.Error("grpc server ExecRemoteCmd: got empty command \n")
return nil, status.Error(codes.InvalidArgument, "request command is empty")
}

if r.Timeoutsecond == 0 {
logger.Error("grpc server ExecRemoteCmd: got empty timeout \n")
return nil, status.Error(codes.InvalidArgument, "request command is empty")
Expand Down
83 changes: 83 additions & 0 deletions pkg/k8ObjManager/deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 Authors of kdoctor-io
// SPDX-License-Identifier: Apache-2.0

package k8sObjManager

import (
"context"
"fmt"
"github.com/kdoctor-io/kdoctor/pkg/utils"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (nm *k8sObjManager) GetDeployment(ctx context.Context, name, namespace string) (*appsv1.Deployment, error) {
d := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
key := client.ObjectKeyFromObject(d)
if e := nm.client.Get(ctx, key, d); e != nil {
return nil, fmt.Errorf("failed to get deployment %v/%v, reason=%v", namespace, name, e)
}
return d, nil
}

func (nm *k8sObjManager) ListDeploymentPod(ctx context.Context, deploymentName, deploymentNameSpace string) ([]corev1.Pod, error) {

dae, e := nm.GetDeployment(ctx, deploymentName, deploymentNameSpace)
if e != nil {
return nil, fmt.Errorf("failed to get daemonset, error=%v", e)
}

podLable := dae.Spec.Template.Labels
opts := []client.ListOption{
client.MatchingLabelsSelector{
Selector: labels.SelectorFromSet(podLable),
},
}
return nm.GetPodList(ctx, opts...)
}

func (nm *k8sObjManager) ListDeploymentPodIPs(ctx context.Context, deploymentName, deploymentNameSpace string) (PodIps, error) {

podList, e := nm.ListDeploymentPod(ctx, deploymentName, deploymentNameSpace)
if e != nil {
return nil, e
}
if len(podList) == 0 {
return nil, fmt.Errorf("failed to get any pods")
}

result := PodIps{}

for _, v := range podList {
t := IPs{}
t.InterfaceName = "eth0"
for _, m := range v.Status.PodIPs {
if utils.CheckIPv4Format(m.IP) {
t.IPv4 = m.IP
} else if utils.CheckIPv6Format(m.IP) {
t.IPv6 = m.IP
}
}
result[v.Name] = []IPs{t}
}
return result, nil
}

func (nm *k8sObjManager) ListDeployPodMultusIPs(ctx context.Context, deploymentName, deploymentNameSpace string) (PodIps, error) {
podlist, e := nm.ListDeploymentPod(ctx, deploymentName, deploymentNameSpace)
if e != nil {
return nil, e
}
if len(podlist) == 0 {
return nil, fmt.Errorf("failed to get any pods")
}
return parseMultusIP(podlist)
}
5 changes: 5 additions & 0 deletions pkg/k8ObjManager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ type K8sObjManager interface {
ListDaemonsetPodIPs(ctx context.Context, daemonsetName, daemonsetNameSpace string) (PodIps, error)
ListDaemonsetPodMultusIPs(ctx context.Context, daemonsetName, daemonsetNameSpace string) (PodIps, error)

// deployment
GetDeployment(ctx context.Context, name, namespace string) (*appsv1.Deployment, error)
ListDeploymentPodIPs(ctx context.Context, deploymentName, deploymentNameSpace string) (PodIps, error)
ListDeployPodMultusIPs(ctx context.Context, deploymentName, deploymentNameSpace string) (PodIps, error)

// pod
GetPodList(ctx context.Context, opts ...client.ListOption) ([]corev1.Pod, error)
ListSelectedPodMultusIPs(ctx context.Context, labelSelector *metav1.LabelSelector) (PodIps, error)
Expand Down
5 changes: 4 additions & 1 deletion pkg/pluginManager/agentManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pluginManager
import (
"context"
"fmt"
networkingv1 "k8s.io/api/networking/v1"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -36,7 +37,9 @@ func (s *pluginManager) RunAgentController() {
if e := crd.AddToScheme(scheme); e != nil {
logger.Sugar().Fatalf("failed to add scheme for plugins, reason=%v", e)
}

if e := networkingv1.AddToScheme(scheme); e != nil {
logger.Sugar().Fatalf("failed to add scheme for plugins, reason=%v", e)
}
var fm fileManager.FileManager
var e error
if types.AgentConfig.EnableAggregateAgentReport {
Expand Down
37 changes: 22 additions & 15 deletions pkg/pluginManager/controllerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -34,7 +35,9 @@ func (s *pluginManager) RunControllerController(healthPort int, webhookPort int,
if e := crd.AddToScheme(scheme); e != nil {
logger.Sugar().Fatalf("failed to add scheme for plugins, reason=%v", e)
}

if e := networkingv1.AddToScheme(scheme); e != nil {
logger.Sugar().Fatalf("failed to add scheme for plugins, reason=%v", e)
}
n := ctrl.Options{
Scheme: scheme,
MetricsBindAddress: "0",
Expand Down Expand Up @@ -89,22 +92,17 @@ func (s *pluginManager) RunControllerController(healthPort int, webhookPort int,
var e error

// TODO: runControllerAggregateReportOnce need agents' IPs
//if types.ControllerConfig.EnableAggregateAgentReport {
if false {
if types.ControllerConfig.EnableAggregateAgentReport {
// fileManager takes charge of writing and removing local report
gcInterval := time.Duration(types.ControllerConfig.CleanAgedReportInMinute) * time.Minute
logger.Sugar().Infof("save report to %v, clean interval %v", types.ControllerConfig.DirPathControllerReport, gcInterval.String())
fm, e = fileManager.NewManager(logger.Named("fileManager"), types.ControllerConfig.DirPathControllerReport, gcInterval)
if e != nil {
logger.Sugar().Fatalf("failed to new fileManager , reason=%v", e)
}

// reportManager takes charge of sync reports from remote agents
interval := time.Duration(types.ControllerConfig.CollectAgentReportIntervalInSecond) * time.Second
logger.Sugar().Infof("run report Sync manager, save to %v, collectInterval %v ", types.ControllerConfig.DirPathControllerReport, interval)
reportManager.InitReportManager(logger.Named("reportSyncManager"), types.ControllerConfig.DirPathControllerReport, interval)
}

runtimeDB := make([]scheduler.DB, 0, len(s.chainingPlugins))
ctx, cancelFunc := context.WithCancel(context.TODO())
for name, plugin := range s.chainingPlugins {
// setup reconcile
Expand All @@ -115,6 +113,14 @@ func (s *pluginManager) RunControllerController(healthPort int, webhookPort int,
logger.Sugar().Debugf("there's no uniqueMatchLabelKey in the configmap, try to use the default '%s'", scheduler.UniqueMatchLabelKey)
uniqueMatchLabelKey = scheduler.UniqueMatchLabelKey
}
tracker := scheduler.NewTracker(mgr.GetClient(), mgr.GetAPIReader(), scheduler.TrackerConfig{
ItemChannelBuffer: int(types.ControllerConfig.ResourceTrackerChannelBuffer),
MaxDatabaseCap: int(types.ControllerConfig.ResourceTrackerMaxDatabaseCap),
ExecutorWorkers: int(types.ControllerConfig.ResourceTrackerExecutorWorkers),
SignalTimeOutDuration: time.Duration(types.ControllerConfig.ResourceTrackerSignalTimeoutSeconds) * time.Second,
TraceGapDuration: time.Duration(types.ControllerConfig.ResourceTrackerTraceGapSeconds) * time.Second,
}, logger.Named(name+"Tracker"))
runtimeDB = append(runtimeDB, tracker.DB)
k := &pluginControllerReconciler{
logger: logger.Named(name + "Reconciler"),
plugin: plugin,
Expand All @@ -124,13 +130,7 @@ func (s *pluginManager) RunControllerController(healthPort int, webhookPort int,
fm: fm,
crdKindName: name,
runtimeUniqueMatchLabelKey: uniqueMatchLabelKey,
tracker: scheduler.NewTracker(mgr.GetClient(), mgr.GetAPIReader(), scheduler.TrackerConfig{
ItemChannelBuffer: int(types.ControllerConfig.ResourceTrackerChannelBuffer),
MaxDatabaseCap: int(types.ControllerConfig.ResourceTrackerMaxDatabaseCap),
ExecutorWorkers: int(types.ControllerConfig.ResourceTrackerExecutorWorkers),
SignalTimeOutDuration: time.Duration(types.ControllerConfig.ResourceTrackerSignalTimeoutSeconds) * time.Second,
TraceGapDuration: time.Duration(types.ControllerConfig.ResourceTrackerTraceGapSeconds) * time.Second,
}, logger.Named(name+"Tracker")),
tracker: tracker,
}
k.tracker.Start(ctx)
if e := k.SetupWithManager(mgr); e != nil {
Expand All @@ -148,6 +148,13 @@ func (s *pluginManager) RunControllerController(healthPort int, webhookPort int,
}
}

if types.ControllerConfig.EnableAggregateAgentReport {
// reportManager takes charge of sync reports from remote agents
interval := time.Duration(types.ControllerConfig.CollectAgentReportIntervalInSecond) * time.Second
logger.Sugar().Infof("run report Sync manager, save to %v, collectInterval %v ", types.ControllerConfig.DirPathControllerReport, interval)
reportManager.InitReportManager(logger.Named("reportSyncManager"), types.ControllerConfig.DirPathControllerReport, interval, runtimeDB)
}

go func() {
msg := "reconcile of plugin down"
if e := mgr.Start(ctrl.SetupSignalHandler()); e != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/pluginManager/controllerTools.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
)

func (s *pluginControllerReconciler) GetSpiderAgentNodeNotInRecord(ctx context.Context, succeedNodeList []string, podMatchLabel client.MatchingLabels) ([]string, error) {
var allNodeList, failNodeList []string
allNodeList, failNodeList := []string{}, []string{}

podList := corev1.PodList{}

err := s.client.List(ctx, &podList,
Expand All @@ -50,7 +51,7 @@ func (s *pluginControllerReconciler) GetSpiderAgentNodeNotInRecord(ctx context.C
s.logger.Sugar().Debugf("all agent nodes: %v", allNodeList)

// gather the failure Node list
slices.Filter(failNodeList, allNodeList, func(s string) bool {
failNodeList = slices.Filter(failNodeList, allNodeList, func(s string) bool {
return !slices.Contains(succeedNodeList, s)
})

Expand Down
Loading

0 comments on commit 87becb5

Please sign in to comment.