From 94de223916ffb732a46a277db29a87acd0f3d727 Mon Sep 17 00:00:00 2001 From: Greg <2653109+glinton@users.noreply.github.com> Date: Mon, 4 Feb 2019 13:28:43 -0700 Subject: [PATCH] Add kube_inventory input plugin (#5110) --- Gopkg.lock | 24 ++ Gopkg.toml | 4 + plugins/inputs/all/all.go | 1 + plugins/inputs/kube_inventory/README.md | 238 ++++++++++++++++++ plugins/inputs/kube_inventory/client.go | 97 +++++++ plugins/inputs/kube_inventory/client_test.go | 35 +++ plugins/inputs/kube_inventory/daemonset.go | 49 ++++ .../inputs/kube_inventory/daemonset_test.go | 123 +++++++++ plugins/inputs/kube_inventory/deployment.go | 40 +++ .../inputs/kube_inventory/deployment_test.go | 131 ++++++++++ plugins/inputs/kube_inventory/kube_state.go | 175 +++++++++++++ plugins/inputs/kube_inventory/node.go | 56 +++++ plugins/inputs/kube_inventory/node_test.go | 172 +++++++++++++ .../inputs/kube_inventory/persistentvolume.go | 52 ++++ .../kube_inventory/persistentvolume_test.go | 112 +++++++++ .../kube_inventory/persistentvolumeclaim.go | 49 ++++ .../persistentvolumeclaim_test.go | 115 +++++++++ plugins/inputs/kube_inventory/pod.go | 87 +++++++ plugins/inputs/kube_inventory/pod_test.go | 199 +++++++++++++++ plugins/inputs/kube_inventory/statefulset.go | 46 ++++ .../inputs/kube_inventory/statefulset_test.go | 123 +++++++++ 21 files changed, 1928 insertions(+) create mode 100644 plugins/inputs/kube_inventory/README.md create mode 100644 plugins/inputs/kube_inventory/client.go create mode 100644 plugins/inputs/kube_inventory/client_test.go create mode 100644 plugins/inputs/kube_inventory/daemonset.go create mode 100644 plugins/inputs/kube_inventory/daemonset_test.go create mode 100644 plugins/inputs/kube_inventory/deployment.go create mode 100644 plugins/inputs/kube_inventory/deployment_test.go create mode 100644 plugins/inputs/kube_inventory/kube_state.go create mode 100644 plugins/inputs/kube_inventory/node.go create mode 100644 plugins/inputs/kube_inventory/node_test.go create mode 100644 plugins/inputs/kube_inventory/persistentvolume.go create mode 100644 plugins/inputs/kube_inventory/persistentvolume_test.go create mode 100644 plugins/inputs/kube_inventory/persistentvolumeclaim.go create mode 100644 plugins/inputs/kube_inventory/persistentvolumeclaim_test.go create mode 100644 plugins/inputs/kube_inventory/pod.go create mode 100644 plugins/inputs/kube_inventory/pod_test.go create mode 100644 plugins/inputs/kube_inventory/statefulset.go create mode 100644 plugins/inputs/kube_inventory/statefulset_test.go diff --git a/Gopkg.lock b/Gopkg.lock index d07773be99e0f..93834b78a2ae8 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -387,8 +387,11 @@ packages = [ ".", "apis/apiextensions/v1beta1", + "apis/apps/v1beta1", + "apis/apps/v1beta2", "apis/core/v1", "apis/meta/v1", + "apis/policy/v1beta1", "apis/resource", "runtime", "runtime/schema", @@ -696,6 +699,14 @@ pruneopts = "" revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0" +[[projects]] + branch = "master" + digest = "1:e7737c09200582508f4f67227c39e7c4667cc6067a6d2b2e679654e43e8a8cb4" + name = "github.com/kubernetes/apimachinery" + packages = ["pkg/api/resource"] + pruneopts = "" + revision = "d41becfba9ee9bf8e55cec1dd3934cd7cfc04b99" + [[projects]] branch = "develop" digest = "1:3e66a61a57bbbe832c338edb3a623be0deb3dec650c2f3515149658898287e37" @@ -1424,6 +1435,14 @@ revision = "7f5bdfd858bb064d80559b2a32b86669c5de5d3b" version = "v3.0.5" +[[projects]] + digest = "1:75fb3fcfc73a8c723efde7777b40e8e8ff9babf30d8c56160d01beffea8a95a6" + name = "gopkg.in/inf.v0" + packages = 
["."] + pruneopts = "" + revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf" + version = "v0.9.1" + [[projects]] digest = "1:367baf06b7dbd0ef0bbdd785f6a79f929c96b0c18e9d3b29c0eed1ac3f5db133" name = "gopkg.in/ldap.v2" @@ -1511,8 +1530,12 @@ "github.com/docker/libnetwork/ipvs", "github.com/eclipse/paho.mqtt.golang", "github.com/ericchiang/k8s", + "github.com/ericchiang/k8s/apis/apps/v1beta1", + "github.com/ericchiang/k8s/apis/apps/v1beta2", "github.com/ericchiang/k8s/apis/core/v1", "github.com/ericchiang/k8s/apis/meta/v1", + "github.com/ericchiang/k8s/apis/resource", + "github.com/ericchiang/k8s/util/intstr", "github.com/go-logfmt/logfmt", "github.com/go-redis/redis", "github.com/go-sql-driver/mysql", @@ -1537,6 +1560,7 @@ "github.com/kardianos/service", "github.com/karrick/godirwalk", "github.com/kballard/go-shellquote", + "github.com/kubernetes/apimachinery/pkg/api/resource", "github.com/matttproud/golang_protobuf_extensions/pbutil", "github.com/miekg/dns", "github.com/multiplay/go-ts3", diff --git a/Gopkg.toml b/Gopkg.toml index 62fe864ac7fd2..51fc1fbb6399e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -254,6 +254,10 @@ name = "github.com/karrick/godirwalk" version = "1.7.5" +[[constraint]] + branch = "master" + name = "github.com/kubernetes/apimachinery" + [[constraint]] name = "github.com/go-logfmt/logfmt" version = "0.4.0" diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 0a69ac21dd602..2435e1519414e 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -63,6 +63,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/kernel" _ "github.com/influxdata/telegraf/plugins/inputs/kernel_vmstat" _ "github.com/influxdata/telegraf/plugins/inputs/kibana" + _ "github.com/influxdata/telegraf/plugins/inputs/kube_inventory" _ "github.com/influxdata/telegraf/plugins/inputs/kubernetes" _ "github.com/influxdata/telegraf/plugins/inputs/leofs" _ "github.com/influxdata/telegraf/plugins/inputs/linux_sysctl_fs" diff --git a/plugins/inputs/kube_inventory/README.md b/plugins/inputs/kube_inventory/README.md new file mode 100644 index 0000000000000..9a71ec4a6e892 --- /dev/null +++ b/plugins/inputs/kube_inventory/README.md @@ -0,0 +1,238 @@ +# Kube_Inventory Plugin +This plugin generates metrics derived from the state of the following Kubernetes resources: + - daemonsets + - deployments + - nodes + - persistentvolumes + - persistentvolumeclaims + - pods (containers) + - statefulsets + +#### Series Cardinality Warning + +This plugin may produce a high number of series which, when not controlled +for, will cause high load on your database. Use the following techniques to +avoid cardinality issues: + +- Use [metric filtering][] options to exclude unneeded measurements and tags. +- Write to a database with an appropriate [retention policy][]. +- Limit series cardinality in your database using the + [max-series-per-database][] and [max-values-per-tag][] settings. +- Consider using the [Time Series Index][tsi]. +- Monitor your databases [series cardinality][]. +- Consult the [InfluxDB documentation][influx-docs] for the most up-to-date techniques. + +### Configuration: + +```toml +[[inputs.kube_inventory]] + ## URL for the Kubernetes API + url = "https://127.0.0.1" + + ## Namespace to use + # namespace = "default" + + ## Use bearer token for authorization. 
('bearer_token' takes priority)
+  # bearer_token = "/path/to/bearer/token"
+  ## OR
+  # bearer_token_string = "abc_123"
+
+  ## Set response_timeout (default 5 seconds)
+  # response_timeout = "5s"
+
+  ## Optional Resources to exclude from gathering
+  ## Leave this empty to try to gather everything available.
+  ## Values can be - "daemonsets", "deployments", "nodes", "persistentvolumes",
+  ## "persistentvolumeclaims", "pods", "statefulsets"
+  # resource_exclude = [ "deployments", "nodes", "statefulsets" ]
+
+  ## Optional Resources to include when gathering
+  ## Overrides resource_exclude if both are set.
+  # resource_include = [ "deployments", "nodes", "statefulsets" ]
+
+  ## Optional TLS Config
+  # tls_ca = "/path/to/cafile"
+  # tls_cert = "/path/to/certfile"
+  # tls_key = "/path/to/keyfile"
+  ## Use TLS but skip chain & host verification
+  # insecure_skip_verify = false
+```
+
+#### Kubernetes Permissions
+
+If using [RBAC authorization](https://kubernetes.io/docs/reference/access-authn-authz/rbac/), you will need to create a cluster role to list "persistentvolumes" and "nodes". You will then need to make an [aggregated ClusterRole](https://kubernetes.io/docs/reference/access-authn-authz/rbac/#aggregated-clusterroles) that will eventually be bound to a user or group.
+```yaml
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: influx:cluster:viewer
+  labels:
+    rbac.authorization.k8s.io/aggregate-view-telegraf: "true"
+rules:
+- apiGroups: [""]
+  resources: ["persistentvolumes","nodes"]
+  verbs: ["get","list"]
+
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: influx:telegraf
+aggregationRule:
+  clusterRoleSelectors:
+  - matchLabels:
+      rbac.authorization.k8s.io/aggregate-view-telegraf: "true"
+      rbac.authorization.k8s.io/aggregate-to-view: "true"
+rules: [] # Rules are automatically filled in by the controller manager.
+```
+
+Bind the newly created aggregated ClusterRole with a ClusterRoleBinding such as the one below, updating the subjects as needed.
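+
+The subject of the binding below is a ServiceAccount named `telegraf` in the `default` namespace. If such an account does not already exist, it can be created first; a minimal sketch (the name and namespace are assumptions and should match your Telegraf deployment):
+```yaml
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: telegraf
+  namespace: default
+```
+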
+```yaml +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: influx:telegraf:viewer +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: influx:telegraf +subjects: +- kind: ServiceAccount + name: telegraf + namespace: default +``` + + +### Metrics: + ++ kubernetes_daemonset + - tags: + - daemonset_name + - namespace + - fields: + - generation + - current_number_scheduled + - desired_number_scheduled + - number_available + - number_misscheduled + - number_ready + - number_unavailable + - updated_number_scheduled + +- kubernetes_deployment + - tags: + - deployment_name + - namespace + - fields: + - replicas_available + - replicas_unavailable + - created + ++ kubernetes_node + - tags: + - node_name + - fields: + - capacity_cpu_cores + - capacity_memory_bytes + - capacity_pods + - allocatable_cpu_cores + - allocatable_memory_bytes + - allocatable_pods + +- kubernetes_persistentvolume + - tags: + - pv_name + - phase + - storageclass + - fields: + - phase_type (int, [see below](#pv-phase_type)) + ++ kubernetes_persistentvolumeclaim + - tags: + - pvc_name + - namespace + - phase + - storageclass + - fields: + - phase_type (int, [see below](#pvc-phase_type)) + +- kubernetes_pod_container + - tags: + - container_name + - namespace + - node_name + - pod_name + - fields: + - restarts_total + - state + - terminated_reason + - resource_requests_cpu_units + - resource_requests_memory_bytes + - resource_limits_cpu_units + - resource_limits_memory_bytes + ++ kubernetes_statefulset + - tags: + - statefulset_name + - namespace + - fields: + - created + - generation + - replicas + - replicas_current + - replicas_ready + - replicas_updated + - spec_replicas + - observed_generation + +#### pv `phase_type` + +The persistentvolume "phase" is saved in the `phase` tag with a correlated numeric field called `phase_type` corresponding with that tag value. + +|Tag value |Corresponding field value| +-----------|-------------------------| +|bound | 0 | +|failed | 1 | +|pending | 2 | +|released | 3 | +|available | 4 | +|unknown | 5 | + +#### pvc `phase_type` + +The persistentvolumeclaim "phase" is saved in the `phase` tag with a correlated numeric field called `phase_type` corresponding with that tag value. 
+ +|Tag value |Corresponding field value| +-----------|-------------------------| +|bound | 0 | +|lost | 1 | +|pending | 2 | +|unknown | 3 | + + +### Example Output: + +``` +kubernetes_configmap,configmap_name=envoy-config,namespace=default,resource_version=56593031 created=1544103867000000000i 1547597616000000000 +kubernetes_daemonset +kubernetes_deployment,deployment_name=deployd,namespace=default replicas_unavailable=0i,created=1544103082000000000i,replicas_available=1i 1547597616000000000 +kubernetes_node,node_name=ip-172-17-0-2.internal allocatable_pods=110i,capacity_memory_bytes=128837533696,capacity_pods=110i,capacity_cpu_cores=16i,allocatable_cpu_cores=16i,allocatable_memory_bytes=128732676096 1547597616000000000 +kubernetes_persistentvolume,phase=Released,pv_name=pvc-aaaaaaaa-bbbb-cccc-1111-222222222222,storageclass=ebs-1-retain phase_type=3i 1547597616000000000 +kubernetes_persistentvolumeclaim,namespace=default,phase=Bound,pvc_name=data-etcd-0,storageclass=ebs-1-retain phase_type=0i 1547597615000000000 +kubernetes_pod,namespace=default,node_name=ip-172-17-0-2.internal,pod_name=tick1 last_transition_time=1547578322000000000i,ready="false" 1547597616000000000 +kubernetes_pod_container,container_name=telegraf,namespace=default,node_name=ip-172-17-0-2.internal,pod_name=tick1,state=running resource_requests_cpu_units=0.1,resource_limits_memory_bytes=524288000,resource_limits_cpu_units=0.5,restarts_total=0i,state_code=0i,terminated_reason="",resource_requests_memory_bytes=524288000 1547597616000000000 +kubernetes_statefulset,namespace=default,statefulset_name=etcd replicas_updated=3i,spec_replicas=3i,observed_generation=1i,created=1544101669000000000i,generation=1i,replicas=3i,replicas_current=3i,replicas_ready=3i 1547597616000000000 +``` + + +[metric filtering]: https://github.com/influxdata/telegraf/blob/master/docs/CONFIGURATION.md#metric-filtering +[retention policy]: https://docs.influxdata.com/influxdb/latest/guides/downsampling_and_retention/ +[max-series-per-database]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-series-per-database-1000000 +[max-values-per-tag]: https://docs.influxdata.com/influxdb/latest/administration/config/#max-values-per-tag-100000 +[tsi]: https://docs.influxdata.com/influxdb/latest/concepts/time-series-index/ +[series cardinality]: https://docs.influxdata.com/influxdb/latest/query_language/spec/#show-cardinality +[influx-docs]: https://docs.influxdata.com/influxdb/latest/ +[k8s-telegraf]: https://www.influxdata.com/blog/monitoring-kubernetes-architecture/ +[tick-charts]: https://github.com/influxdata/tick-charts diff --git a/plugins/inputs/kube_inventory/client.go b/plugins/inputs/kube_inventory/client.go new file mode 100644 index 0000000000000..bf207b0ad46d6 --- /dev/null +++ b/plugins/inputs/kube_inventory/client.go @@ -0,0 +1,97 @@ +package kube_inventory + +import ( + "context" + "time" + + "github.com/ericchiang/k8s" + "github.com/ericchiang/k8s/apis/apps/v1beta1" + "github.com/ericchiang/k8s/apis/apps/v1beta2" + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf/internal/tls" +) + +type client struct { + namespace string + timeout time.Duration + *k8s.Client +} + +func newClient(baseURL, namespace, bearerToken string, timeout time.Duration, tlsConfig tls.ClientConfig) (*client, error) { + c, err := k8s.NewClient(&k8s.Config{ + Clusters: []k8s.NamedCluster{{Name: "cluster", Cluster: k8s.Cluster{ + Server: baseURL, + InsecureSkipTLSVerify: tlsConfig.InsecureSkipVerify, + CertificateAuthority: 
tlsConfig.TLSCA, + }}}, + Contexts: []k8s.NamedContext{{Name: "context", Context: k8s.Context{ + Cluster: "cluster", + AuthInfo: "auth", + Namespace: namespace, + }}}, + AuthInfos: []k8s.NamedAuthInfo{{Name: "auth", AuthInfo: k8s.AuthInfo{ + Token: bearerToken, + ClientCertificate: tlsConfig.TLSCert, + ClientKey: tlsConfig.TLSKey, + }}}, + }) + if err != nil { + return nil, err + } + + return &client{ + Client: c, + timeout: timeout, + namespace: namespace, + }, nil +} + +func (c *client) getDaemonSets(ctx context.Context) (*v1beta2.DaemonSetList, error) { + list := new(v1beta2.DaemonSetList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + +func (c *client) getDeployments(ctx context.Context) (*v1beta1.DeploymentList, error) { + list := &v1beta1.DeploymentList{} + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + +func (c *client) getNodes(ctx context.Context) (*v1.NodeList, error) { + list := new(v1.NodeList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, "", list) +} + +func (c *client) getPersistentVolumes(ctx context.Context) (*v1.PersistentVolumeList, error) { + list := new(v1.PersistentVolumeList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, "", list) +} + +func (c *client) getPersistentVolumeClaims(ctx context.Context) (*v1.PersistentVolumeClaimList, error) { + list := new(v1.PersistentVolumeClaimList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + +func (c *client) getPods(ctx context.Context) (*v1.PodList, error) { + list := new(v1.PodList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} + +func (c *client) getStatefulSets(ctx context.Context) (*v1beta1.StatefulSetList, error) { + list := new(v1beta1.StatefulSetList) + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + return list, c.List(ctx, c.namespace, list) +} diff --git a/plugins/inputs/kube_inventory/client_test.go b/plugins/inputs/kube_inventory/client_test.go new file mode 100644 index 0000000000000..4f54755b02362 --- /dev/null +++ b/plugins/inputs/kube_inventory/client_test.go @@ -0,0 +1,35 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf/internal/tls" +) + +type mockHandler struct { + responseMap map[string]interface{} +} + +func toStrPtr(s string) *string { + return &s +} + +func toInt32Ptr(i int32) *int32 { + return &i +} + +func toInt64Ptr(i int64) *int64 { + return &i +} + +func toBoolPtr(b bool) *bool { + return &b +} + +func TestNewClient(t *testing.T) { + _, err := newClient("https://127.0.0.1:443/", "default", "abc123", time.Second, tls.ClientConfig{}) + if err != nil { + t.Errorf("Failed to create new client - %s", err.Error()) + } +} diff --git a/plugins/inputs/kube_inventory/daemonset.go b/plugins/inputs/kube_inventory/daemonset.go new file mode 100644 index 0000000000000..92c7bc195763e --- /dev/null +++ b/plugins/inputs/kube_inventory/daemonset.go @@ -0,0 +1,49 @@ +package kube_inventory + +import ( + "context" + "time" + + "github.com/ericchiang/k8s/apis/apps/v1beta2" + + "github.com/influxdata/telegraf" +) + +func collectDaemonSets(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getDaemonSets(ctx) + if err != 
nil { + acc.AddError(err) + return + } + for _, d := range list.Items { + if err = ki.gatherDaemonSet(*d, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherDaemonSet(d v1beta2.DaemonSet, acc telegraf.Accumulator) error { + fields := map[string]interface{}{ + "generation": d.Metadata.GetGeneration(), + "current_number_scheduled": d.Status.GetCurrentNumberScheduled(), + "desired_number_scheduled": d.Status.GetDesiredNumberScheduled(), + "number_available": d.Status.GetNumberAvailable(), + "number_misscheduled": d.Status.GetNumberMisscheduled(), + "number_ready": d.Status.GetNumberReady(), + "number_unavailable": d.Status.GetNumberUnavailable(), + "updated_number_scheduled": d.Status.GetUpdatedNumberScheduled(), + } + tags := map[string]string{ + "daemonset_name": d.Metadata.GetName(), + "namespace": d.Metadata.GetNamespace(), + } + + if d.Metadata.CreationTimestamp.GetSeconds() != 0 { + fields["created"] = time.Unix(d.Metadata.CreationTimestamp.GetSeconds(), int64(d.Metadata.CreationTimestamp.GetNanos())).UnixNano() + } + + acc.AddFields(daemonSetMeasurement, fields, tags) + + return nil +} diff --git a/plugins/inputs/kube_inventory/daemonset_test.go b/plugins/inputs/kube_inventory/daemonset_test.go new file mode 100644 index 0000000000000..3f11df1ca108d --- /dev/null +++ b/plugins/inputs/kube_inventory/daemonset_test.go @@ -0,0 +1,123 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/apps/v1beta2" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + + "github.com/influxdata/telegraf/testutil" +) + +func TestDaemonSet(t *testing.T) { + cli := &client{} + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no daemon set", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/daemonsets/": &v1beta2.DaemonSetList{}, + }, + }, + hasError: false, + }, + { + name: "collect daemonsets", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/daemonsets/": &v1beta2.DaemonSetList{ + Items: []*v1beta2.DaemonSet{ + { + Status: &v1beta2.DaemonSetStatus{ + CurrentNumberScheduled: toInt32Ptr(3), + DesiredNumberScheduled: toInt32Ptr(5), + NumberAvailable: toInt32Ptr(2), + NumberMisscheduled: toInt32Ptr(2), + NumberReady: toInt32Ptr(1), + NumberUnavailable: toInt32Ptr(1), + UpdatedNumberScheduled: toInt32Ptr(2), + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(11221), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("daemon1"), + Labels: map[string]string{ + "lab1": "v1", + "lab2": "v2", + }, + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "generation": int64(11221), + "current_number_scheduled": int32(3), + "desired_number_scheduled": int32(5), + "number_available": int32(2), + "number_misscheduled": int32(2), + "number_ready": int32(1), + "number_unavailable": int32(1), + "updated_number_scheduled": int32(2), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "daemonset_name": "daemon1", + "namespace": "ns1", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, dset := range 
((v.handler.responseMap["/daemonsets/"]).(*v1beta2.DaemonSetList)).Items { + err := ks.gatherDaemonSet(*dset, acc) + if err != nil { + t.Errorf("Failed to gather daemonset - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/deployment.go b/plugins/inputs/kube_inventory/deployment.go new file mode 100644 index 0000000000000..2d72e8d03a4f0 --- /dev/null +++ b/plugins/inputs/kube_inventory/deployment.go @@ -0,0 +1,40 @@ +package kube_inventory + +import ( + "context" + "time" + + "github.com/ericchiang/k8s/apis/apps/v1beta1" + + "github.com/influxdata/telegraf" +) + +func collectDeployments(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getDeployments(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, d := range list.Items { + if err = ki.gatherDeployment(*d, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherDeployment(d v1beta1.Deployment, acc telegraf.Accumulator) error { + fields := map[string]interface{}{ + "replicas_available": d.Status.GetAvailableReplicas(), + "replicas_unavailable": d.Status.GetUnavailableReplicas(), + "created": time.Unix(d.Metadata.CreationTimestamp.GetSeconds(), int64(d.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + } + tags := map[string]string{ + "deployment_name": d.Metadata.GetName(), + "namespace": d.Metadata.GetNamespace(), + } + + acc.AddFields(deploymentMeasurement, fields, tags) + + return nil +} diff --git a/plugins/inputs/kube_inventory/deployment_test.go b/plugins/inputs/kube_inventory/deployment_test.go new file mode 100644 index 0000000000000..0429b84fa1d87 --- /dev/null +++ b/plugins/inputs/kube_inventory/deployment_test.go @@ -0,0 +1,131 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/apps/v1beta1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/ericchiang/k8s/util/intstr" + "github.com/influxdata/telegraf/testutil" +) + +func TestDeployment(t *testing.T) { + cli := &client{} + + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + outputMetric := &testutil.Metric{ + Fields: map[string]interface{}{ + "replicas_available": int32(1), + "replicas_unavailable": int32(4), + "created": now.UnixNano(), + }, + Tags: map[string]string{ + "namespace": "ns1", + "deployment_name": "deploy1", + }, + } + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no deployments", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/deployments/": &v1beta1.DeploymentList{}, + }, + }, + hasError: false, + }, + { 
+ name: "collect deployments", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/deployments/": &v1beta1.DeploymentList{ + Items: []*v1beta1.Deployment{ + { + Status: &v1beta1.DeploymentStatus{ + Replicas: toInt32Ptr(3), + AvailableReplicas: toInt32Ptr(1), + UnavailableReplicas: toInt32Ptr(4), + UpdatedReplicas: toInt32Ptr(2), + ObservedGeneration: toInt64Ptr(9121), + }, + Spec: &v1beta1.DeploymentSpec{ + Strategy: &v1beta1.DeploymentStrategy{ + RollingUpdate: &v1beta1.RollingUpdateDeployment{ + MaxUnavailable: &intstr.IntOrString{ + IntVal: toInt32Ptr(30), + }, + MaxSurge: &intstr.IntOrString{ + IntVal: toInt32Ptr(20), + }, + }, + }, + Replicas: toInt32Ptr(4), + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(11221), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("deploy1"), + Labels: map[string]string{ + "lab1": "v1", + "lab2": "v2", + }, + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + outputMetric, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, deployment := range ((v.handler.responseMap["/deployments/"]).(*v1beta1.DeploymentList)).Items { + err := ks.gatherDeployment(*deployment, acc) + if err != nil { + t.Errorf("Failed to gather deployment - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got '%v'\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/kube_state.go b/plugins/inputs/kube_inventory/kube_state.go new file mode 100644 index 0000000000000..705d0f65e6361 --- /dev/null +++ b/plugins/inputs/kube_inventory/kube_state.go @@ -0,0 +1,175 @@ +package kube_inventory + +import ( + "context" + "fmt" + "io/ioutil" + "log" + "strconv" + "strings" + "sync" + "time" + + "github.com/kubernetes/apimachinery/pkg/api/resource" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// KubernetesInventory represents the config object for the plugin. 
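+// It holds the Kubernetes API endpoint and authentication settings along with
+// the resource include/exclude lists that control which collectors are run.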
+type KubernetesInventory struct {
+	URL               string            `toml:"url"`
+	BearerToken       string            `toml:"bearer_token"`
+	BearerTokenString string            `toml:"bearer_token_string"`
+	Namespace         string            `toml:"namespace"`
+	ResponseTimeout   internal.Duration `toml:"response_timeout"` // Timeout specified as a string - 3s, 1m, 1h
+	ResourceExclude   []string          `toml:"resource_exclude"`
+	ResourceInclude   []string          `toml:"resource_include"`
+	MaxConfigMapAge   internal.Duration `toml:"max_config_map_age"`
+
+	tls.ClientConfig
+	client *client
+}
+
+var sampleConfig = `
+  ## URL for the Kubernetes API
+  url = "https://127.0.0.1"
+
+  ## Namespace to use
+  # namespace = "default"
+
+  ## Use bearer token for authorization. ('bearer_token' takes priority)
+  # bearer_token = "/path/to/bearer/token"
+  ## OR
+  # bearer_token_string = "abc_123"
+
+  ## Set response_timeout (default 5 seconds)
+  # response_timeout = "5s"
+
+  ## Optional Resources to exclude from gathering
+  ## Leave this empty to try to gather everything available.
+  ## Values can be - "daemonsets", "deployments", "nodes", "persistentvolumes",
+  ## "persistentvolumeclaims", "pods", "statefulsets"
+  # resource_exclude = [ "deployments", "nodes", "statefulsets" ]
+
+  ## Optional Resources to include when gathering
+  ## Overrides resource_exclude if both are set.
+  # resource_include = [ "deployments", "nodes", "statefulsets" ]
+
+  ## Optional TLS Config
+  # tls_ca = "/path/to/cafile"
+  # tls_cert = "/path/to/certfile"
+  # tls_key = "/path/to/keyfile"
+  ## Use TLS but skip chain & host verification
+  # insecure_skip_verify = false
+`
+
+// SampleConfig returns a sample config
+func (ki *KubernetesInventory) SampleConfig() string {
+	return sampleConfig
+}
+
+// Description returns the description of this plugin
+func (ki *KubernetesInventory) Description() string {
+	return "Read metrics from the Kubernetes API"
+}
+
+// Gather collects Kubernetes metrics from a given URL.
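+// It creates the API client on first use, builds an include/exclude filter from
+// resource_include and resource_exclude, and runs each selected collector in its
+// own goroutine, waiting for all of them to finish before returning.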
+func (ki *KubernetesInventory) Gather(acc telegraf.Accumulator) (err error) { + if ki.client == nil { + if ki.client, err = ki.initClient(); err != nil { + return err + } + } + + resourceFilter, err := filter.NewIncludeExcludeFilter(ki.ResourceInclude, ki.ResourceExclude) + if err != nil { + return err + } + + wg := sync.WaitGroup{} + ctx := context.Background() + + for collector, f := range availableCollectors { + if resourceFilter.Match(collector) { + wg.Add(1) + go func(f func(ctx context.Context, acc telegraf.Accumulator, k *KubernetesInventory)) { + defer wg.Done() + f(ctx, acc, ki) + }(f) + } + } + + wg.Wait() + + return nil +} + +var availableCollectors = map[string]func(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory){ + "daemonsets": collectDaemonSets, + "deployments": collectDeployments, + "nodes": collectNodes, + "persistentvolumes": collectPersistentVolumes, + "persistentvolumeclaims": collectPersistentVolumeClaims, + "pods": collectPods, + "statefulsets": collectStatefulSets, +} + +func (ki *KubernetesInventory) initClient() (*client, error) { + if ki.BearerToken != "" { + token, err := ioutil.ReadFile(ki.BearerToken) + if err != nil { + return nil, err + } + ki.BearerTokenString = strings.TrimSpace(string(token)) + } + + return newClient(ki.URL, ki.Namespace, ki.BearerTokenString, ki.ResponseTimeout.Duration, ki.ClientConfig) +} + +func atoi(s string) int64 { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0 + } + return int64(i) +} + +func convertQuantity(s string, m float64) int64 { + q, err := resource.ParseQuantity(s) + if err != nil { + log.Printf("E! Failed to parse quantity - %v", err) + return 0 + } + f, err := strconv.ParseFloat(fmt.Sprint(q.AsDec()), 64) + if err != nil { + log.Printf("E! 
Failed to parse float - %v", err) + return 0 + } + if m < 1 { + m = 1 + } + return int64(f * m) +} + +var ( + daemonSetMeasurement = "kubernetes_daemonset" + deploymentMeasurement = "kubernetes_deployment" + nodeMeasurement = "kubernetes_node" + persistentVolumeMeasurement = "kubernetes_persistentvolume" + persistentVolumeClaimMeasurement = "kubernetes_persistentvolumeclaim" + podContainerMeasurement = "kubernetes_pod_container" + statefulSetMeasurement = "kubernetes_statefulset" +) + +func init() { + inputs.Add("kube_inventory", func() telegraf.Input { + return &KubernetesInventory{ + ResponseTimeout: internal.Duration{Duration: time.Second * 5}, + Namespace: "default", + } + }) +} diff --git a/plugins/inputs/kube_inventory/node.go b/plugins/inputs/kube_inventory/node.go new file mode 100644 index 0000000000000..cccf6897f8aa3 --- /dev/null +++ b/plugins/inputs/kube_inventory/node.go @@ -0,0 +1,56 @@ +package kube_inventory + +import ( + "context" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectNodes(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getNodes(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, n := range list.Items { + if err = ki.gatherNode(*n, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherNode(n v1.Node, acc telegraf.Accumulator) error { + fields := map[string]interface{}{} + tags := map[string]string{ + "node_name": *n.Metadata.Name, + } + + for resourceName, val := range n.Status.Capacity { + switch resourceName { + case "cpu": + fields["capacity_cpu_cores"] = atoi(val.GetString_()) + case "memory": + fields["capacity_memory_bytes"] = convertQuantity(val.GetString_(), 1) + case "pods": + fields["capacity_pods"] = atoi(val.GetString_()) + } + } + + for resourceName, val := range n.Status.Allocatable { + switch resourceName { + case "cpu": + fields["allocatable_cpu_cores"] = atoi(val.GetString_()) + case "memory": + fields["allocatable_memory_bytes"] = convertQuantity(val.GetString_(), 1) + case "pods": + fields["allocatable_pods"] = atoi(val.GetString_()) + } + } + + acc.AddFields(nodeMeasurement, fields, tags) + + return nil +} diff --git a/plugins/inputs/kube_inventory/node_test.go b/plugins/inputs/kube_inventory/node_test.go new file mode 100644 index 0000000000000..7573dd2c06f6d --- /dev/null +++ b/plugins/inputs/kube_inventory/node_test.go @@ -0,0 +1,172 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/ericchiang/k8s/apis/resource" + + "github.com/influxdata/telegraf/testutil" +) + +func TestNode(t *testing.T) { + cli := &client{} + now := time.Now() + created := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no nodes", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/nodes/": &v1.NodeList{}, + }, + }, + hasError: false, + }, + { + name: "collect nodes", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/nodes/": &v1.NodeList{ + Items: []*v1.Node{ + { + Status: &v1.NodeStatus{ + NodeInfo: &v1.NodeSystemInfo{ + KernelVersion: toStrPtr("4.14.48-coreos-r2"), + OsImage: toStrPtr("Container Linux by CoreOS 1745.7.0 (Rhyolite)"), + ContainerRuntimeVersion: toStrPtr("docker://18.3.1"), + 
KubeletVersion: toStrPtr("v1.10.3"), + KubeProxyVersion: toStrPtr("v1.10.3"), + }, + Phase: toStrPtr("Running"), + Capacity: map[string]*resource.Quantity{ + "cpu": {String_: toStrPtr("16")}, + "ephemeral_storage_bytes": {String_: toStrPtr("49536401408")}, + "hugepages_1Gi_bytes": {String_: toStrPtr("0")}, + "hugepages_2Mi_bytes": {String_: toStrPtr("0")}, + "memory": {String_: toStrPtr("125817904Ki")}, + "pods": {String_: toStrPtr("110")}, + }, + Allocatable: map[string]*resource.Quantity{ + "cpu": {String_: toStrPtr("16")}, + "ephemeral_storage_bytes": {String_: toStrPtr("44582761194")}, + "hugepages_1Gi_bytes": {String_: toStrPtr("0")}, + "hugepages_2Mi_bytes": {String_: toStrPtr("0")}, + "memory": {String_: toStrPtr("125715504Ki")}, + "pods": {String_: toStrPtr("110")}, + }, + Conditions: []*v1.NodeCondition{ + {Type: toStrPtr("Ready"), Status: toStrPtr("true"), LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}}, + {Type: toStrPtr("OutOfDisk"), Status: toStrPtr("false"), LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(created.Unix())}}, + }, + }, + Spec: &v1.NodeSpec{ + ProviderID: toStrPtr("aws:///us-east-1c/i-0c00"), + Taints: []*v1.Taint{ + { + Key: toStrPtr("k1"), + Value: toStrPtr("v1"), + Effect: toStrPtr("NoExecute"), + }, + { + Key: toStrPtr("k2"), + Value: toStrPtr("v2"), + Effect: toStrPtr("NoSchedule"), + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(int64(11232)), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("node1"), + Labels: map[string]string{ + "lab1": "v1", + "lab2": "v2", + }, + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(created.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Measurement: nodeMeasurement, + Fields: map[string]interface{}{ + "capacity_cpu_cores": int64(16), + "capacity_memory_bytes": int64(1.28837533696e+11), + "capacity_pods": int64(110), + "allocatable_cpu_cores": int64(16), + "allocatable_memory_bytes": int64(1.28732676096e+11), + "allocatable_pods": int64(110), + }, + Tags: map[string]string{ + "node_name": "node1", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, node := range ((v.handler.responseMap["/nodes/"]).(*v1.NodeList)).Items { + err := ks.gatherNode(*node, acc) + if err != nil { + t.Errorf("Failed to gather node - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + measurement := v.output.Metrics[i].Measurement + var keyTag string + switch measurement { + case nodeMeasurement: + keyTag = "node" + } + var j int + for j = range acc.Metrics { + if acc.Metrics[j].Measurement == measurement && + acc.Metrics[j].Tags[keyTag] == v.output.Metrics[i].Tags[keyTag] { + break + } + } + + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[j].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s, measurement %s, j %d\n", v.name, k, m, acc.Metrics[j].Tags[k], measurement, j) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[j].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got 
%v(%T), measurement %s, j %d\n", v.name, k, m, m, acc.Metrics[j].Fields[k], acc.Metrics[i].Fields[k], measurement, j) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/persistentvolume.go b/plugins/inputs/kube_inventory/persistentvolume.go new file mode 100644 index 0000000000000..05600522b7ea8 --- /dev/null +++ b/plugins/inputs/kube_inventory/persistentvolume.go @@ -0,0 +1,52 @@ +package kube_inventory + +import ( + "context" + "strings" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectPersistentVolumes(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getPersistentVolumes(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, pv := range list.Items { + if err = ki.gatherPersistentVolume(*pv, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherPersistentVolume(pv v1.PersistentVolume, acc telegraf.Accumulator) error { + phaseType := 5 + switch strings.ToLower(pv.Status.GetPhase()) { + case "bound": + phaseType = 0 + case "failed": + phaseType = 1 + case "pending": + phaseType = 2 + case "released": + phaseType = 3 + case "available": + phaseType = 4 + } + fields := map[string]interface{}{ + "phase_type": phaseType, + } + tags := map[string]string{ + "pv_name": pv.Metadata.GetName(), + "phase": pv.Status.GetPhase(), + "storageclass": pv.Spec.GetStorageClassName(), + } + + acc.AddFields(persistentVolumeMeasurement, fields, tags) + + return nil +} diff --git a/plugins/inputs/kube_inventory/persistentvolume_test.go b/plugins/inputs/kube_inventory/persistentvolume_test.go new file mode 100644 index 0000000000000..a5d20d047331a --- /dev/null +++ b/plugins/inputs/kube_inventory/persistentvolume_test.go @@ -0,0 +1,112 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + + "github.com/influxdata/telegraf/testutil" +) + +func TestPersistentVolume(t *testing.T) { + cli := &client{} + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no pv", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/persistentvolumes/": &v1.PersistentVolumeList{}, + }, + }, + hasError: false, + }, + { + name: "collect pvs", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/persistentvolumes/": &v1.PersistentVolumeList{ + Items: []*v1.PersistentVolume{ + { + Status: &v1.PersistentVolumeStatus{ + Phase: toStrPtr("pending"), + }, + Spec: &v1.PersistentVolumeSpec{ + StorageClassName: toStrPtr("ebs-1"), + }, + Metadata: &metav1.ObjectMeta{ + Name: toStrPtr("pv1"), + Labels: map[string]string{ + "lab1": "v1", + "lab2": "v2", + }, + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "phase_type": 2, + }, + Tags: map[string]string{ + "pv_name": "pv1", + "storageclass": "ebs-1", + "phase": "pending", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, pv := range ((v.handler.responseMap["/persistentvolumes/"]).(*v1.PersistentVolumeList)).Items { + err := 
ks.gatherPersistentVolume(*pv, acc) + if err != nil { + t.Errorf("Failed to gather pv - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/persistentvolumeclaim.go b/plugins/inputs/kube_inventory/persistentvolumeclaim.go new file mode 100644 index 0000000000000..0663462ae992d --- /dev/null +++ b/plugins/inputs/kube_inventory/persistentvolumeclaim.go @@ -0,0 +1,49 @@ +package kube_inventory + +import ( + "context" + "strings" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectPersistentVolumeClaims(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getPersistentVolumeClaims(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, pvc := range list.Items { + if err = ki.gatherPersistentVolumeClaim(*pvc, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherPersistentVolumeClaim(pvc v1.PersistentVolumeClaim, acc telegraf.Accumulator) error { + phaseType := 3 + switch strings.ToLower(pvc.Status.GetPhase()) { + case "bound": + phaseType = 0 + case "lost": + phaseType = 1 + case "pending": + phaseType = 2 + } + fields := map[string]interface{}{ + "phase_type": phaseType, + } + tags := map[string]string{ + "pvc_name": pvc.Metadata.GetName(), + "namespace": pvc.Metadata.GetNamespace(), + "phase": pvc.Status.GetPhase(), + "storageclass": pvc.Spec.GetStorageClassName(), + } + + acc.AddFields(persistentVolumeClaimMeasurement, fields, tags) + + return nil +} diff --git a/plugins/inputs/kube_inventory/persistentvolumeclaim_test.go b/plugins/inputs/kube_inventory/persistentvolumeclaim_test.go new file mode 100644 index 0000000000000..8a50c0f2eb914 --- /dev/null +++ b/plugins/inputs/kube_inventory/persistentvolumeclaim_test.go @@ -0,0 +1,115 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + + "github.com/influxdata/telegraf/testutil" +) + +func TestPersistentVolumeClaim(t *testing.T) { + cli := &client{} + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no pv claims", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/persistentvolumeclaims/": &v1.PersistentVolumeClaimList{}, + }, + }, + hasError: false, + }, + { + name: "collect pv claims", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/persistentvolumeclaims/": &v1.PersistentVolumeClaimList{ + Items: []*v1.PersistentVolumeClaim{ + { + Status: 
&v1.PersistentVolumeClaimStatus{ + Phase: toStrPtr("bound"), + }, + Spec: &v1.PersistentVolumeClaimSpec{ + VolumeName: toStrPtr("pvc-dc870fd6-1e08-11e8-b226-02aa4bc06eb8"), + StorageClassName: toStrPtr("ebs-1"), + }, + Metadata: &metav1.ObjectMeta{ + Namespace: toStrPtr("ns1"), + Name: toStrPtr("pc1"), + Labels: map[string]string{ + "lab1": "v1", + "lab2": "v2", + }, + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "phase_type": 0, + }, + Tags: map[string]string{ + "pvc_name": "pc1", + "namespace": "ns1", + "storageclass": "ebs-1", + "phase": "bound", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, pvc := range ((v.handler.responseMap["/persistentvolumeclaims/"]).(*v1.PersistentVolumeClaimList)).Items { + err := ks.gatherPersistentVolumeClaim(*pvc, acc) + if err != nil { + t.Errorf("Failed to gather pvc - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/pod.go b/plugins/inputs/kube_inventory/pod.go new file mode 100644 index 0000000000000..7b5207616d412 --- /dev/null +++ b/plugins/inputs/kube_inventory/pod.go @@ -0,0 +1,87 @@ +package kube_inventory + +import ( + "context" + + "github.com/ericchiang/k8s/apis/core/v1" + + "github.com/influxdata/telegraf" +) + +func collectPods(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getPods(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, p := range list.Items { + if err = ki.gatherPod(*p, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherPod(p v1.Pod, acc telegraf.Accumulator) error { + if p.Metadata.CreationTimestamp.GetSeconds() == 0 && p.Metadata.CreationTimestamp.GetNanos() == 0 { + return nil + } + + for i, cs := range p.Status.ContainerStatuses { + c := p.Spec.Containers[i] + gatherPodContainer(*p.Spec.NodeName, p, *cs, *c, acc) + } + + return nil +} + +func gatherPodContainer(nodeName string, p v1.Pod, cs v1.ContainerStatus, c v1.Container, acc telegraf.Accumulator) { + stateCode := 3 + state := "unknown" + switch { + case cs.State.Running != nil: + stateCode = 0 + state = "running" + case cs.State.Terminated != nil: + stateCode = 1 + state = "terminated" + case cs.State.Waiting != nil: + stateCode = 2 + state = "waiting" + } + + fields := map[string]interface{}{ + "restarts_total": cs.GetRestartCount(), + "state_code": stateCode, + "terminated_reason": cs.State.Terminated.GetReason(), + } + tags := map[string]string{ + 
"container_name": *c.Name, + "namespace": *p.Metadata.Namespace, + "node_name": *p.Spec.NodeName, + "pod_name": *p.Metadata.Name, + "state": state, + } + + req := c.Resources.Requests + lim := c.Resources.Limits + + for resourceName, val := range req { + switch resourceName { + case "cpu": + fields["resource_requests_millicpu_units"] = convertQuantity(val.GetString_(), 1000) + case "memory": + fields["resource_requests_memory_bytes"] = convertQuantity(val.GetString_(), 1) + } + } + for resourceName, val := range lim { + switch resourceName { + case "cpu": + fields["resource_limits_millicpu_units"] = convertQuantity(val.GetString_(), 1000) + case "memory": + fields["resource_limits_memory_bytes"] = convertQuantity(val.GetString_(), 1) + } + } + + acc.AddFields(podContainerMeasurement, fields, tags) +} diff --git a/plugins/inputs/kube_inventory/pod_test.go b/plugins/inputs/kube_inventory/pod_test.go new file mode 100644 index 0000000000000..50b093880dbf1 --- /dev/null +++ b/plugins/inputs/kube_inventory/pod_test.go @@ -0,0 +1,199 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/core/v1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + "github.com/ericchiang/k8s/apis/resource" + "github.com/influxdata/telegraf/testutil" +) + +func TestPod(t *testing.T) { + cli := &client{} + now := time.Now() + started := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-1, 1, 36, 0, now.Location()) + created := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-2, 1, 36, 0, now.Location()) + cond1 := time.Date(now.Year(), 7, 5, 7, 53, 29, 0, now.Location()) + cond2 := time.Date(now.Year(), 7, 5, 7, 53, 31, 0, now.Location()) + + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no pods", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/pods/": &v1.PodList{}, + }, + }, + hasError: false, + }, + { + name: "collect pods", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/pods/": &v1.PodList{ + Items: []*v1.Pod{ + { + Spec: &v1.PodSpec{ + NodeName: toStrPtr("node1"), + Containers: []*v1.Container{ + { + Name: toStrPtr("forwarder"), + Image: toStrPtr("image1"), + Ports: []*v1.ContainerPort{ + { + ContainerPort: toInt32Ptr(8080), + Protocol: toStrPtr("TCP"), + }, + }, + Resources: &v1.ResourceRequirements{ + Limits: map[string]*resource.Quantity{ + "cpu": {String_: toStrPtr("100m")}, + }, + Requests: map[string]*resource.Quantity{ + "cpu": {String_: toStrPtr("100m")}, + }, + }, + }, + }, + Volumes: []*v1.Volume{ + { + Name: toStrPtr("vol1"), + VolumeSource: &v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: toStrPtr("pc1"), + ReadOnly: toBoolPtr(true), + }, + }, + }, + { + Name: toStrPtr("vol2"), + }, + }, + }, + Status: &v1.PodStatus{ + Phase: toStrPtr("Running"), + HostIP: toStrPtr("180.12.10.18"), + PodIP: toStrPtr("10.244.2.15"), + StartTime: &metav1.Time{Seconds: toInt64Ptr(started.Unix())}, + Conditions: []*v1.PodCondition{ + { + Type: toStrPtr("Initialized"), + Status: toStrPtr("True"), + LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(cond1.Unix())}, + }, + { + Type: toStrPtr("Ready"), + Status: toStrPtr("True"), + LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(cond2.Unix())}, + }, + { + Type: toStrPtr("Scheduled"), + Status: toStrPtr("True"), + LastTransitionTime: &metav1.Time{Seconds: toInt64Ptr(cond1.Unix())}, + }, + }, + ContainerStatuses: []*v1.ContainerStatus{ + { + Name: 
toStrPtr("forwarder"), + State: &v1.ContainerState{ + Running: &v1.ContainerStateRunning{ + StartedAt: &metav1.Time{Seconds: toInt64Ptr(cond2.Unix())}, + }, + }, + Ready: toBoolPtr(true), + RestartCount: toInt32Ptr(3), + Image: toStrPtr("image1"), + ImageID: toStrPtr("image_id1"), + ContainerID: toStrPtr("docker://54abe32d0094479d3d"), + }, + }, + }, + Metadata: &metav1.ObjectMeta{ + OwnerReferences: []*metav1.OwnerReference{ + { + ApiVersion: toStrPtr("apps/v1"), + Kind: toStrPtr("DaemonSet"), + Name: toStrPtr("forwarder"), + Controller: toBoolPtr(true), + }, + }, + Generation: toInt64Ptr(11232), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("pod1"), + Labels: map[string]string{ + "lab1": "v1", + "lab2": "v2", + }, + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(created.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Measurement: podContainerMeasurement, + Fields: map[string]interface{}{ + "restarts_total": int32(3), + "state_code": 0, + "resource_requests_millicpu_units": int64(100), + "resource_limits_millicpu_units": int64(100), + }, + Tags: map[string]string{ + "namespace": "ns1", + "container_name": "forwarder", + "node_name": "node1", + "pod_name": "pod1", + "state": "running", + }, + }, + }, + }, + hasError: false, + }, + } + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, pod := range ((v.handler.responseMap["/pods/"]).(*v1.PodList)).Items { + err := ks.gatherPod(*pod, acc) + if err != nil { + t.Errorf("Failed to gather pod - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s, i %d\n", v.name, k, m, acc.Metrics[i].Tags[k], i) + } + } + for k, m := range v.output.Metrics[i].Fields { + if acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T), i %d\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k], i) + } + } + } + } + } +} diff --git a/plugins/inputs/kube_inventory/statefulset.go b/plugins/inputs/kube_inventory/statefulset.go new file mode 100644 index 0000000000000..407aaac2fce08 --- /dev/null +++ b/plugins/inputs/kube_inventory/statefulset.go @@ -0,0 +1,46 @@ +package kube_inventory + +import ( + "context" + "time" + + "github.com/ericchiang/k8s/apis/apps/v1beta1" + + "github.com/influxdata/telegraf" +) + +func collectStatefulSets(ctx context.Context, acc telegraf.Accumulator, ki *KubernetesInventory) { + list, err := ki.client.getStatefulSets(ctx) + if err != nil { + acc.AddError(err) + return + } + for _, s := range list.Items { + if err = ki.gatherStatefulSet(*s, acc); err != nil { + acc.AddError(err) + return + } + } +} + +func (ki *KubernetesInventory) gatherStatefulSet(s v1beta1.StatefulSet, acc telegraf.Accumulator) error { + status := s.Status + fields := map[string]interface{}{ + "created": time.Unix(s.Metadata.CreationTimestamp.GetSeconds(), int64(s.Metadata.CreationTimestamp.GetNanos())).UnixNano(), + "generation": *s.Metadata.Generation, + "replicas": 
*status.Replicas, + "replicas_current": *status.CurrentReplicas, + "replicas_ready": *status.ReadyReplicas, + "replicas_updated": *status.UpdatedReplicas, + "spec_replicas": *s.Spec.Replicas, + "observed_generation": *s.Status.ObservedGeneration, + } + tags := map[string]string{ + "statefulset_name": *s.Metadata.Name, + "namespace": *s.Metadata.Namespace, + } + + acc.AddFields(statefulSetMeasurement, fields, tags) + + return nil +} diff --git a/plugins/inputs/kube_inventory/statefulset_test.go b/plugins/inputs/kube_inventory/statefulset_test.go new file mode 100644 index 0000000000000..6e94ad150ce0f --- /dev/null +++ b/plugins/inputs/kube_inventory/statefulset_test.go @@ -0,0 +1,123 @@ +package kube_inventory + +import ( + "testing" + "time" + + "github.com/ericchiang/k8s/apis/apps/v1beta1" + metav1 "github.com/ericchiang/k8s/apis/meta/v1" + + "github.com/influxdata/telegraf/testutil" +) + +func TestStatefulSet(t *testing.T) { + cli := &client{} + now := time.Now() + now = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 1, 36, 0, now.Location()) + tests := []struct { + name string + handler *mockHandler + output *testutil.Accumulator + hasError bool + }{ + { + name: "no statefulsets", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/statefulsets/": &v1beta1.StatefulSetList{}, + }, + }, + hasError: false, + }, + { + name: "collect statefulsets", + handler: &mockHandler{ + responseMap: map[string]interface{}{ + "/statefulsets/": &v1beta1.StatefulSetList{ + Items: []*v1beta1.StatefulSet{ + { + Status: &v1beta1.StatefulSetStatus{ + Replicas: toInt32Ptr(2), + CurrentReplicas: toInt32Ptr(4), + ReadyReplicas: toInt32Ptr(1), + UpdatedReplicas: toInt32Ptr(3), + ObservedGeneration: toInt64Ptr(119), + }, + Spec: &v1beta1.StatefulSetSpec{ + Replicas: toInt32Ptr(3), + }, + Metadata: &metav1.ObjectMeta{ + Generation: toInt64Ptr(332), + Namespace: toStrPtr("ns1"), + Name: toStrPtr("sts1"), + Labels: map[string]string{ + "lab1": "v1", + "lab2": "v2", + }, + CreationTimestamp: &metav1.Time{Seconds: toInt64Ptr(now.Unix())}, + }, + }, + }, + }, + }, + }, + output: &testutil.Accumulator{ + Metrics: []*testutil.Metric{ + { + Fields: map[string]interface{}{ + "generation": int64(332), + "observed_generation": int64(119), + "created": now.UnixNano(), + "spec_replicas": int32(3), + "replicas": int32(2), + "replicas_current": int32(4), + "replicas_ready": int32(1), + "replicas_updated": int32(3), + }, + Tags: map[string]string{ + "namespace": "ns1", + "statefulset_name": "sts1", + }, + }, + }, + }, + hasError: false, + }, + } + + for _, v := range tests { + ks := &KubernetesInventory{ + client: cli, + } + acc := new(testutil.Accumulator) + for _, ss := range ((v.handler.responseMap["/statefulsets/"]).(*v1beta1.StatefulSetList)).Items { + err := ks.gatherStatefulSet(*ss, acc) + if err != nil { + t.Errorf("Failed to gather ss - %s", err.Error()) + } + } + + err := acc.FirstError() + if err == nil && v.hasError { + t.Fatalf("%s failed, should have error", v.name) + } else if err != nil && !v.hasError { + t.Fatalf("%s failed, err: %v", v.name, err) + } + if v.output == nil && len(acc.Metrics) > 0 { + t.Fatalf("%s: collected extra data", v.name) + } else if v.output != nil && len(v.output.Metrics) > 0 { + for i := range v.output.Metrics { + for k, m := range v.output.Metrics[i].Tags { + if acc.Metrics[i].Tags[k] != m { + t.Fatalf("%s: tag %s metrics unmatch Expected %s, got %s\n", v.name, k, m, acc.Metrics[i].Tags[k]) + } + } + for k, m := range v.output.Metrics[i].Fields { + if 
acc.Metrics[i].Fields[k] != m { + t.Fatalf("%s: field %s metrics unmatch Expected %v(%T), got %v(%T)\n", v.name, k, m, m, acc.Metrics[i].Fields[k], acc.Metrics[i].Fields[k]) + } + } + } + } + } +}