Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scraping for Prometheus endpoint in Kubernetes #4920

Merged
merged 16 commits into from
Nov 5, 2018
Merged
69 changes: 41 additions & 28 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ in Prometheus format.
## An array of Kubernetes services to scrape metrics from.
# kubernetes_services = ["http://my-service-dns.my-namespace:9100/metrics"]

## Kubernetes config file to create client from.
# kube_config = "/path/to/kubernetes.config"

## Scrape Kubernetes pods for the following prometheus annotations:
## - prometheus.io/scrape: Enable scraping for this pod
## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to
## set this to `https` & most likely set the tls config.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true

## Use bearer token for authorization
# bearer_token = /path/to/bearer/token

Expand All @@ -37,6 +48,18 @@ by looking up all A records assigned to the hostname as described in
This method can be used to locate all
[Kubernetes headless services](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services).

#### Kubernetes scraping

Enabling this option will allow the plugin to scrape for prometheus annotation on Kubernetes
glinton marked this conversation as resolved.
Show resolved Hide resolved
pods. Currently, you can run this plugin in your kubernetes cluster, or we use the kubeconfig
file to determine where to monitor.
Currently the following annotation are supported:

* `prometheus.io/scrape` Enable scraping for this pod.
* `prometheus.io/scheme` If the metrics endpoint is secured then you will need to set this to `https` & most likely set the tls config. (default 'http')
* `prometheus.io/path` Override the path for the metrics endpoint on the service. (default '/metrics')
* `prometheus.io/port` Used to override the port. (default 9102)

#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
Expand Down
201 changes: 201 additions & 0 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package prometheus

import (
"context"
"fmt"
"io/ioutil"
"log"
"net"
"net/url"
"os/user"
"path/filepath"
"sync"
"time"

"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"gopkg.in/yaml.v2"
)

type payload struct {
eventype string
pod *corev1.Pod
}

// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
data, err := ioutil.ReadFile(kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("failed reading '%s': %v", kubeconfigPath, err)
}

// Unmarshal YAML into a Kubernetes config object.
var config k8s.Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, err
}
return k8s.NewClient(&config)
}

func (p *Prometheus) start(ctx context.Context) error {
client, err := k8s.NewInClusterClient()
if err != nil {
u, err := user.Current()
if err != nil {
return fmt.Errorf("Failed to get current user - %v", err)
}
configLocation := filepath.Join(u.HomeDir, ".kube/config")
if p.KubeConfig != "" {
configLocation = p.KubeConfig
}
client, err = loadClient(configLocation)
if err != nil {
return err
}
}

p.wg = sync.WaitGroup{}
in := make(chan payload)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, unused.


p.wg.Add(1)
go func() {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to use a return or we won't exit the for loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove the err checking logic on the p.watch as well, since it doesn't effectively do anything for us?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably move the logging from line 84 up to this function, and just return err in the watch function. I see that there is both an error and debug level message in watch, but since they both terminate the watch function I think they should both be error level.

case <-time.After(time.Second):
err := p.watch(ctx, client, in)
if err == nil {
break
}
}
}
}()

return nil
}

func (p *Prometheus) watch(ctx context.Context, client *k8s.Client, in chan payload) error {
pod := &corev1.Pod{}
watcher, err := client.Watch(ctx, "", &corev1.Pod{})
if err != nil {
log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err)
return err
}
defer watcher.Close()

for {
select {
case <-ctx.Done():
return nil
default:
pod = &corev1.Pod{}
// An error here means we need to reconnect the watcher.
eventType, err := watcher.Next(pod)
if err != nil {
log.Printf("D! [inputs.prometheus] unable to watch next: %v", err)
return err
}

switch eventType {
case k8s.EventAdded:
registerPod(pod, p)
case k8s.EventDeleted:
unregisterPod(pod, p)
case k8s.EventModified:
}
}
}
}

func registerPod(pod *corev1.Pod, p *Prometheus) {
targetURL := getScrapeURL(pod)
if targetURL == nil {
return
}

log.Printf("D! [inputs.prometheus] will scrape metrics from %s", *targetURL)
// add annotation as metrics tags
tags := pod.GetMetadata().GetAnnotations()
tags["pod_name"] = pod.GetMetadata().GetName()
tags["namespace"] = pod.GetMetadata().GetNamespace()
// add labels as metrics tags
for k, v := range pod.GetMetadata().GetLabels() {
tags[k] = v
}
URL, err := url.Parse(*targetURL)
if err != nil {
log.Printf("E! [inputs.prometheus] could not parse URL %s: %v", *targetURL, err)
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()
p.kubernetesPods = append(p.kubernetesPods,
URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags})
p.lock.Unlock()
}

func getScrapeURL(pod *corev1.Pod) *string {
scrape := pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"]
if scrape != "true" {
return nil
}
ip := pod.Status.GetPodIP()
if ip == "" {
// return as if scrape was disabled, we will be notified again once the pod
// has an IP
return nil
}

scheme := pod.GetMetadata().GetAnnotations()["prometheus.io/scheme"]
path := pod.GetMetadata().GetAnnotations()["prometheus.io/path"]
port := pod.GetMetadata().GetAnnotations()["prometheus.io/port"]

if scheme == "" {
scheme = "http"
}
if port == "" {
port = "9102"
}
if path == "" {
path = "/metrics"
}

u := &url.URL{
Scheme: scheme,
Host: net.JoinHostPort(ip, port),
Path: path,
}

x := u.String()

return &x
}

func unregisterPod(pod *corev1.Pod, p *Prometheus) {
url := getScrapeURL(pod)
if url == nil {
return
}

p.lock.Lock()
defer p.lock.Unlock()
log.Printf("D! [inputs.prometheus] registered a delete request for %s in namespace %s",
pod.GetMetadata().GetName(), pod.GetMetadata().GetNamespace())
var result []URLAndAddress
for _, v := range p.kubernetesPods {
if v.URL.String() != *url {
result = append(result, v)
} else {
log.Printf("D! [inputs.prometheus] will stop scraping for %s", *url)
}

}
p.kubernetesPods = result
}
Loading