Skip to content

Commit

Permalink
Add get metrics info support
Browse files Browse the repository at this point in the history
  • Loading branch information
xianlu committed Jan 17, 2019
1 parent c69c7c2 commit 75ed7d7
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 3 deletions.
26 changes: 24 additions & 2 deletions cmd/argo/commands/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"encoding/json"
"fmt"
"k8s.io/api/core/v1"
"log"
"os"
"strings"
Expand All @@ -19,8 +20,10 @@ import (
const onExitSuffix = "onExit"

var (
kubeClient *kubernetes.Clientset
showResUsage bool
kubeClient *kubernetes.Clientset
showResUsage bool
showMetrics bool
metricsConfigMap *v1.ConfigMap
)

func NewGetCommand() *cobra.Command {
Expand All @@ -36,19 +39,24 @@ func NewGetCommand() *cobra.Command {
cmd.HelpFunc()(cmd, args)
os.Exit(1)
}

wfClient := InitWorkflowClient()
kubeClient = initKubeClient()
wf, err := wfClient.Get(args[0], metav1.GetOptions{})
if err != nil {
log.Fatal(err)
}
if showMetrics {
metricsConfigMap = getMetricsConfigMap(wf, kubeClient)
}
printWorkflow(wf, output)
},
}

command.Flags().StringVarP(&output, "output", "o", "", "Output format. One of: json|yaml|wide")
command.Flags().BoolVar(&noColor, "no-color", false, "Disable colorized output")
command.Flags().BoolVar(&showResUsage, "show", false, "Show workflow resource usage")
command.Flags().BoolVar(&showMetrics, "metrics", false, "Show workflow metrics usage")
return command
}

Expand Down Expand Up @@ -96,6 +104,11 @@ func printWorkflowHelper(wf *wfv1.Workflow, outFmt string) {
fmt.Printf("%-20s %v (core*hour)\n", "Total CPU:", cpu)
fmt.Printf("%-20s %v (GB*hour)\n", "Total Memory:", memory)
}
if showMetrics {
cpu, memory := getWorkflowMetrics(metricsConfigMap)
fmt.Printf("%-20s %v (core*hour)\n", "Total CPU:", cpu)
fmt.Printf("%-20s %v (GB*hour)\n", "Total Memory:", memory)
}
}

