Skip to content
This repository has been archived by the owner on Apr 20, 2021. It is now read-only.

Commit

Permalink
Allow running serve out-of-cluster
Browse files Browse the repository at this point in the history
kubeconfig can be passed via flag

Closes #9
  • Loading branch information
robertgzr committed Jul 26, 2017
1 parent ece18df commit d163cfe
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 46 deletions.
58 changes: 50 additions & 8 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,3 @@
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"


[[constraint]]
branch = "master"
name = "github.com/spf13/cobra"
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ all: $(BIN)

build: deps $(BIN)
$(BIN): bpf/bindata.go
GOOS=$(GOOS) go build \
go build \
-ldflags "-X github.com/kinvolk/cgnet/cmd.version=$(VERSION)" \
-o $@ .

Expand All @@ -35,7 +35,7 @@ deploy-clean: clean
docker rmi $(CONTAINER):latest

deps: build-deps
dep ensure
dep ensure -v

build-deps:
go get -u github.com/golang/dep/...
Expand Down
34 changes: 29 additions & 5 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,23 @@ limitations under the License.
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

log "github.com/inconshreveable/log15"
"github.com/spf13/cobra"

"github.com/kinvolk/cgnet/kube"
"github.com/kinvolk/cgnet/metrics"
)

var metricsPort int
var (
metricsPort int
kubeconfig string
)

