Skip to content

Commit

Permalink
Move seed pod labelling to a separate controller
Browse files Browse the repository at this point in the history
* In jetstack#256 I want to refactor nodepool.Sync so that its only responsibility is to update the NodePool status.
* And the seed labelling seems like it is a separate concern that can live in its own module.
* With its own unit tests.
  • Loading branch information
wallrj committed Feb 28, 2018
1 parent 434172b commit e23ce4e
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 47 deletions.
18 changes: 11 additions & 7 deletions hack/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,6 @@ function test_cassandracluster() {
fail_test "Cassandra pilots did not elect a leader"
fi

seed_label=$(kubectl get pods --namespace "${namespace}" \
cass-${CASS_NAME}-ringnodes-0 \
-o jsonpath='{.metadata.labels.seed}')
if [ "$seed_label" != "true" ]; then
fail_test "First cassandra node not marked as seed"
fi

# Wait 5 minutes for cassandra to start and listen for CQL queries.
if ! retry TIMEOUT=300 cql_connect \
"${namespace}" \
Expand Down Expand Up @@ -325,6 +318,17 @@ function test_cassandracluster() {
fail_test "Second cassandra node did not become ready"
fi

# TODO: A better test would be to query the endpoints and check that only
# the `-0` pods are included. E.g.
# kubectl -n test-cassandra-1519754828-19864 get ep cass-cassandra-1519754828-19864-cassandra-seedprovider -o "jsonpath={.subsets[*].addresses[*].hostname}"
if ! stdout_equals "cass-${CASS_NAME}-ringnodes-0" \
kubectl get pods --namespace "${namespace}" \
--selector=navigator.jetstack.io/cassandra-seed=true \
--output 'jsonpath={.items[*].metadata.name}'
then
fail_test "First cassandra node not marked as seed"
fi

simulate_unresponsive_cassandra_process \
"${namespace}" \
"cass-${CASS_NAME}-ringnodes-0" \
Expand Down
8 changes: 7 additions & 1 deletion pkg/controllers/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
"github.com/jetstack/navigator/pkg/controllers/cassandra/role"
"github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding"
"github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller"
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
"github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount"
Expand Down Expand Up @@ -110,7 +111,6 @@ func NewCassandra(
nodepool.NewControl(
kubeClient,
statefulSets.Lister(),
pods.Lister(),
recorder,
),
pilot.NewControl(
Expand All @@ -135,6 +135,12 @@ func NewCassandra(
roleBindings.Lister(),
recorder,
),
seedlabeller.NewControl(
kubeClient,
statefulSets.Lister(),
pods.Lister(),
recorder,
),
recorder,
)
cc.recorder = recorder
Expand Down
16 changes: 16 additions & 0 deletions pkg/controllers/cassandra/cluster_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/jetstack/navigator/pkg/controllers/cassandra/pilot"
"github.com/jetstack/navigator/pkg/controllers/cassandra/role"
"github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding"
"github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller"
servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql"
serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
"github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount"
Expand All @@ -27,6 +28,7 @@ const (
MessageErrorSyncService = "Error syncing service: %s"
MessageErrorSyncNodePools = "Error syncing node pools: %s"
MessageErrorSyncPilots = "Error syncing pilots: %s"
MessageErrorSyncSeedLabels = "Error syncing seed labels: %s"
MessageSuccessSync = "Successfully synced CassandraCluster"
)

Expand All @@ -44,6 +46,7 @@ type defaultCassandraClusterControl struct {
serviceAccountControl serviceaccount.Interface
roleControl role.Interface
roleBindingControl rolebinding.Interface
seedLabellerControl seedlabeller.Interface
recorder record.EventRecorder
}

Expand All @@ -55,6 +58,7 @@ func NewControl(
serviceAccountControl serviceaccount.Interface,
roleControl role.Interface,
roleBindingControl rolebinding.Interface,
seedlabellerControl seedlabeller.Interface,
recorder record.EventRecorder,
) ControlInterface {
return &defaultCassandraClusterControl{
Expand All @@ -65,6 +69,7 @@ func NewControl(
serviceAccountControl: serviceAccountControl,
roleControl: roleControl,
roleBindingControl: roleBindingControl,
seedLabellerControl: seedlabellerControl,
recorder: recorder,
}
}
Expand Down Expand Up @@ -148,6 +153,17 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro
)
return err
}
err = e.seedLabellerControl.Sync(c)
if err != nil {
e.recorder.Eventf(
c,
apiv1.EventTypeWarning,
ErrorSync,
MessageErrorSyncSeedLabels,
err,
)
return err
}
e.recorder.Event(
c,
apiv1.EventTypeNormal,
Expand Down
37 changes: 0 additions & 37 deletions pkg/controllers/cassandra/nodepool/nodepool.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package nodepool

import (
"fmt"

"github.com/golang/glog"
appsv1beta1 "k8s.io/api/apps/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"

v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
Expand All @@ -22,7 +17,6 @@ type Interface interface {
type defaultCassandraClusterNodepoolControl struct {
kubeClient kubernetes.Interface
statefulSetLister appslisters.StatefulSetLister
pods corelisters.PodLister
recorder record.EventRecorder
}

Expand All @@ -31,13 +25,11 @@ var _ Interface = &defaultCassandraClusterNodepoolControl{}
func NewControl(
kubeClient kubernetes.Interface,
statefulSetLister appslisters.StatefulSetLister,
pods corelisters.PodLister,
recorder record.EventRecorder,
) Interface {
return &defaultCassandraClusterNodepoolControl{
kubeClient: kubeClient,
statefulSetLister: statefulSetLister,
pods: pods,
recorder: recorder,
}
}
Expand Down Expand Up @@ -76,30 +68,6 @@ func (e *defaultCassandraClusterNodepoolControl) removeUnusedStatefulSets(
return nil
}

func (e *defaultCassandraClusterNodepoolControl) labelSeedNodes(
cluster *v1alpha1.CassandraCluster,
set *appsv1beta1.StatefulSet,
) error {
// TODO: make number of seed nodes configurable
pod, err := e.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, 0))
if err != nil {
glog.Warningf("Couldn't get stateful set pod: %v", err)
return nil
}

// only label if the current label is incorrect
if pod.Labels["seed"] != "true" {
podCopy := pod.DeepCopy()
podCopy.Labels["seed"] = "true"
_, err := e.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy)
if err != nil {
return err
}
}

return nil
}

func (e *defaultCassandraClusterNodepoolControl) createOrUpdateStatefulSet(
cluster *v1alpha1.CassandraCluster,
nodePool *v1alpha1.CassandraClusterNodePool,
Expand All @@ -120,11 +88,6 @@ func (e *defaultCassandraClusterNodepoolControl) createOrUpdateStatefulSet(
return err
}

err = e.labelSeedNodes(cluster, existingSet)
if err != nil {
return err
}

_, err = client.Update(desiredSet)
return err
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/controllers/cassandra/seedlabeller/control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package seedlabeller

import (
"fmt"

"github.com/golang/glog"
appsv1beta1 "k8s.io/api/apps/v1beta1"
"k8s.io/client-go/kubernetes"
appslisters "k8s.io/client-go/listers/apps/v1beta1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider"
"github.com/jetstack/navigator/pkg/controllers/cassandra/util"
)

type Interface interface {
Sync(*v1alpha1.CassandraCluster) error
}

type defaultSeedLabeller struct {
kubeClient kubernetes.Interface
statefulSetLister appslisters.StatefulSetLister
pods corelisters.PodLister
recorder record.EventRecorder
}

var _ Interface = &defaultSeedLabeller{}

func NewControl(
kubeClient kubernetes.Interface,
statefulSetLister appslisters.StatefulSetLister,
pods corelisters.PodLister,
recorder record.EventRecorder,
) Interface {
return &defaultSeedLabeller{
kubeClient: kubeClient,
statefulSetLister: statefulSetLister,
pods: pods,
recorder: recorder,
}
}

func (c *defaultSeedLabeller) labelSeedNodes(
cluster *v1alpha1.CassandraCluster,
set *appsv1beta1.StatefulSet,
) error {
// TODO: make number of seed nodes configurable
pod, err := c.pods.Pods(cluster.Namespace).Get(fmt.Sprintf("%s-%d", set.Name, 0))
if err != nil {
glog.Warningf("Couldn't get stateful set pod: %v", err)
return nil
}
labels := pod.Labels
value := labels[seedprovider.SeedLabelKey]
if value == seedprovider.SeedLabelValue {
return nil
}
if labels == nil {
labels = map[string]string{}
}
labels[seedprovider.SeedLabelKey] = seedprovider.SeedLabelValue
podCopy := pod.DeepCopy()
podCopy.SetLabels(labels)
_, err = c.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy)
return err
}

func (c *defaultSeedLabeller) Sync(cluster *v1alpha1.CassandraCluster) error {
sets, err := util.StatefulSetsForCluster(cluster, c.statefulSetLister)
if err != nil {
return err
}
for _, s := range sets {
err = c.labelSeedNodes(cluster, s)
if err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit e23ce4e

Please sign in to comment.