Skip to content

Commit

Permalink
Updated EKS Resource Detector to use Go Client for Kubernetes (#813)
Browse files Browse the repository at this point in the history
* Updated EKS Resource Detector to use Go Client for Kubernetes

* Updated function to avoid duplicate invocation

* Use high level k8s client to retrieve configmap

* Created changelog entry

Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
  • Loading branch information
Rahul Varma and Aneurysm9 authored Jun 14, 2021
1 parent 969f43c commit f1ff9d3
Show file tree
Hide file tree
Showing 5 changed files with 479 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- Supported minimum version of Go bumped from 1.14 to 1.15. (#787)
- EKS Resource Detector now use the Kubernetes Go client to obtain the ConfigMap. (#813)

### Removed

Expand Down
140 changes: 49 additions & 91 deletions detectors/aws/eks/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,48 @@ package eks

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"regexp"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv"
)

const (
k8sSvcURL = "https://kubernetes.default.svc"
k8sTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
k8sCertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
authConfigmapPath = "/api/v1/namespaces/kube-system/configmaps/aws-auth"
cwConfigmapPath = "/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info"
authConfigmapNS = "kube-system"
authConfigmapName = "aws-auth"
cwConfigmapNS = "amazon-cloudwatch"
cwConfigmapName = "cluster-info"
defaultCgroupPath = "/proc/self/cgroup"
containerIDLength = 64
timeoutMillis = 2000
)

// detectorUtils is used for testing the resourceDetector by abstracting functions that rely on external systems.
type detectorUtils interface {
fileExists(filename string) bool
fetchString(httpMethod string, URL string) (string, error)
getConfigMap(ctx context.Context, namespace string, name string) (map[string]string, error)
getContainerID() (string, error)
}

// This struct will implement the detectorUtils interface
type eksDetectorUtils struct{}
type eksDetectorUtils struct {
clientset *kubernetes.Clientset
}

// resourceDetector for detecting resources running on Amazon EKS
type resourceDetector struct {
utils detectorUtils
}

// This struct will help unmarshal clustername from JSON response
type data struct {
ClusterName string `json:"cluster.name"`
err error
}

// Compile time assertion that resourceDetector implements the resource.Detector interface.
Expand All @@ -71,13 +68,17 @@ var _ detectorUtils = (*eksDetectorUtils)(nil)

// NewResourceDetector returns a resource detector that will detect AWS EKS resources.
func NewResourceDetector() resource.Detector {
return &resourceDetector{utils: eksDetectorUtils{}}
utils, err := newK8sDetectorUtils()
return &resourceDetector{utils: utils, err: err}
}

// Detect returns a Resource describing the Amazon EKS environment being run in.
func (detector *resourceDetector) Detect(ctx context.Context) (*resource.Resource, error) {
if detector.err != nil {
return nil, detector.err
}

isEks, err := isEKS(detector.utils)
isEks, err := isEKS(ctx, detector.utils)
if err != nil {
return nil, err
}
Expand All @@ -91,7 +92,7 @@ func (detector *resourceDetector) Detect(ctx context.Context) (*resource.Resourc
attributes := []attribute.KeyValue{}

// Get clusterName and append to attributes
clusterName, err := getClusterName(detector.utils)
clusterName, err := getClusterName(ctx, detector.utils)
if err != nil {
return nil, err
}
Expand All @@ -110,22 +111,38 @@ func (detector *resourceDetector) Detect(ctx context.Context) (*resource.Resourc

// Return new resource object with clusterName and containerID as attributes
return resource.NewWithAttributes(attributes...), nil

}

// isEKS checks if the current environment is running in EKS.
func isEKS(utils detectorUtils) (bool, error) {
func isEKS(ctx context.Context, utils detectorUtils) (bool, error) {
if !isK8s(utils) {
return false, nil
}

// Make HTTP GET request
awsAuth, err := utils.fetchString(http.MethodGet, k8sSvcURL+authConfigmapPath)
awsAuth, err := utils.getConfigMap(ctx, authConfigmapNS, authConfigmapName)
if err != nil {
return false, fmt.Errorf("isEks() error retrieving auth configmap: %w", err)
}

return awsAuth != "", nil
return awsAuth != nil, nil
}

// newK8sDetectorUtils creates the Kubernetes clientset
func newK8sDetectorUtils() (*eksDetectorUtils, error) {
// Get cluster configuration
confs, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to create config: %w", err)
}

// Create clientset using generated configuration
clientset, err := kubernetes.NewForConfig(confs)
if err != nil {
return nil, fmt.Errorf("failed to create clientset for Kubernetes client")
}

return &eksDetectorUtils{clientset: clientset}, nil
}

// isK8s checks if the current environment is running in a Kubernetes environment
Expand All @@ -139,84 +156,24 @@ func (eksUtils eksDetectorUtils) fileExists(filename string) bool {
return err == nil && !info.IsDir()
}

// fetchString executes an HTTP request with a given HTTP Method and URL string.
func (eksUtils eksDetectorUtils) fetchString(httpMethod string, URL string) (string, error) {
request, err := http.NewRequest(httpMethod, URL, nil)
if err != nil {
return "", fmt.Errorf("failed to create new HTTP request with method=%s, URL=%s: %w", httpMethod, URL, err)
}

// Set HTTP request header with authentication credentials
authHeader, err := getK8sCredHeader()
// getConfigMap retrieves the configuration map from the k8s API
func (eksUtils eksDetectorUtils) getConfigMap(ctx context.Context, namespace string, name string) (map[string]string, error) {
cm, err := eksUtils.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return "", err
return nil, fmt.Errorf("failed to retrieve ConfigMap %s/%s: %w", namespace, name, err)
}
request.Header.Set("Authorization", authHeader)

// Get certificate
caCert, err := ioutil.ReadFile(k8sCertPath)
if err != nil {
return "", fmt.Errorf("failed to read file with path %s", k8sCertPath)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

// Set HTTP request timeout and add certificate
client := &http.Client{
Timeout: timeoutMillis * time.Millisecond,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
RootCAs: caCertPool,
},
},
}

response, err := client.Do(request)
if err != nil || response.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to execute HTTP request with method=%s, URL=%s, Status Code=%d: %w", httpMethod, URL, response.StatusCode, err)
}

// Retrieve response body from HTTP request
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", fmt.Errorf("failed to read response from HTTP request with method=%s, URL=%s: %w", httpMethod, URL, err)
}

return string(body), nil
}

// getK8sCredHeader retrieves the kubernetes credential information.
func getK8sCredHeader() (string, error) {
content, err := ioutil.ReadFile(k8sTokenPath)
if err != nil {
return "", fmt.Errorf("getK8sCredHeader() error: cannot read file with path %s", k8sTokenPath)
}

return "Bearer " + string(content), nil
return cm.Data, nil
}

// getClusterName retrieves the clusterName resource attribute
func getClusterName(utils detectorUtils) (string, error) {
resp, err := utils.fetchString("GET", k8sSvcURL+cwConfigmapPath)
func getClusterName(ctx context.Context, utils detectorUtils) (string, error) {
resp, err := utils.getConfigMap(ctx, cwConfigmapNS, cwConfigmapName)
if err != nil {
return "", fmt.Errorf("getClusterName() error: %w", err)
}

// parse JSON object returned from HTTP request
var respmap map[string]json.RawMessage
err = json.Unmarshal([]byte(resp), &respmap)
if err != nil {
return "", fmt.Errorf("getClusterName() error: cannot parse JSON: %w", err)
}
var d data
err = json.Unmarshal(respmap["data"], &d)
if err != nil {
return "", fmt.Errorf("getClusterName() error: cannot parse JSON: %w", err)
}

clusterName := d.ClusterName

return clusterName, nil
return resp["cluster.name"], nil
}

// getContainerID returns the containerID if currently running within a container.
Expand All @@ -226,6 +183,7 @@ func (eksUtils eksDetectorUtils) getContainerID() (string, error) {
return "", fmt.Errorf("getContainerID() error: cannot read file with path %s: %w", defaultCgroupPath, err)
}

// is this going to stop working with 1.20 when Docker is deprecated?
r, err := regexp.Compile(`^.*/docker/(.+)$`)
if err != nil {
return "", err
Expand Down
18 changes: 8 additions & 10 deletions detectors/aws/eks/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ func (detectorUtils *MockDetectorUtils) fileExists(filename string) bool {
return args.Bool(0)
}

// Mock function for fetchString()
func (detectorUtils *MockDetectorUtils) fetchString(httpMethod string, URL string) (string, error) {
args := detectorUtils.Called(httpMethod, URL)
return args.String(0), args.Error(1)
// Mock function for getConfigMap()
func (detectorUtils *MockDetectorUtils) getConfigMap(_ context.Context, namespace string, name string) (map[string]string, error) {
args := detectorUtils.Called(namespace, name)
return args.Get(0).(map[string]string), args.Error(1)
}

// Mock function for getContainerID()
Expand All @@ -51,14 +51,13 @@ func (detectorUtils *MockDetectorUtils) getContainerID() (string, error) {

// Tests EKS resource detector running in EKS environment
func TestEks(t *testing.T) {

detectorUtils := new(MockDetectorUtils)

// Mock functions and set expectations
detectorUtils.On("fileExists", k8sTokenPath).Return(true)
detectorUtils.On("fileExists", k8sCertPath).Return(true)
detectorUtils.On("fetchString", "GET", k8sSvcURL+authConfigmapPath).Return("not empty", nil)
detectorUtils.On("fetchString", "GET", k8sSvcURL+cwConfigmapPath).Return(`{"data":{"cluster.name":"my-cluster"}}`, nil)
detectorUtils.On("getConfigMap", authConfigmapNS, authConfigmapName).Return(map[string]string{"not": "nil"}, nil)
detectorUtils.On("getConfigMap", cwConfigmapNS, cwConfigmapName).Return(map[string]string{"cluster.name": "my-cluster"}, nil)
detectorUtils.On("getContainerID").Return("0123456789A", nil)

// Expected resource object
Expand All @@ -69,7 +68,7 @@ func TestEks(t *testing.T) {
expectedResource := resource.NewWithAttributes(eksResourceLabels...)

// Call EKS Resource detector to detect resources
eksResourceDetector := resourceDetector{detectorUtils}
eksResourceDetector := resourceDetector{utils: detectorUtils}
resourceObj, err := eksResourceDetector.Detect(context.Background())
require.NoError(t, err)

Expand All @@ -79,15 +78,14 @@ func TestEks(t *testing.T) {

// Tests EKS resource detector not running in EKS environment
func TestNotEKS(t *testing.T) {

detectorUtils := new(MockDetectorUtils)

k8sTokenPath := "/var/run/secrets/kubernetes.io/serviceaccount/token"

// Mock functions and set expectations
detectorUtils.On("fileExists", k8sTokenPath).Return(false)

detector := resourceDetector{detectorUtils}
detector := resourceDetector{utils: detectorUtils}
r, err := detector.Detect(context.Background())
require.NoError(t, err)
assert.Equal(t, resource.Empty(), r, "Resource object should be empty")
Expand Down
2 changes: 2 additions & 0 deletions detectors/aws/eks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
k8s.io/apimachinery v0.21.1
k8s.io/client-go v0.21.1
)
Loading

0 comments on commit f1ff9d3

Please sign in to comment.