Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: supports k8s cluster #3599

Merged
merged 24 commits into from
Feb 10, 2025
Merged
Prev Previous commit
Next Next commit
fixes stats
  • Loading branch information
amir20 committed Feb 10, 2025
commit ce2a608ff0be36fdad35d29cd0afc947c607c1ef
44 changes: 7 additions & 37 deletions internal/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/amir20/dozzle/internal/utils"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"

"github.com/rs/zerolog/log"

Expand All @@ -23,9 +22,9 @@ import (
)

type K8sClient struct {
Clientset *kubernetes.Clientset
metricsClient *metricsclient.Clientset
namespace string
Clientset *kubernetes.Clientset
namespace string
config *rest.Config
}

func NewK8sClient(namespace string) (*K8sClient, error) {
Expand Down Expand Up @@ -57,15 +56,10 @@ func NewK8sClient(namespace string) (*K8sClient, error) {
return nil, err
}

metricsClient, err := metricsclient.NewForConfig(config)
if err != nil {
return nil, err
}

return &K8sClient{
Clientset: clientset,
namespace: namespace,
metricsClient: metricsClient,
Clientset: clientset,
namespace: namespace,
config: config,
}, nil
}
func (k *K8sClient) ListContainers(ctx context.Context, labels container.ContainerLabels) ([]container.Container, error) {
Expand Down Expand Up @@ -205,31 +199,7 @@ func (k *K8sClient) ContainerEvents(ctx context.Context, ch chan<- container.Con
}

func (k *K8sClient) ContainerStats(ctx context.Context, id string, stats chan<- container.ContainerStat) error {
ticker := time.NewTicker(1 * time.Second)

pod, id := parsePodContainerID(id)

for range ticker.C {
podMetrics, err := k.metricsClient.MetricsV1beta1().PodMetricses(k.namespace).Get(context.Background(), pod, metav1.GetOptions{})
if err != nil {
panic(err.Error())
}
for _, c := range podMetrics.Containers {
log.Trace().Interface("container", c).Msg("Pod stat")

select {
case <-ctx.Done():
return nil
case stats <- container.ContainerStat{
ID: pod + ":" + c.Name,
CPUPercent: c.Usage.Cpu().AsApproximateFloat64(),
MemoryUsage: c.Usage.Memory().AsApproximateFloat64(),
}:
}
}
}

return nil
panic("not implemented")
}

func (k *K8sClient) Ping(ctx context.Context) error {
Expand Down
126 changes: 126 additions & 0 deletions internal/k8s/stats_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package k8s

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/amir20/dozzle/internal/container"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog/log"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
)

// timeToStop is how long the collector keeps running after the last
// subscriber has called Stop before it shuts itself down.
var timeToStop = 2 * time.Hour

// K8sStatsCollector polls the Kubernetes metrics API on an interval and
// fans each container's CPU/memory sample out to every subscriber.
type K8sStatsCollector struct {
	client  *K8sClient
	metrics *metricsclient.Clientset
	// subscribers maps a subscriber's context to the channel stats are
	// delivered on; entries are removed when the context is done.
	subscribers *xsync.MapOf[context.Context, chan<- container.ContainerStat]
	stopper      context.CancelFunc // cancels the polling loop in Start; guarded by mu
	timer        *time.Timer        // pending delayed shutdown scheduled by Stop; guarded by mu
	mu           sync.Mutex
	totalStarted atomic.Int32 // number of Start calls minus Stop calls
	labels       container.ContainerLabels
	// NOTE(review): containers is never read or written in this file —
	// presumably intended as a cache of pod containers; confirm or remove.
	containers *xsync.MapOf[string, corev1.Container]
}

// NewK8sStatsCollector builds a collector that reads pod metrics through
// the metrics API using the given client's REST config. It returns an
// error when the metrics client cannot be created from that config.
func NewK8sStatsCollector(client *K8sClient, labels container.ContainerLabels) (*K8sStatsCollector, error) {
	mc, err := metricsclient.NewForConfig(client.config)
	if err != nil {
		return nil, err
	}

	collector := &K8sStatsCollector{
		client:      client,
		metrics:     mc,
		labels:      labels,
		subscribers: xsync.NewMapOf[context.Context, chan<- container.ContainerStat](),
	}
	return collector, nil
}

// Subscribe registers stats to receive samples until ctx is done, at
// which point the subscription is removed automatically.
func (c *K8sStatsCollector) Subscribe(ctx context.Context, stats chan<- container.ContainerStat) {
	c.subscribers.Store(ctx, stats)

	// Drop the registration as soon as the subscriber's context ends.
	go func() {
		<-ctx.Done()
		c.subscribers.Delete(ctx)
	}()
}

// Stop decrements the started counter; when it reaches zero, a delayed
// shutdown is scheduled so the collector stops after timeToStop unless
// Start is called again in the meantime.
func (c *K8sStatsCollector) Stop() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if remaining := c.totalStarted.Add(-1); remaining != 0 {
		return
	}
	c.timer = time.AfterFunc(timeToStop, c.forceStop)
}