if len(wf.Spec.Arguments.Parameters) > 0 {
Expand Down Expand Up @@ -141,6 +154,8 @@ func printWorkflowHelper(wf *wfv1.Workflow, outFmt string) {
} else {
if showResUsage {
fmt.Fprintf(w, "%s\tPODNAME\tDURATION\tMESSAGE\tCPU(core*hour)\tMEMORY(GB*hour)\n", ansiFormat("STEP", FgDefault))
} else if showMetrics {
fmt.Fprintf(w, "%s\tPODNAME\tDURATION\tMESSAGE\tCPU(core*hour)\tMEMORY(GB*hour)\n", ansiFormat("STEP", FgDefault))
} else {
fmt.Fprintf(w, "%s\tPODNAME\tDURATION\tMESSAGE\n", ansiFormat("STEP", FgDefault))
}
Expand Down Expand Up @@ -426,13 +441,18 @@ func printNode(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, dep
if showResUsage {
cpu, memory := getCpuMemoryRequest(node, wf.Namespace, kubeClient, wf)
args = []interface{}{nodePrefix, nodeName, node.ID, duration, node.Message, cpu, memory}
} else if showMetrics {
cpu, memory := getPodMetrics(node, metricsConfigMap)
args = []interface{}{nodePrefix, nodeName, node.ID, duration, node.Message, cpu, memory}
} else {
args = []interface{}{nodePrefix, nodeName, node.ID, duration, node.Message}
}

} else {
if showResUsage {
args = []interface{}{nodePrefix, nodeName, "", "", node.Message, 0.0, 0.0}
} else if showMetrics {
args = []interface{}{nodePrefix, nodeName, "", "", node.Message, 0.0, 0.0}
} else {
args = []interface{}{nodePrefix, nodeName, "", "", node.Message}
}
Expand All @@ -445,6 +465,8 @@ func printNode(w *tabwriter.Writer, wf *wfv1.Workflow, node wfv1.NodeStatus, dep
} else {
if showResUsage {
fmt.Fprintf(w, "%s%s\t%s\t%s\t%s\t%g\t%g\n", args...)
} else if showMetrics {
fmt.Fprintf(w, "%s%s\t%s\t%s\t%s\t%g\t%g\n", args...)
} else {
fmt.Fprintf(w, "%s%s\t%s\t%s\t%s\n", args...)
}
Expand Down
85 changes: 84 additions & 1 deletion cmd/argo/commands/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"os"
"strconv"
"strings"
)

func NewTopCommand() *cobra.Command {
Expand All @@ -34,9 +36,54 @@ func NewTopCommand() *cobra.Command {
return command
}

func getMetricsConfigMap(wf *wfv1.Workflow, kubeClient *kubernetes.Clientset) *v1.ConfigMap {
cm, err := kubeClient.CoreV1().ConfigMaps(wf.Namespace).Get(wf.Name, metav1.GetOptions{})
if err != nil {
log.Warningf("getMetricsConfigMap error %v", err)
return nil
}

return cm
}

func getWorkflowMetrics(metricsConfigMap *v1.ConfigMap) (float64, float64) {
if metricsConfigMap == nil {
log.Warningf("metricsConfigMap is nil")
return 0, 0
}
data := metricsConfigMap.Data
var totalCpu float64
var totalMemory float64

for podName, value := range data {
if strings.HasSuffix(podName, "cpu") {
tmpCpuValue, tmpErr := strconv.ParseFloat(value, 64)
if tmpErr != nil {
log.Warningf("Parse %s to float64 error %v", value, tmpErr)
continue
}
totalCpu += tmpCpuValue
}

if strings.HasSuffix(podName, "memory") {
tmpMemoryValue, tmpErr := strconv.ParseFloat(value, 64)
if tmpErr != nil {
log.Warningf("Parse %s to float64 error %v", value, tmpErr)
continue
}
totalMemory += tmpMemoryValue
}
}

totalCpu /= 1000
totalMemory /= 1024 * 1024 * 1024
return Decimal(totalCpu), Decimal(totalMemory)
}

func getPodResource(wf *wfv1.Workflow, kubeClient *kubernetes.Clientset) (float64, float64, float64, float64) {
if wf == nil {
log.Fatal("Wf is nil")
log.Warningf("Wf is nil")
return 0, 0, 0, 0
}
//unit cpu/minute
cpuMax := 0.0
Expand Down Expand Up @@ -114,6 +161,42 @@ func getCpuMemoryRequest(node wfv1.NodeStatus, namespace string, kubeClient *kub
return 0, 0
}

func getPodMetrics(node wfv1.NodeStatus, metricsConfigMap *v1.ConfigMap) (float64, float64) {
if node.Type != wfv1.NodeTypePod {
return 0, 0
}

if metricsConfigMap == nil {
return 0, 0
}

data := metricsConfigMap.Data
var podCpuMetrics float64
var podMemoryMetrics float64

if tmpPodCpuStr, ok := data[node.ID+".cpu"]; ok {
tmpPodCpuValue, tmpErr := strconv.ParseFloat(tmpPodCpuStr, 64)
if tmpErr != nil {
log.Warningf("Parse %s to float64 error %v", tmpPodCpuStr, tmpErr)
} else {
podCpuMetrics = tmpPodCpuValue
}
}

if tmpPodMemoryStr, ok := data[node.ID+".memory"]; ok {
tmpPodMemoryValue, tmpErr := strconv.ParseFloat(tmpPodMemoryStr, 64)
if tmpErr != nil {
log.Warningf("Parse %s to float64 error %v", tmpPodMemoryStr, tmpErr)
} else {
podMemoryMetrics = tmpPodMemoryValue
}
}

podCpuMetrics /= 1000
podMemoryMetrics /= 1024 * 1024 * 1024
return Decimal(podCpuMetrics), Decimal(podMemoryMetrics)
}

func SetClientConfig(client clientcmd.ClientConfig) {
clientConfig = client
}
Expand Down

0 comments on commit 75ed7d7

Please sign in to comment.