Skip to content

Commit

Permalink
Refactor controller main for easier future merges
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Brockbank committed Aug 4, 2018
1 parent 64361de commit e8ad129
Showing 1 changed file with 69 additions and 15 deletions.
84 changes: 69 additions & 15 deletions cmd/kube-controllers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"github.com/projectcalico/kube-controllers/pkg/config"
"github.com/projectcalico/kube-controllers/pkg/controllers/controller"
"github.com/projectcalico/kube-controllers/pkg/controllers/namespace"
"github.com/projectcalico/kube-controllers/pkg/controllers/networkpolicy"
"github.com/projectcalico/kube-controllers/pkg/controllers/node"
Expand Down Expand Up @@ -97,30 +98,49 @@ func main() {
if err != nil {
log.WithError(err).Fatal("Failed to initialize Calico datastore")
}
// Initialize readiness to false if enabled
s := status.New(status.DefaultStatusFile)
if config.HealthEnabled {
s.SetReady("CalicoDatastore", false, "initialized to false")
s.SetReady("KubeAPIServer", false, "initialized to false")

controllerCtrl := &controllerControl{
ctx: ctx,
controllerStates: make(map[string]*controllerState),
config: config,
stop: stop,
}

// Create the status file. We will only update it if we have healthchecks enabled.
s := status.New(status.DefaultStatusFile)

for _, controllerType := range strings.Split(config.EnabledControllers, ",") {
switch controllerType {
case "workloadendpoint":
podController := pod.NewPodController(ctx, k8sClientset, calicoClient)
go podController.Run(config.WorkloadEndpointWorkers, config.ReconcilerPeriod, stop)
controllerCtrl.controllerStates["Pod"] = &controllerState{
controller: podController,
threadiness: config.WorkloadEndpointWorkers,
}
case "profile", "namespace":
namespaceController := namespace.NewNamespaceController(ctx, k8sClientset, calicoClient)
go namespaceController.Run(config.ProfileWorkers, config.ReconcilerPeriod, stop)
controllerCtrl.controllerStates["Namespace"] = &controllerState{
controller: namespaceController,
threadiness: config.ProfileWorkers,
}
case "policy":
policyController := networkpolicy.NewPolicyController(ctx, k8sClientset, calicoClient)
go policyController.Run(config.PolicyWorkers, config.ReconcilerPeriod, stop)
controllerCtrl.controllerStates["NetworkPolicy"] = &controllerState{
controller: policyController,
threadiness: config.PolicyWorkers,
}
case "node":
nodeController := node.NewNodeController(ctx, k8sClientset, calicoClient)
go nodeController.Run(config.NodeWorkers, config.ReconcilerPeriod, stop)
controllerCtrl.controllerStates["Node"] = &controllerState{
controller: nodeController,
threadiness: config.NodeWorkers,
}
case "serviceaccount":
serviceAccountController := serviceaccount.NewServiceAccountController(ctx, k8sClientset, calicoClient)
go serviceAccountController.Run(config.ProfileWorkers, config.ReconcilerPeriod, stop)
controllerCtrl.controllerStates["ServiceAccount"] = &controllerState{
controller: serviceAccountController,
threadiness: config.ProfileWorkers,
}
default:
log.Fatalf("Invalid controller '%s' provided. Valid options are workloadendpoint, profile, policy", controllerType)
}
Expand All @@ -129,15 +149,26 @@ func main() {
// If configured to do so, start an etcdv3 compaction.
startCompactor(ctx, config)

// Wait forever and perform healthchecks.
// Run the health checks on a separate goroutine.
if config.HealthEnabled {
go runHealthChecks(ctx, s, k8sClientset, calicoClient)
}

// Run the controllers. This runs indefinitely.
controllerCtrl.RunControllers()
}

// Run the controller health checks.
func runHealthChecks(ctx context.Context, s *status.Status, k8sClientset *kubernetes.Clientset, calicoClient client.Interface) {
s.SetReady("CalicoDatastore", false, "initialized to false")
s.SetReady("KubeAPIServer", false, "initialized to false")

// Loop forever and perform healthchecks.
for {
// skip healthchecks if configured
if !config.HealthEnabled {
select {}
}
// Datastore HealthCheck
healthCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
err = calicoClient.EnsureInitialized(healthCtx, "", "k8s")
err := calicoClient.EnsureInitialized(healthCtx, "", "k8s")
if err != nil {
log.WithError(err).Errorf("Failed to verify datastore")
s.SetReady(
Expand Down Expand Up @@ -255,3 +286,26 @@ func newEtcdV3Client() (*clientv3.Client, error) {

return clientv3.New(cfg)
}

// Object for keeping track of controller states and statuses.
type controllerControl struct {
ctx context.Context
controllerStates map[string]*controllerState
config *config.Config
stop chan struct{}
}

// Runs all the controllers and blocks indefinitely.
func (cc *controllerControl) RunControllers() {
for controllerType, cs := range cc.controllerStates {
log.WithField("ControllerType", controllerType).Info("Starting controller")
go cs.controller.Run(cs.threadiness, cc.config.ReconcilerPeriod, cc.stop)
}
select {}
}

// Object for keeping track of Controller information.
type controllerState struct {
controller controller.Controller
threadiness int
}

0 comments on commit e8ad129

Please sign in to comment.