Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented timeouts #4840

Merged
merged 2 commits into from
Oct 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 24 additions & 13 deletions plugins/inputs/vsphere/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net/url"
"sync"
"time"

"github.com/vmware/govmomi"
"github.com/vmware/govmomi/performance"
Expand Down Expand Up @@ -33,6 +34,7 @@ type Client struct {
Root *view.ContainerView
Perf *performance.Manager
Valid bool
Timeout time.Duration
closeGate sync.Once
}

Expand All @@ -52,17 +54,21 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
defer cf.mux.Unlock()
if cf.client == nil {
var err error
if cf.client, err = NewClient(cf.url, cf.parent); err != nil {
if cf.client, err = NewClient(ctx, cf.url, cf.parent); err != nil {
return nil, err
}
}

// Execute a dummy call against the server to make sure the client is
// still functional. If not, try to log back in. If that doesn't work,
// we give up.
if _, err := methods.GetCurrentTime(ctx, cf.client.Client); err != nil {
ctx1, cancel1 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel1()
if _, err := methods.GetCurrentTime(ctx1, cf.client.Client); err != nil {
log.Printf("I! [input.vsphere]: Client session seems to have time out. Reauthenticating!")
if cf.client.Client.SessionManager.Login(ctx, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil {
ctx2, cancel2 := context.WithTimeout(ctx, cf.parent.Timeout.Duration)
defer cancel2()
if cf.client.Client.SessionManager.Login(ctx2, url.UserPassword(cf.parent.Username, cf.parent.Password)) != nil {
return nil, err
}
}
Expand All @@ -71,7 +77,7 @@ func (cf *ClientFactory) GetClient(ctx context.Context) (*Client, error) {
}

// NewClient creates a new vSphere client based on the url and setting passed as parameters.
func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
func NewClient(ctx context.Context, u *url.URL, vs *VSphere) (*Client, error) {
sw := NewStopwatch("connect", u.Host)
tlsCfg, err := vs.ClientConfig.TLSConfig()
if err != nil {
Expand All @@ -84,7 +90,6 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
if vs.Username != "" {
u.User = url.UserPassword(vs.Username, vs.Password)
}
ctx := context.Background()

log.Printf("D! [input.vsphere]: Creating client: %s", u.Host)
soapClient := soap.NewClient(u, tlsCfg.InsecureSkipVerify)
Expand All @@ -102,15 +107,19 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
}
}

vimClient, err := vim25.NewClient(ctx, soapClient)
ctx1, cancel1 := context.WithTimeout(ctx, vs.Timeout.Duration)
defer cancel1()
vimClient, err := vim25.NewClient(ctx1, soapClient)
if err != nil {
return nil, err
}
sm := session.NewManager(vimClient)

// If TSLKey is specified, try to log in as an extension using a cert.
if vs.TLSKey != "" {
if err := sm.LoginExtensionByCertificate(ctx, vs.TLSKey); err != nil {
ctx2, cancel2 := context.WithTimeout(ctx, vs.Timeout.Duration)
defer cancel2()
if err := sm.LoginExtensionByCertificate(ctx2, vs.TLSKey); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -141,11 +150,12 @@ func NewClient(u *url.URL, vs *VSphere) (*Client, error) {
sw.Stop()

return &Client{
Client: c,
Views: m,
Root: v,
Perf: p,
Valid: true,
Client: c,
Views: m,
Root: v,
Perf: p,
Valid: true,
Timeout: vs.Timeout.Duration,
}, nil
}

Expand All @@ -163,7 +173,8 @@ func (c *Client) close() {
// Use a Once to prevent us from panics stemming from trying
// to close it multiple times.
c.closeGate.Do(func() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()
if c.Client != nil {
if err := c.Client.Logout(ctx); err != nil {
log.Printf("E! [input.vsphere]: Error during logout: %s", err)
Expand Down
58 changes: 40 additions & 18 deletions plugins/inputs/vsphere/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type resourceKind struct {
objects objectMap
filters filter.Filter
collectInstances bool
getObjects func(context.Context, *view.ContainerView) (objectMap, error)
getObjects func(context.Context, *Endpoint, *view.ContainerView) (objectMap, error)
}

type metricEntry struct {
Expand Down Expand Up @@ -253,7 +253,9 @@ func (e *Endpoint) getMetricNameMap(ctx context.Context) (map[int32]string, erro
return nil, err
}

mn, err := client.Perf.CounterInfoByName(ctx)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
mn, err := client.Perf.CounterInfoByName(ctx1)

if err != nil {
return nil, err
Expand All @@ -272,7 +274,9 @@ func (e *Endpoint) getMetadata(ctx context.Context, in interface{}) interface{}
}

rq := in.(*metricQRequest)
metrics, err := client.Perf.AvailableMetric(ctx, rq.obj.ref.Reference(), rq.res.sampling)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metrics, err := client.Perf.AvailableMetric(ctx1, rq.obj.ref.Reference(), rq.res.sampling)
if err != nil && err != context.Canceled {
log.Printf("E! [input.vsphere]: Error while getting metric metadata. Discovery will be incomplete. Error: %s", err)
}
Expand All @@ -292,7 +296,9 @@ func (e *Endpoint) getDatacenterName(ctx context.Context, client *Client, cache
path = append(path, here.Reference().String())
o := object.NewCommon(client.Client.Client, r)
var result mo.ManagedEntity
err := o.Properties(ctx, here, []string{"parent", "name"}, &result)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := o.Properties(ctx1, here, []string{"parent", "name"}, &result)
if err != nil {
log.Printf("W! [input.vsphere]: Error while resolving parent. Assuming no parent exists. Error: %s", err)
break
Expand Down Expand Up @@ -344,7 +350,7 @@ func (e *Endpoint) discover(ctx context.Context) error {
log.Printf("D! [input.vsphere] Discovering resources for %s", res.name)
// Need to do this for all resource types even if they are not enabled (but datastore)
if res.enabled || (k != "datastore" && k != "vm") {
objects, err := res.getObjects(ctx, client.Root)
objects, err := res.getObjects(ctx, e, client.Root)
if err != nil {
return err
}
Expand Down Expand Up @@ -411,9 +417,11 @@ func (e *Endpoint) discover(ctx context.Context) error {
return nil
}

func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getDatacenters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datacenter
err := root.Retrieve(ctx, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datacenter"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
Expand All @@ -425,9 +433,11 @@ func getDatacenters(ctx context.Context, root *view.ContainerView) (objectMap, e
return m, nil
}

func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getClusters(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.ClusterComputeResource
err := root.Retrieve(ctx, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"ClusterComputeResource"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
Expand All @@ -439,7 +449,9 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro
if !ok {
o := object.NewFolder(root.Client(), *r.Parent)
var folder mo.Folder
err := o.Properties(ctx, *r.Parent, []string{"parent"}, &folder)
ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
err := o.Properties(ctx2, *r.Parent, []string{"parent"}, &folder)
if err != nil {
log.Printf("W! [input.vsphere] Error while getting folder parent: %e", err)
p = nil
Expand All @@ -455,7 +467,7 @@ func getClusters(ctx context.Context, root *view.ContainerView) (objectMap, erro
return m, nil
}

func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getHosts(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.HostSystem
err := root.Retrieve(ctx, []string{"HostSystem"}, []string{"name", "parent"}, &resources)
if err != nil {
Expand All @@ -469,9 +481,11 @@ func getHosts(ctx context.Context, root *view.ContainerView) (objectMap, error)
return m, nil
}

func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getVMs(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.VirtualMachine
err := root.Retrieve(ctx, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"VirtualMachine"}, []string{"name", "runtime.host", "config.guestId", "config.uuid"}, &resources)
if err != nil {
return nil, err
}
Expand All @@ -491,9 +505,11 @@ func getVMs(ctx context.Context, root *view.ContainerView) (objectMap, error) {
return m, nil
}

func getDatastores(ctx context.Context, root *view.ContainerView) (objectMap, error) {
func getDatastores(ctx context.Context, e *Endpoint, root *view.ContainerView) (objectMap, error) {
var resources []mo.Datastore
err := root.Retrieve(ctx, []string{"Datastore"}, []string{"name", "parent"}, &resources)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
err := root.Retrieve(ctx1, []string{"Datastore"}, []string{"name", "parent"}, &resources)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -689,17 +705,23 @@ func (e *Endpoint) collectChunk(ctx context.Context, pqs []types.PerfQuerySpec,
return 0, err
}

metricInfo, err := client.Perf.CounterInfoByName(ctx)
ctx1, cancel1 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel1()
metricInfo, err := client.Perf.CounterInfoByName(ctx1)
if err != nil {
return count, err
}

metrics, err := client.Perf.Query(ctx, pqs)
ctx2, cancel2 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel2()
metrics, err := client.Perf.Query(ctx2, pqs)
if err != nil {
return count, err
}

ems, err := client.Perf.ToMetricSeries(ctx, metrics)
ctx3, cancel3 := context.WithTimeout(ctx, e.Parent.Timeout.Duration)
defer cancel3()
ems, err := client.Perf.ToMetricSeries(ctx3, metrics)
if err != nil {
return count, err
}
Expand Down
22 changes: 22 additions & 0 deletions plugins/inputs/vsphere/vsphere_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"regexp"
"sort"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -229,6 +230,27 @@ func TestWorkerPool(t *testing.T) {
}
}

func TestTimeout(t *testing.T) {
m, s, err := createSim()
if err != nil {
t.Fatal(err)
}
defer m.Remove()
defer s.Close()

var acc testutil.Accumulator
v := defaultVSphere()
v.Vcenters = []string{s.URL.String()}
v.Timeout = internal.Duration{Duration: 1 * time.Nanosecond}
require.NoError(t, v.Start(nil)) // We're not using the Accumulator, so it can be nil.
defer v.Stop()
require.NoError(t, v.Gather(&acc))

// The accumulator must contain exactly one error and it must be a deadline exceeded.
require.Equal(t, 1, len(acc.Errors))
require.True(t, strings.Contains(acc.Errors[0].Error(), "context deadline exceeded"))
}

func TestAll(t *testing.T) {
m, s, err := createSim()
if err != nil {
Expand Down