@@ -31,12 +31,14 @@ import (
3131 "k8s.io/client-go/transport/spdy"
3232)
3333
34+ // UsageReading the CPU and memory utilisation at a given timestamp
3435type UsageReading struct {
3536 Timestamp time.Time `json:"timestamp,omitempty" yaml:"timestamp,omitempty"`
3637 CPU float64 `json:"cpu,omitempty" yaml:"cpu,omitempty"`
3738 Memory float64 `json:"memory,omitempty" yaml:"memory,omitempty"`
3839}
3940
41+ // Utilisation a list of UsageReadings by namespace
4042type Utilisation struct {
4143 Readings map [string ][]UsageReading `json:"readings,omitempty" yaml:"readings,omitempty"`
4244}
@@ -59,28 +61,27 @@ func (c *CollectCPUMemUtilisation) IsExcluded() (bool, error) {
5961 return isExcluded (c .Collector .Exclude )
6062}
6163
62- // todo: Rework.
6364func (c * CollectCPUMemUtilisation ) Collect (progressChan chan <- interface {}) (CollectorResult , error ) {
6465 thanosServiceNamespace := "kommander"
6566 output := NewResult ()
6667
67- // Get the pods of the deployment
68+ // Get the pods of the thanos-query deployment.
6869 pods , err := c .Client .CoreV1 ().Pods (thanosServiceNamespace ).List (context .Background (), metav1.ListOptions {
6970 LabelSelector : "app.kubernetes.io/component=query,app.kubernetes.io/instance=thanos,app.kubernetes.io/name=thanos" ,
7071 })
7172 if err != nil {
72- return nil , errors .Wrap (err , "Could not look up thanos-query pods: " )
73+ return nil , errors .Wrap (err , "could not look up thanos-query pods" )
7374 }
7475 if len (pods .Items ) < 1 {
7576 // Check if no thanos-query pods are found.
76- return nil , errors .New ("Could not find any thanos-query pods. " )
77+ return nil , errors .New ("could not find any thanos-query pods" )
7778 }
7879 podName := pods .Items [0 ].Name
7980
80- // Create a Prometheus API client using the API server endpoint
81+ // Create a Prometheus API client using the API server endpoint.
8182 prometheusClient , stopCh , err := StartPortForwarding (context .Background (), c .ClientConfig , coreout .NewNonInteractiveShell (os .Stdout , os .Stderr , 4 ), podName )
8283 if err != nil {
83- return nil , errors .Wrap (err , "Error querying Prometheus for CPU utilization: " )
84+ return nil , errors .Wrap (err , "could not query Prometheus for CPU utilization" )
8485 }
8586 v1api := v1 .NewAPI (prometheusClient )
8687
@@ -95,7 +96,7 @@ func (c *CollectCPUMemUtilisation) Collect(progressChan chan<- interface{}) (Col
9596 Step : step ,
9697 })
9798 if err != nil {
98- return nil , errors .Wrap (err , "Error querying Prometheus for CPU utilization: " )
99+ return nil , errors .Wrap (err , "could not query Prometheus for CPU utilization" )
99100 }
100101
101102 memoryQuery := "sum(container_memory_working_set_bytes{container!='POD', container!=''}) by (namespace)"
@@ -106,23 +107,23 @@ func (c *CollectCPUMemUtilisation) Collect(progressChan chan<- interface{}) (Col
106107 })
107108
108109 if err != nil {
109- return nil , errors .Wrap (err , "Error querying Prometheus for memory utilization: " )
110+ return nil , errors .Wrap (err , "could not query Prometheus for memory utilization" )
110111 }
111112 // Process query result
112113 memMatrix , ok := memoryResult .(model.Matrix )
113114 if ! ok {
114- return nil , errors .New ("Invalid query result, expected matrix" )
115+ return nil , errors .New ("invalid query result, expected matrix" )
115116 }
116117
117118 // Process query result
118119 cpuMatrix , ok := cpuResult .(model.Matrix )
119120 if ! ok {
120- return nil , errors .New ("Invalid query result, expected matrix" )
121+ return nil , errors .New ("invalid query result, expected matrix" )
121122 }
122- metricsByNamespace := make (map [string ]map [string ][]model.SamplePair , 0 )
123+ metricsByNamespace := make (map [string ]map [string ][]model.SamplePair , max ( memMatrix . Len (), cpuMatrix . Len ()) )
123124 for _ , entry := range memMatrix {
124125 namespaceName := string (entry .Metric ["namespace" ])
125- metricsByNamespace [namespaceName ] = make (map [string ][]model.SamplePair , 0 )
126+ metricsByNamespace [namespaceName ] = make (map [string ][]model.SamplePair , 2 )
126127 metricsByNamespace [namespaceName ]["memory" ] = entry .Values
127128 metricsByNamespace [namespaceName ]["cpu" ] = []model.SamplePair {}
128129 }
@@ -132,7 +133,7 @@ func (c *CollectCPUMemUtilisation) Collect(progressChan chan<- interface{}) (Col
132133 if ok {
133134 metricsByNamespace [namespaceName ]["cpu" ] = entry .Values
134135 } else {
135- metricsByNamespace [namespaceName ] = make (map [string ][]model.SamplePair , 0 )
136+ metricsByNamespace [namespaceName ] = make (map [string ][]model.SamplePair , 2 )
136137 metricsByNamespace [namespaceName ]["cpu" ] = entry .Values
137138 metricsByNamespace [namespaceName ]["memory" ] = []model.SamplePair {}
138139 }
@@ -142,12 +143,12 @@ func (c *CollectCPUMemUtilisation) Collect(progressChan chan<- interface{}) (Col
142143 for namespaceName , readingsLists := range metricsByNamespace {
143144 jointReadings , err := joinReadings (readingsLists ["cpu" ], readingsLists ["memory" ])
144145 if err != nil {
145- return nil , errors .Wrap (err , "Failed to parse readings: " )
146+ return nil , errors .Wrap (err , "failed to parse readings" )
146147 }
147148
148149 payload , err := json .MarshalIndent (jointReadings , "" , " " )
149150 if err != nil {
150- return nil , errors .Wrap (err , "Error formatting readings: " )
151+ return nil , errors .Wrap (err , "could not format readings " )
151152 }
152153 path := []string {"utilisation" , namespaceName }
153154 output .SaveResult (c .BundlePath , filepath .Join (path ... ), bytes .NewBuffer (payload ))
@@ -189,8 +190,6 @@ func StartPortForwarding(
189190 return prometheusClient , stopCh , err
190191}
191192
192- type PortForwardFn = func (* rest.Config , string , string , int , output.Output ) (int , chan struct {}, error )
193-
194193// PortForward creates an asynchronous port-forward to a pod on the cluster in order to
195194// access it over the local network.
196195func PortForward (cfg * rest.Config , ns , pod string , port int , out output.Output ) (localPort int , stop chan struct {}, err error ) {
@@ -244,20 +243,16 @@ func getPortForwarder(dialer httpstream.Dialer,
244243}
245244
246245func runPortForwarder (pf * portforward.PortForwarder , ready chan struct {}, out output.Output ) error {
247- out .Info ("running the port-forwarder in the background" )
248246 errCh := make (chan error )
249- // start the portforwarder in the background and look for errors
247+ // Start the portforwarder in the background and look for errors.
250248 go func () {
251249 if err := pf .ForwardPorts (); err != nil {
252250 errCh <- err
253251 }
254252 close (errCh )
255253 }()
256- // wait for the port-forward to be established and check for errors
257- out .Info ("waiting for port-forward to be established" )
254+ // Wait for the port-forward to be established and check for errors.
258255 select {
259- case <- ready :
260- out .Info ("the port-forward has signaled ready" )
261256 case err := <- errCh :
262257 if err != nil {
263258 return fmt .Errorf ("port-forward failed: %w" , err )
@@ -286,35 +281,36 @@ func constructPortForwardURL(host, ns, pod string) (*url.URL, error) {
286281 }, nil
287282}
288283
284+ // joinReadings an sql-like outer join of two lists of readings on timestamps.
289285func joinReadings (cpuSamples , memSamples []model.SamplePair ) ([]UsageReading , error ) {
290286 jointReadingsList := make ([]UsageReading , 0 )
291287 i , j := 0 , 0
292288
293289 for i < len (cpuSamples ) && j < len (memSamples ) {
294290 if cpuSamples [i ].Timestamp .Before (memSamples [j ].Timestamp ) {
295- // Include item from CPU readings
291+ // Include item from CPU readings.
296292 value , err := strconv .ParseFloat (cpuSamples [i ].Value .String (), 64 )
297293 if err != nil {
298- return nil , errors .Wrap (err , "Failed to parse float: " )
294+ return nil , errors .Wrap (err , "failed to parse float" )
299295 }
300296 jointReadingsList = append (jointReadingsList , UsageReading {
301297 Timestamp : cpuSamples [i ].Timestamp .Time ().UTC (),
302298 CPU : value ,
303299 })
304300 i ++
305301 } else if cpuSamples [i ].Timestamp .After (memSamples [j ].Timestamp ) {
306- // Include item from memory readings
302+ // Include item from memory readings.
307303 value , err := strconv .ParseFloat (memSamples [j ].Value .String (), 64 )
308304 if err != nil {
309- return nil , errors .Wrap (err , "Failed to parse float: " )
305+ return nil , errors .Wrap (err , "failed to parse float" )
310306 }
311307 jointReadingsList = append (jointReadingsList , UsageReading {
312308 Timestamp : memSamples [j ].Timestamp .Time ().UTC (),
313309 Memory : value ,
314310 })
315311 j ++
316312 } else {
317- // Include items from both readings with matching timestamp
313+ // Include items from both readings with matching timestamp.
318314 cpu , err := strconv .ParseFloat (cpuSamples [i ].Value .String (), 64 )
319315 if err != nil {
320316 return nil , errors .Wrap (err , "Failed to parse float:" )
@@ -333,11 +329,11 @@ func joinReadings(cpuSamples, memSamples []model.SamplePair) ([]UsageReading, er
333329 }
334330 }
335331
336- // Include remaining items from cpu readings
332+ // Include remaining items from cpu readings, if any.
337333 for i < len (cpuSamples ) {
338334 cpu , err := strconv .ParseFloat (cpuSamples [i ].Value .String (), 64 )
339335 if err != nil {
340- return nil , errors .Wrap (err , "Failed to parse float: " )
336+ return nil , errors .Wrap (err , "failed to parse float" )
341337 }
342338 jointReadingsList = append (jointReadingsList , UsageReading {
343339 Timestamp : cpuSamples [i ].Timestamp .Time ().UTC (),
@@ -346,11 +342,11 @@ func joinReadings(cpuSamples, memSamples []model.SamplePair) ([]UsageReading, er
346342 i ++
347343 }
348344
349- // Include remaining items from memory readings
345+ // Include remaining items from memory readings, if any.
350346 for j < len (memSamples ) {
351347 mem , err := strconv .ParseFloat (memSamples [j ].Value .String (), 64 )
352348 if err != nil {
353- return nil , errors .Wrap (err , "Failed to parse float: " )
349+ return nil , errors .Wrap (err , "failed to parse float" )
354350 }
355351 jointReadingsList = append (jointReadingsList , UsageReading {
356352 Timestamp : memSamples [j ].Timestamp .Time ().UTC (),
@@ -361,3 +357,10 @@ func joinReadings(cpuSamples, memSamples []model.SamplePair) ([]UsageReading, er
361357
362358 return jointReadingsList , nil
363359}
360+
361+ func max (a , b int ) int {
362+ if a > b {
363+ return a
364+ }
365+ return b
366+ }
0 commit comments