Skip to content

Commit

Permalink
[receivers/awscontainerinsightsreceiver] Add podresourcesstore in aws…
Browse files Browse the repository at this point in the history
…containerinsightreceiver (#167)

[receivers/awscontainerinsightsreceiver] add podresourcesstore to awscontainerinsightsreceiver
  • Loading branch information
aditya-purang authored Mar 6, 2024
1 parent 51289a1 commit a4af460
Show file tree
Hide file tree
Showing 6 changed files with 544 additions and 1 deletion.
20 changes: 20 additions & 0 deletions .chloggen/add-pod-resources-store.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awscontainerinsightreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: added a new podresourcestore which provides mapping from resource to container and vice-versa

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [167]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: this change provides a new store in awscontainerinsightreceiver, which when started provides mapping from resources to container and vice versa using kubelet podresourcesapi.
3 changes: 2 additions & 1 deletion receiver/awscontainerinsightreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ require (
go.opentelemetry.io/otel/trace v1.23.1
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.61.0
k8s.io/api v0.29.2
k8s.io/apimachinery v0.29.2
k8s.io/client-go v0.29.2
k8s.io/klog v1.0.0
k8s.io/kubelet v0.27.3
)

require (
Expand Down Expand Up @@ -140,7 +142,6 @@ require (
golang.org/x/time v0.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions receiver/awscontainerinsightreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kubeletutil // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"

import (
"context"
"fmt"
"net"
"os"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
)

const (
socketPath = "/var/lib/kubelet/pod-resources/kubelet.sock"
connectionTimeout = 10 * time.Second
)

type PodResourcesClient struct {
delegateClient podresourcesapi.PodResourcesListerClient
conn *grpc.ClientConn
}

func NewPodResourcesClient() (*PodResourcesClient, error) {
podResourcesClient := &PodResourcesClient{}

conn, err := podResourcesClient.connectToServer(socketPath)
podResourcesClient.conn = conn
if err != nil {
return nil, fmt.Errorf("failed to connect to server: %w", err)
}

podResourcesClient.delegateClient = podresourcesapi.NewPodResourcesListerClient(conn)

return podResourcesClient, nil
}

func (p *PodResourcesClient) connectToServer(socket string) (*grpc.ClientConn, error) {
_, err := os.Stat(socket)
if os.IsNotExist(err) {
return nil, fmt.Errorf("socket path does not exist: %s", socket)
} else if err != nil {
return nil, fmt.Errorf("failed to check socket path: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()

conn, err := grpc.DialContext(ctx,
socket,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, "unix", addr)
}),
)

if err != nil {
return nil, fmt.Errorf("failure connecting to '%s': %w", socket, err)
}

return conn, nil
}

func (p *PodResourcesClient) ListPods() (*podresourcesapi.ListPodResourcesResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()

resp, err := p.delegateClient.List(ctx, &podresourcesapi.ListPodResourcesRequest{})
if err != nil {
return nil, fmt.Errorf("failure getting pod resources: %w", err)
}

return resp, nil
}

func (p *PodResourcesClient) Shutdown() {
err := p.conn.Close()
if err != nil {
return
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package stores // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores"

import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/zap"
v1 "k8s.io/kubelet/pkg/apis/podresources/v1"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awscontainerinsightreceiver/internal/stores/kubeletutil"
)

const (
taskTimeout = 10 * time.Second
)

var (
instance *PodResourcesStore
once sync.Once
)

type ContainerInfo struct {
PodName string
ContainerName string
Namespace string
}

type ResourceInfo struct {
ResourceName string
DeviceID string
}

type PodResourcesClientInterface interface {
ListPods() (*v1.ListPodResourcesResponse, error)
Shutdown()
}

type PodResourcesStore struct {
containerInfoToResourcesMap map[ContainerInfo][]ResourceInfo
resourceToPodContainerMap map[ResourceInfo]ContainerInfo
resourceNameSet map[string]struct{}
lastRefreshed time.Time
ctx context.Context
cancel context.CancelFunc
logger *zap.Logger
podResourcesClient PodResourcesClientInterface
}

func NewPodResourcesStore(logger *zap.Logger) *PodResourcesStore {
once.Do(func() {
podResourcesClient, _ := kubeletutil.NewPodResourcesClient()
ctx, cancel := context.WithCancel(context.Background())
instance = &PodResourcesStore{
containerInfoToResourcesMap: make(map[ContainerInfo][]ResourceInfo),
resourceToPodContainerMap: make(map[ResourceInfo]ContainerInfo),
resourceNameSet: make(map[string]struct{}),
lastRefreshed: time.Now(),
ctx: ctx,
cancel: cancel,
logger: logger,
podResourcesClient: podResourcesClient,
}

go func() {
refreshTicker := time.NewTicker(time.Second)
for {
select {
case <-refreshTicker.C:
instance.refreshTick()
case <-instance.ctx.Done():
refreshTicker.Stop()
return
}
}
}()
})
return instance
}

func (p *PodResourcesStore) refreshTick() {
now := time.Now()
if now.Sub(p.lastRefreshed) >= taskTimeout {
p.refresh()
p.lastRefreshed = now
}
}

func (p *PodResourcesStore) refresh() {
doRefresh := func() {
p.updateMaps()
}

refreshWithTimeout(p.ctx, doRefresh, taskTimeout)
}

func (p *PodResourcesStore) updateMaps() {
p.containerInfoToResourcesMap = make(map[ContainerInfo][]ResourceInfo)
p.resourceToPodContainerMap = make(map[ResourceInfo]ContainerInfo)

if len(p.resourceNameSet) == 0 {
p.logger.Warn("No resource names allowlisted thus skipping updating of maps.")
return
}

devicePods, err := p.podResourcesClient.ListPods()

if err != nil {
p.logger.Error(fmt.Sprintf("Error getting pod resources: %v", err))
return
}

for _, pod := range devicePods.GetPodResources() {
for _, container := range pod.GetContainers() {
for _, device := range container.GetDevices() {

containerInfo := ContainerInfo{
PodName: pod.GetName(),
Namespace: pod.GetNamespace(),
ContainerName: container.GetName(),
}

for _, deviceID := range device.GetDeviceIds() {
resourceInfo := ResourceInfo{
ResourceName: device.GetResourceName(),
DeviceID: deviceID,
}
_, found := p.resourceNameSet[resourceInfo.ResourceName]
if found {
p.containerInfoToResourcesMap[containerInfo] = append(p.containerInfoToResourcesMap[containerInfo], resourceInfo)
p.resourceToPodContainerMap[resourceInfo] = containerInfo
}
}
}
}
}
}

func (p *PodResourcesStore) GetContainerInfo(deviceID string, resourceName string) *ContainerInfo {
key := ResourceInfo{DeviceID: deviceID, ResourceName: resourceName}
if containerInfo, ok := p.resourceToPodContainerMap[key]; ok {
return &containerInfo
}
return nil
}

func (p *PodResourcesStore) GetResourcesInfo(podName string, containerName string, namespace string) *[]ResourceInfo {
key := ContainerInfo{PodName: podName, ContainerName: containerName, Namespace: namespace}
if resourceInfo, ok := p.containerInfoToResourcesMap[key]; ok {
return &resourceInfo
}
return nil
}

func (p *PodResourcesStore) AddResourceName(resourceName string) {
p.resourceNameSet[resourceName] = struct{}{}
}

func (p *PodResourcesStore) Shutdown() {
p.cancel()
p.podResourcesClient.Shutdown()
}
Loading

0 comments on commit a4af460

Please sign in to comment.