Skip to content

Commit

Permalink
Refactor podman (#10421)
Browse files Browse the repository at this point in the history
* feat: move podman client to libpod client

* feat: add new container scraper entity

* feat: add new methods to client

- list the running containers
- events

* feat: new logic to container scraper

The container scraper spawns a go routine to listen for container events
and keep track of the running containers. Only containers in the local
map will be fetched, thus we can easily add inclusion/exclusion filters.
In addition, the containers map contain extra information like the
pod name.

* feat: fetch stats per container

* feat: add list container test

* feat: add test case for image attribute

* feat: update CHANGELOG.md

* fix: linter error

* fix: cross-compile windows tags

* Update receiver/podmanreceiver/podman.go

Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>

* change variable name for listing containers

* feat: add test for events function

* feat: use unexported fields

* fix: changelog version entry

* add unreleased file

* fix lint errors

* remove redundant line break

* fix linter issue with go.mod file

Co-authored-by: Alex Boten <aboten@lightstep.com>
Co-authored-by: Tyler Helmuth <12352919+TylerHelmuth@users.noreply.github.com>
Co-authored-by: Dmitrii Anoshin <anoshindx@gmail.com>
  • Loading branch information
4 people authored Jul 8, 2022
1 parent f316178 commit e149ae2
Show file tree
Hide file tree
Showing 12 changed files with 952 additions and 267 deletions.
2 changes: 1 addition & 1 deletion receiver/podmanreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
go.opentelemetry.io/collector v0.55.0
go.opentelemetry.io/collector/pdata v0.55.0
go.opentelemetry.io/collector/semconv v0.55.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122
)
Expand All @@ -32,7 +33,6 @@ require (
go.opentelemetry.io/otel/metric v0.30.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"time"

"go.uber.org/zap"
)
Expand All @@ -34,68 +33,24 @@ var (
errNoStatsFound = fmt.Errorf("No stats found")
)

type containerStats struct {
AvgCPU float64
ContainerID string
Name string
PerCPU []uint64
CPU float64
CPUNano uint64
CPUSystemNano uint64
DataPoints int64
SystemNano uint64
MemUsage uint64
MemLimit uint64
MemPerc float64
NetInput uint64
NetOutput uint64
BlockInput uint64
BlockOutput uint64
PIDs uint64
UpTime time.Duration
Duration uint64
}

type containerStatsReportError struct {
Cause string
Message string
Response int64
}

type containerStatsReport struct {
Error containerStatsReportError
Stats []containerStats
}

type clientFactory func(logger *zap.Logger, cfg *Config) (client, error)

type client interface {
ping(context.Context) error
stats(context.Context) ([]containerStats, error)
}

type podmanClient struct {
type libpodClient struct {
conn *http.Client
endpoint string

// The maximum amount of time to wait for Podman API responses
timeout time.Duration
}

func newPodmanClient(logger *zap.Logger, cfg *Config) (client, error) {
func newLibpodClient(logger *zap.Logger, cfg *Config) (PodmanClient, error) {
connection, err := newPodmanConnection(logger, cfg.Endpoint, cfg.SSHKey, cfg.SSHPassphrase)
if err != nil {
return nil, err
}
c := &podmanClient{
c := &libpodClient{
conn: connection,
endpoint: fmt.Sprintf("http://d/v%s/libpod", cfg.APIVersion),
timeout: cfg.Timeout,
}
return c, nil
}

func (c *podmanClient) request(ctx context.Context, path string, params url.Values) (*http.Response, error) {
func (c *libpodClient) request(ctx context.Context, path string, params url.Values) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.endpoint+path, nil)
if err != nil {
return nil, err
Expand All @@ -107,13 +62,8 @@ func (c *podmanClient) request(ctx context.Context, path string, params url.Valu
return c.conn.Do(req)
}

func (c *podmanClient) stats(ctx context.Context) ([]containerStats, error) {
params := url.Values{}
params.Add("stream", "false")

statsCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
resp, err := c.request(statsCtx, "/containers/stats", params)
func (c *libpodClient) stats(ctx context.Context, options url.Values) ([]containerStats, error) {
resp, err := c.request(ctx, "/containers/stats", options)
if err != nil {
return nil, err
}
Expand All @@ -138,10 +88,28 @@ func (c *podmanClient) stats(ctx context.Context) ([]containerStats, error) {
return report.Stats, nil
}

func (c *podmanClient) ping(ctx context.Context) error {
pingCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
resp, err := c.request(pingCtx, "/_ping", nil)
func (c *libpodClient) list(ctx context.Context, options url.Values) ([]container, error) {
resp, err := c.request(ctx, "/containers/json", options)
if err != nil {
return nil, err
}
defer resp.Body.Close()

bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}

var report []container
err = json.Unmarshal(bytes, &report)
if err != nil {
return nil, err
}
return report, nil
}

func (c *libpodClient) ping(ctx context.Context) error {
resp, err := c.request(ctx, "/_ping", nil)
if err != nil {
return err
}
Expand All @@ -151,3 +119,50 @@ func (c *podmanClient) ping(ctx context.Context) error {
}
return nil
}

// events returns a stream of events. It's up to the caller to close the stream by canceling the context.
func (c *libpodClient) events(ctx context.Context, options url.Values) (<-chan event, <-chan error) {
events := make(chan event)
errs := make(chan error, 1)

started := make(chan struct{})
go func() {
defer close(errs)

resp, err := c.request(ctx, "/events", options)
if err != nil {
close(started)
errs <- err
return
}
defer resp.Body.Close()

dec := json.NewDecoder(resp.Body)
close(started)
for {
var e event
select {
case <-ctx.Done():
errs <- ctx.Err()
return
default:
err := dec.Decode(&e)
if err != nil {
errs <- err
return
}

select {
case events <- e:
case <-ctx.Done():
errs <- ctx.Err()
return
}
}
}
}()

<-started

return events, errs
}
Loading

0 comments on commit e149ae2

Please sign in to comment.