// forceStop cancels the polling loop started by Start, if one is running.
func (c *K8sStatsCollector) forceStop() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.stopper == nil {
		return // nothing is running
	}
	c.stopper()
	c.stopper = nil
	log.Debug().Msg("stopped container k8s stats collector")
}

// reset cancels any pending delayed shutdown scheduled by Stop.
func (c *K8sStatsCollector) reset() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if t := c.timer; t != nil {
		t.Stop()
	}
	c.timer = nil
}

// Start starts the stats collector and blocks until it's stopped. It returns true if the collector was stopped, false if it was already running
func (c *K8sStatsCollector) Start(parentCtx context.Context) bool {
	// Cancel any pending delayed shutdown and record this consumer.
	c.reset()
	c.totalStarted.Add(1)

	c.mu.Lock()
	if c.stopper != nil {
		// Another Start call already owns the polling loop.
		c.mu.Unlock()
		return false
	}
	var ctx context.Context
	ctx, c.stopper = context.WithCancel(parentCtx)
	c.mu.Unlock()

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop() // release ticker resources when the loop exits

	for {
		select {
		case <-ticker.C:
			metricList, err := c.metrics.MetricsV1beta1().PodMetricses(c.client.namespace).List(ctx, metav1.ListOptions{})
			if err != nil {
				// A transient metrics API failure must not crash the whole
				// process (the previous panic here did); log and retry on
				// the next tick.
				log.Error().Err(err).Msg("failed to list pod metrics")
				continue
			}
			for _, pod := range metricList.Items {
				for _, cm := range pod.Containers {
					stat := container.ContainerStat{
						// "pod:container" matches the id format the client uses elsewhere.
						ID: pod.Name + ":" + cm.Name,
						// NOTE(review): Usage.Cpu() is in cores, not a
						// percentage — confirm CPUPercent consumers expect this.
						CPUPercent:  cm.Usage.Cpu().AsApproximateFloat64(),
						MemoryUsage: cm.Usage.Memory().AsApproximateFloat64(),
					}

					c.subscribers.Range(func(subCtx context.Context, stats chan<- container.ContainerStat) bool {
						select {
						case stats <- stat:
						case <-subCtx.Done():
							// Subscriber is gone; drop its registration.
							c.subscribers.Delete(subCtx)
						}
						return true
					})
				}
			}
		case <-ctx.Done():
			return true
		}
	}
}
9 changes: 8 additions & 1 deletion internal/support/k8s/k8s_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package k8s_support
import (
"context"
"io"

"github.com/rs/zerolog/log"

"time"

"github.com/amir20/dozzle/internal/container"
Expand All @@ -15,9 +18,13 @@ type K8sClientService struct {
}

func NewK8sClientService(client *k8s.K8sClient, labels container.ContainerLabels) *K8sClientService {
statsCollector, err := k8s.NewK8sStatsCollector(client, labels)
if err != nil {
log.Fatal().Err(err).Msg("Could not create k8s stats collector")
}
return &K8sClientService{
client: client,
store: container.NewContainerStore(context.Background(), client, nil, labels), // TODO fixme
store: container.NewContainerStore(context.Background(), client, statsCollector, labels),
}
}

Expand Down
Loading