var serveCmd = &cobra.Command{
Use: "serve",
Expand All @@ -31,20 +39,35 @@ var serveCmd = &cobra.Command{
}

func cmdServe(cmd *cobra.Command, args []string) {
stop := make(chan struct{})
defer close(stop)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

cfg, err := kube.BuildConfig(kubeconfig)
if err != nil {
log.Error("error building config", "err", err)
return
}

events := make(chan kube.Event)
go kube.RunPodInformer(stop, events)
go metrics.Serve(fmt.Sprintf(":%d", metricsPort))
go kube.WatchPodEvents(ctx, cancelFunc, cfg, events)

addr := fmt.Sprintf(":%d", metricsPort)
go metrics.Serve(ctx, addr)

// TODO
// * install bpf program on every 'new pod' event
// * query the bpf maps to retrieve data
// * update podmetrics with data

term := make(chan os.Signal)
signal.Notify(term, syscall.SIGINT, syscall.SIGTERM)

for {
select {
case <-term:
return
case <-ctx.Done():
return
case e := <-events:
switch e {
case kube.NewPodEvent:
Expand All @@ -59,4 +82,5 @@ func cmdServe(cmd *cobra.Command, args []string) {
func init() {
RootCmd.AddCommand(serveCmd)
serveCmd.Flags().IntVarP(&metricsPort, "port", "p", 9101, "metrics port")
serveCmd.Flags().StringVarP(&kubeconfig, "kubeconfig", "k", "", "path to kubeconfig file. Only required if out-of-cluster.")
}
1 change: 1 addition & 0 deletions cmd/top.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ var topCmd = &cobra.Command{
func cmdTop(cmd *cobra.Command, args []string) {
if len(args) < 1 {
cmd.Usage()
os.Exit(0)
}

if err := bpf.Setup(args[0]); err != nil {
Expand Down
49 changes: 42 additions & 7 deletions kube/events.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,59 @@
package kube

import (
"log"

"k8s.io/api/core/v1"
log "github.com/inconshreveable/log15"
"k8s.io/client-go/pkg/api/v1"
)

type Event int

const (
NewPodEvent Event = iota
UpdatePodEvent
DeletePodEvent
)

func emitEvent(eChan chan Event, e Event) func(obj interface{}) {
func onAdd(events chan Event) func(obj interface{}) {
funclog := log.New("func", "onAdd")
return func(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
funclog.Error("unable to assert type")
return
}
events <- NewPodEvent
funclog.Info(pod.ObjectMeta.SelfLink)
}
}

func onUpdate(events chan Event) func(oldObj, newObj interface{}) {
// funclog := log.New("func", "onUpdate")
return func(oldObj, newObj interface{}) {
// do nothing
return
// oldPod, ok := oldObj.(*v1.Pod)
// if !ok {
// funclog.Error("unable to assert type")
// return
// }
// newPod, ok := newObj.(*v1.Pod)
// if !ok {
// funclog.Error("unable to assert type")
// return
// }
// funclog.Info(fmt.Sprintf("old: %s, new: %s", oldPod.ObjectMeta.SelfLink, newPod.ObjectMeta.SelfLink))
}
}

func onDelete(events chan Event) func(obj interface{}) {
funclog := log.New("func", "onDelete")
return func(obj interface{}) {
_, ok := obj.(*v1.Pod)
pod, ok := obj.(*v1.Pod)
if !ok {
log.Printf("unexpected object type: %#v\n", obj)
funclog.Error("unable to assert type")
return
}
eChan <- e
events <- DeletePodEvent
funclog.Info(pod.ObjectMeta.SelfLink)
}
}
52 changes: 37 additions & 15 deletions kube/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,61 @@ limitations under the License.
package kube

import (
"log"
"context"
"time"

"k8s.io/api/core/v1"
// "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/pkg/api/v1"

log "github.com/inconshreveable/log15"
)

func RunPodInformer(stop chan struct{}, events chan Event) {
cfg, err := rest.InClusterConfig()
func WatchPodEvents(ctx context.Context, cancelFunc context.CancelFunc, cfg *rest.Config, events chan Event) {
_, err := watchCustomResources(ctx, cfg, events)
if err != nil {
panic(err)
cancelFunc()
return
}

cs, err := kubernetes.NewForConfig(cfg)
<-ctx.Done()
log.Info("stopped watching events")
}

func watchCustomResources(ctx context.Context, cfg *rest.Config, events chan Event) (cache.Controller, error) {
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
panic(err)
log.Error("error creating configset", "err", err)
return nil, err
}

// watch new pod events
lw := cache.NewListWatchFromClient(cs.Core().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
_, ctl := cache.NewInformer(
lw,
source := cache.NewListWatchFromClient(clientset.Core().RESTClient(), string(v1.ResourcePods), v1.NamespaceDefault, fields.Everything())
_, k8sController := cache.NewInformer(
source,
&v1.Pod{},
0*time.Second,
1*time.Minute,
cache.ResourceEventHandlerFuncs{
AddFunc: emitEvent(events, NewPodEvent),
DeleteFunc: emitEvent(events, DeletePodEvent),
AddFunc: onAdd(events),
UpdateFunc: onUpdate(events),
DeleteFunc: onDelete(events),
},
)

log.Println("started watching pod events")
ctl.Run(stop)
go k8sController.Run(ctx.Done())
log.Info("started watching events", "resource", v1.ResourcePods)

return k8sController, nil
}

func BuildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
log.Warn("assume running inside k8s cluster")
return rest.InClusterConfig()
}
25 changes: 21 additions & 4 deletions metrics/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ limitations under the License.
package metrics

import (
"log"
"context"
"net/http"
"time"

log "github.com/inconshreveable/log15"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
Expand Down Expand Up @@ -62,8 +64,23 @@ func init() {
prometheus.MustRegister(GlobalPodMetrics.OutgoingPackets)
}

func Serve(addr string) {
log.Printf("started serving metrics on %s", addr)
func Serve(ctx context.Context, addr string) {
http.Handle("/metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(addr, nil))
srv := http.Server{
Addr: addr,
Handler: http.DefaultServeMux,
}
go srv.ListenAndServe()

log.Info("serving metrics", "addr", addr)
<-ctx.Done()

toCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second)
defer cancelFunc()

log.Info("waiting for server shutdown")
if err := srv.Shutdown(toCtx); err != nil {
panic(err)
}
log.Info("server stopped")
}

0 comments on commit d163cfe

Please sign in to comment.