Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scraping for Prometheus endpoint in Kubernetes #4920

Merged
merged 16 commits into from
Nov 5, 2018
Prev Previous commit
Next Next commit
Pass context down
  • Loading branch information
glinton committed Nov 2, 2018
commit c5f952fb2ff6db8b52578dcb7ed36ebfe8989d74
14 changes: 6 additions & 8 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func loadClient(kubeconfigPath string) (*k8s.Client, error) {
return k8s.NewClient(&config)
}

func start(p *Prometheus) error {
func (p *Prometheus) start(ctx context.Context) error {
client, err := k8s.NewInClusterClient()
if err != nil {
u, err := user.Current()
Expand All @@ -55,7 +55,6 @@ func start(p *Prometheus) error {
}
}

p.ctx, p.cancel = context.WithCancel(context.Background())
p.wg = sync.WaitGroup{}
in := make(chan payload)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove, unused.


Expand All @@ -64,24 +63,23 @@ func start(p *Prometheus) error {
defer p.wg.Done()
for {
select {
case <-p.ctx.Done():
case <-ctx.Done():
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

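Need to use a return here, or we won't exit the for loop: a bare `break` inside a `select` only terminates the `select` statement, not the enclosing `for`.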

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also remove the error-checking logic around p.watch, since it doesn't effectively do anything for us?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably move the logging from line 84 up to this function, and just return err in the watch function. I see that there are both an error-level and a debug-level message in watch, but since they both terminate the watch function, I think they should both be error level.

case <-time.After(time.Second):
err := watch(p, client, in)
err := p.watch(ctx, client, in)
if err == nil {
break
}
}
}
log.Printf("D! [inputs.prometheus] shutting down")
}()

return nil
}

func watch(p *Prometheus, client *k8s.Client, in chan payload) error {
func (p *Prometheus) watch(ctx context.Context, client *k8s.Client, in chan payload) error {
pod := &corev1.Pod{}
watcher, err := client.Watch(p.ctx, "", &corev1.Pod{})
watcher, err := client.Watch(ctx, "", &corev1.Pod{})
if err != nil {
log.Printf("E! [inputs.prometheus] unable to watch resources: %v", err)
return err
Expand All @@ -90,7 +88,7 @@ func watch(p *Prometheus, client *k8s.Client, in chan payload) error {

for {
select {
case <-p.ctx.Done():
case <-ctx.Done():
return nil
default:
pod = &corev1.Pod{}
Expand Down
5 changes: 3 additions & 2 deletions plugins/inputs/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type Prometheus struct {
MonitorPods bool `toml:"monitor_kubernetes_pods"`
lock sync.Mutex
kubernetesPods []URLAndAddress
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
Expand Down Expand Up @@ -264,7 +263,9 @@ func (p *Prometheus) gatherURL(u URLAndAddress, acc telegraf.Accumulator) error
// Start will start the Kubernetes scraping if enabled in the configuration
func (p *Prometheus) Start(a telegraf.Accumulator) error {
if p.MonitorPods {
return start(p)
var ctx context.Context
ctx, p.cancel = context.WithCancel(context.Background())
return p.start(ctx)
}
return nil
}
Expand Down