diff --git a/contrib/charts/navigator/templates/rbac.yaml b/contrib/charts/navigator/templates/rbac.yaml index 419bb1930..8f232118d 100644 --- a/contrib/charts/navigator/templates/rbac.yaml +++ b/contrib/charts/navigator/templates/rbac.yaml @@ -87,7 +87,7 @@ items: name: "{{ template "fullname" . }}:controller" rules: - apiGroups: ["navigator.jetstack.io"] - resources: ["elasticsearchclusters", "pilots", "elasticsearchclusters/status", "pilots/status", "cassandraclusters"] + resources: ["elasticsearchclusters", "pilots", "elasticsearchclusters/status", "pilots/status", "cassandraclusters", "cassandraclusters/status"] verbs: ["*"] - apiGroups: [""] resources: ["services", "configmaps", "serviceaccounts", "pods"] diff --git a/internal/test/util/generate/generate.go b/internal/test/util/generate/generate.go index 8baac095e..7a452aadb 100644 --- a/internal/test/util/generate/generate.go +++ b/internal/test/util/generate/generate.go @@ -1,6 +1,8 @@ package generate import ( + "testing" + "github.com/coreos/go-semver/semver" apps "k8s.io/api/apps/v1beta1" core "k8s.io/api/core/v1" @@ -118,3 +120,46 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet { }, } } + +func AssertStatefulSetMatches(t *testing.T, expected StatefulSetConfig, actual *apps.StatefulSet) { + if actual.Name != expected.Name { + t.Errorf("Name %q != %q", expected.Name, actual.Name) + } + if actual.Namespace != expected.Namespace { + t.Errorf("Namespace %q != %q", expected.Namespace, actual.Namespace) + } + if expected.Replicas != nil { + if actual.Spec.Replicas == nil { + t.Errorf("Replicas %d != %v", *expected.Replicas, nil) + } else { + if *actual.Spec.Replicas != *expected.Replicas { + t.Errorf("Replicas %d != %d", *expected.Replicas, *actual.Spec.Replicas) + } + } + } +} + +type CassandraClusterConfig struct { + Name, Namespace string +} + +func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster { + return &v1alpha1.CassandraCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.Name, + Namespace: c.Namespace, + }, + } +} + +type CassandraClusterNodePoolConfig struct { + Name string + Replicas int32 +} + +func CassandraClusterNodePool(c CassandraClusterNodePoolConfig) *v1alpha1.CassandraClusterNodePool { + return &v1alpha1.CassandraClusterNodePool{ + Name: c.Name, + Replicas: c.Replicas, + } +} diff --git a/pkg/apis/navigator/types.go b/pkg/apis/navigator/types.go index c4f2ddfc0..6a805032f 100644 --- a/pkg/apis/navigator/types.go +++ b/pkg/apis/navigator/types.go @@ -47,7 +47,7 @@ type CassandraClusterStatus struct { } type CassandraClusterNodePoolStatus struct { - ReadyReplicas int64 + ReadyReplicas int32 } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/gen.go b/pkg/apis/navigator/v1alpha1/gen.go new file mode 100644 index 000000000..6e5740aa0 --- /dev/null +++ b/pkg/apis/navigator/v1alpha1/gen.go @@ -0,0 +1,70 @@ +package v1alpha1 + +import ( + "fmt" + "math/rand" + "reflect" + "testing/quick" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (_ CassandraCluster) Generate(rand *rand.Rand, size int) reflect.Value { + o := CassandraCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("cluster%d", rand.Intn(10)), + Namespace: "", + }, + } + v, ok := quick.Value(reflect.TypeOf(CassandraClusterSpec{}), rand) + if ok { + o.Spec = v.Interface().(CassandraClusterSpec) + } + v, ok = quick.Value(reflect.TypeOf(CassandraClusterStatus{}), rand) + if ok { + o.Status = v.Interface().(CassandraClusterStatus) + } + return reflect.ValueOf(o) +} + +func (_ CassandraClusterSpec) Generate(rand *rand.Rand, size int) reflect.Value { + nodepools := make([]CassandraClusterNodePool, rand.Intn(10)) + for i := range nodepools { + v, ok := quick.Value(reflect.TypeOf(CassandraClusterNodePool{}), rand) + if ok { + nodepools[i] = v.Interface().(CassandraClusterNodePool) + } + } + o := CassandraClusterSpec{ + CqlPort: rand.Int31n(10), + NodePools: nodepools, + } + return reflect.ValueOf(o) +} + +func (_ CassandraClusterNodePool) Generate(rand *rand.Rand, size int) reflect.Value { + o := CassandraClusterNodePool{ + Name: fmt.Sprintf("np%d", rand.Intn(10)), + Replicas: rand.Int31n(10), + } + return reflect.ValueOf(o) +} + +func (_ CassandraClusterStatus) Generate(rand *rand.Rand, size int) reflect.Value { + o := CassandraClusterStatus{ + NodePools: map[string]CassandraClusterNodePoolStatus{}, + } + nodepools := make([]CassandraClusterNodePool, rand.Intn(10)) + for i := range nodepools { + v, ok := quick.Value(reflect.TypeOf(CassandraClusterNodePool{}), rand) + if ok { + nodepools[i] = v.Interface().(CassandraClusterNodePool) + } + } + for _, np := range nodepools { + o.NodePools[np.Name] = CassandraClusterNodePoolStatus{ + ReadyReplicas: np.Replicas, + } + } + return reflect.ValueOf(o) +} diff --git a/pkg/apis/navigator/v1alpha1/types.go b/pkg/apis/navigator/v1alpha1/types.go index 88ed4e22b..4731a2ce6 100644 --- a/pkg/apis/navigator/v1alpha1/types.go +++ b/pkg/apis/navigator/v1alpha1/types.go @@ -44,7 +44,7 @@ type CassandraClusterSpec struct { // CassandraClusterNodePool describes a node pool within a CassandraCluster. type CassandraClusterNodePool struct { Name string `json:"name"` - Replicas int64 `json:"replicas"` + Replicas int32 `json:"replicas"` // Persistence specifies the configuration for persistent data for this // node. @@ -71,7 +71,7 @@ type CassandraClusterStatus struct { } type CassandraClusterNodePoolStatus struct { - ReadyReplicas int64 `json:"readyReplicas"` + ReadyReplicas int32 `json:"readyReplicas"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go index 4ae7fce89..7506d404a 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go @@ -161,7 +161,7 @@ func Convert_navigator_CassandraClusterList_To_v1alpha1_CassandraClusterList(in func autoConvert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNodePool(in *CassandraClusterNodePool, out *navigator.CassandraClusterNodePool, s conversion.Scope) error { out.Name = in.Name - out.Replicas = in.Replicas + out.Replicas = int64(in.Replicas) if err := Convert_v1alpha1_PersistenceConfig_To_navigator_PersistenceConfig(&in.Persistence, &out.Persistence, s); err != nil { return err } @@ -178,7 +178,7 @@ func Convert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNode func autoConvert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNodePool(in *navigator.CassandraClusterNodePool, out *CassandraClusterNodePool, s conversion.Scope) error { out.Name = in.Name - out.Replicas = in.Replicas + out.Replicas = int32(in.Replicas) if err := Convert_navigator_PersistenceConfig_To_v1alpha1_PersistenceConfig(&in.Persistence, &out.Persistence, s); err != nil { return err } @@ -217,7 +217,17 @@ func autoConvert_v1alpha1_CassandraClusterSpec_To_navigator_CassandraClusterSpec if err := Convert_v1alpha1_NavigatorClusterConfig_To_navigator_NavigatorClusterConfig(&in.NavigatorClusterConfig, &out.NavigatorClusterConfig, s); err != nil { return err } - out.NodePools = *(*[]navigator.CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools)) + if in.NodePools != nil { + in, out := &in.NodePools, &out.NodePools + *out = make([]navigator.CassandraClusterNodePool, len(*in)) + for i := range *in { + if err := Convert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNodePool(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.NodePools = nil + } out.Image = (*navigator.ImageSpec)(unsafe.Pointer(in.Image)) out.CqlPort = in.CqlPort out.Version = in.Version @@ -233,7 +243,17 @@ func autoConvert_navigator_CassandraClusterSpec_To_v1alpha1_CassandraClusterSpec if err := Convert_navigator_NavigatorClusterConfig_To_v1alpha1_NavigatorClusterConfig(&in.NavigatorClusterConfig, &out.NavigatorClusterConfig, s); err != nil { return err } - out.NodePools = *(*[]CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools)) + if in.NodePools != nil { + in, out := &in.NodePools, &out.NodePools + *out = make([]CassandraClusterNodePool, len(*in)) + for i := range *in { + if err := Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNodePool(&(*in)[i], &(*out)[i], s); err != nil { + return err + } + } + } else { + out.NodePools = nil + } out.Version = in.Version out.Image = (*ImageSpec)(unsafe.Pointer(in.Image)) out.CqlPort = in.CqlPort diff --git a/pkg/apis/navigator/validation/cassandra.go b/pkg/apis/navigator/validation/cassandra.go index b3c6e1da0..1bb435d5c 100644 --- a/pkg/apis/navigator/validation/cassandra.go +++ b/pkg/apis/navigator/validation/cassandra.go @@ -38,6 +38,7 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field. } } } + return allErrs } func ValidateCassandraCluster(c *navigator.CassandraCluster) field.ErrorList { diff --git a/pkg/controllers/cassandra/actions/create_nodepool.go b/pkg/controllers/cassandra/actions/create_nodepool.go new file mode 100644 index 000000000..2e4a81f46 --- /dev/null +++ b/pkg/controllers/cassandra/actions/create_nodepool.go @@ -0,0 +1,30 @@ +package actions + +import ( + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" + "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" +) + +type CreateNodePool struct { + Cluster *v1alpha1.CassandraCluster + NodePool *v1alpha1.CassandraClusterNodePool +} + +var _ controllers.Action = &CreateNodePool{} + +func (a *CreateNodePool) Name() string { + return "CreateNodePool" +} + +func (a *CreateNodePool) Execute(s *controllers.State) error { + ss := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool) + _, err := s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Create(ss) + // XXX: Should this be idempotent? + if k8sErrors.IsAlreadyExists(err) { + return nil + } + return err +} diff --git a/pkg/controllers/cassandra/actions/create_nodepool_test.go b/pkg/controllers/cassandra/actions/create_nodepool_test.go new file mode 100644 index 000000000..5566b2ba7 --- /dev/null +++ b/pkg/controllers/cassandra/actions/create_nodepool_test.go @@ -0,0 +1,95 @@ +package actions_test + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/internal/test/unit/framework" + "github.com/jetstack/navigator/internal/test/util/generate" + "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" +) + +func TestCreateNodePool(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster generate.CassandraClusterConfig + nodePool generate.CassandraClusterNodePoolConfig + expectedStatefulSet generate.StatefulSetConfig + expectedErr bool + } + tests := map[string]testT{ + "A statefulset is created if one does not already exist": { + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(0), + }, + }, + "Idempotent: CreateNodePool can be executed again without error": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(10), + }, + ), + }, + cluster: generate.CassandraClusterConfig{Name: "cluster1", Namespace: "ns1"}, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + }, + expectedStatefulSet: generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(10), + }, + expectedErr: false, + }, + } + + for name, test := range tests { + t.Run( + name, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + a := &actions.CreateNodePool{ + Cluster: generate.CassandraCluster(test.cluster), + NodePool: generate.CassandraClusterNodePool(test.nodePool), + } + err := a.Execute(state) + if !test.expectedErr && err != nil { + t.Errorf("Unexpected error: %s", err) + } + if test.expectedErr && err == nil { + t.Errorf("Expected an error") + } + actualStatefulSet, err := fixture.KubeClient(). + AppsV1beta1(). + StatefulSets(test.expectedStatefulSet.Namespace). + Get(test.expectedStatefulSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error retrieving statefulset: %v", err) + } + generate.AssertStatefulSetMatches(t, test.expectedStatefulSet, actualStatefulSet) + }, + ) + } +} diff --git a/pkg/controllers/cassandra/actions/scaleout.go b/pkg/controllers/cassandra/actions/scaleout.go new file mode 100644 index 000000000..b67303d78 --- /dev/null +++ b/pkg/controllers/cassandra/actions/scaleout.go @@ -0,0 +1,42 @@ +package actions + +import ( + "fmt" + + "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" + "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" +) + +type ScaleOut struct { + Cluster *v1alpha1.CassandraCluster + NodePool *v1alpha1.CassandraClusterNodePool +} + +var _ controllers.Action = &ScaleOut{} + +func (a *ScaleOut) Name() string { + return "ScaleOut" +} + +func (a *ScaleOut) Execute(s *controllers.State) error { + ss := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool) + ss, err := s.StatefulSetLister.StatefulSets(ss.Namespace).Get(ss.Name) + if err != nil { + return err + } + ss = ss.DeepCopy() + if ss.Spec.Replicas == nil || *ss.Spec.Replicas < a.NodePool.Replicas { + ss.Spec.Replicas = &a.NodePool.Replicas + _, err = s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Update(ss) + return err + } + if *ss.Spec.Replicas > a.NodePool.Replicas { + return fmt.Errorf( + "the NodePool.Replicas value (%d) "+ + "is less than the existing StatefulSet.Replicas value (%d)", + a.NodePool.Replicas, *ss.Spec.Replicas, + ) + } + return nil +} diff --git a/pkg/controllers/cassandra/actions/scaleout_test.go b/pkg/controllers/cassandra/actions/scaleout_test.go new file mode 100644 index 000000000..b71f514ff --- /dev/null +++ b/pkg/controllers/cassandra/actions/scaleout_test.go @@ -0,0 +1,183 @@ +package actions_test + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/internal/test/unit/framework" + "github.com/jetstack/navigator/internal/test/util/generate" + "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" +) + +func TestScaleOut(t *testing.T) { + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster generate.CassandraClusterConfig + nodePool generate.CassandraClusterNodePoolConfig + expectedStatefulSet *generate.StatefulSetConfig + expectedErr bool + mutator func(*framework.StateFixture) + } + tests := map[string]testT{ + "Error if StatefulSet not listed": { + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + Replicas: 123, + }, + expectedErr: true, + }, + "Error if clientset.Update fails (e.g. listed but not found)": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(122), + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + Replicas: 123, + }, + expectedErr: true, + mutator: func(f *framework.StateFixture) { + err := f.KubeClient(). + AppsV1beta1(). + StatefulSets("ns1"). + Delete("cass-cluster1-pool1", &metav1.DeleteOptions{}) + if err != nil { + f.T.Fatal(err) + } + }, + }, + "Error if desired ReplicaCount is lower than actual ReplicaCount": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(124), + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + Replicas: 123, + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(124), + }, + expectedErr: true, + }, + "Idempotent: No error if ReplicaCount already matches the actual ReplicaCount": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(124), + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + Replicas: 124, + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(124), + }, + expectedErr: false, + }, + "The replicas count is incremented": { + kubeObjects: []runtime.Object{ + generate.StatefulSet( + generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(122), + }, + ), + }, + cluster: generate.CassandraClusterConfig{ + Name: "cluster1", + Namespace: "ns1", + }, + nodePool: generate.CassandraClusterNodePoolConfig{ + Name: "pool1", + Replicas: 123, + }, + expectedStatefulSet: &generate.StatefulSetConfig{ + Name: "cass-cluster1-pool1", + Namespace: "ns1", + Replicas: int32Ptr(123), + }, + }, + } + + for name, test := range tests { + t.Run( + name, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + state := fixture.State() + if test.mutator != nil { + test.mutator(fixture) + } + a := &actions.ScaleOut{ + Cluster: generate.CassandraCluster(test.cluster), + NodePool: generate.CassandraClusterNodePool(test.nodePool), + } + err := a.Execute(state) + if err != nil { + t.Logf("The error returned by Execute was: %s", err) + } + if !test.expectedErr && err != nil { + t.Errorf("Unexpected error: %s", err) + } + if test.expectedErr && err == nil { + t.Errorf("Expected an error") + } + if test.expectedStatefulSet != nil { + actualStatefulSet, err := fixture.KubeClient(). + AppsV1beta1(). + StatefulSets(test.expectedStatefulSet.Namespace). + Get(test.expectedStatefulSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Unexpected error retrieving statefulset: %v", err) + } + generate.AssertStatefulSetMatches(t, *test.expectedStatefulSet, actualStatefulSet) + } + }, + ) + } +} diff --git a/pkg/controllers/cassandra/actions/util_test.go b/pkg/controllers/cassandra/actions/util_test.go new file mode 100644 index 000000000..1dce439d2 --- /dev/null +++ b/pkg/controllers/cassandra/actions/util_test.go @@ -0,0 +1,9 @@ +package actions_test + +func int32Ptr(i int32) *int32 { + return &i +} + +func int64Ptr(i int64) *int64 { + return &i +} diff --git a/pkg/controllers/cassandra/cassandra.go b/pkg/controllers/cassandra/cassandra.go index c0d626bcc..68f92c3cb 100644 --- a/pkg/controllers/cassandra/cassandra.go +++ b/pkg/controllers/cassandra/cassandra.go @@ -8,6 +8,7 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1beta1" @@ -20,6 +21,7 @@ import ( rbacinformers "k8s.io/client-go/informers/rbac/v1beta1" + v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" navigatorclientset "github.com/jetstack/navigator/pkg/client/clientset/versioned" navigatorinformers "github.com/jetstack/navigator/pkg/client/informers/externalversions/navigator/v1alpha1" listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1" @@ -53,6 +55,7 @@ type CassandraController struct { roleBindingsListerSynced cache.InformerSynced queue workqueue.RateLimitingInterface recorder record.EventRecorder + navigatorClient navigatorclientset.Interface } func NewCassandra( @@ -74,8 +77,9 @@ func NewCassandra( ) cc := &CassandraController{ - queue: queue, - recorder: recorder, + queue: queue, + recorder: recorder, + navigatorClient: naviClient, } cassClusters.Informer().AddEventHandler( &controllers.QueuingEventHandler{Queue: queue}, @@ -86,6 +90,12 @@ func NewCassandra( WorkFunc: cc.handlePodObject, }, ) + // An event handler to trigger status updates when statefulsets change + statefulSets.Informer().AddEventHandler( + &controllers.BlockingEventHandler{ + WorkFunc: cc.handleObject, + }, + ) cc.cassLister = cassClusters.Lister() cc.statefulSetLister = statefulSets.Lister() cc.cassListerSynced = cassClusters.Informer().HasSynced @@ -135,6 +145,10 @@ func NewCassandra( recorder, ), recorder, + &controllers.State{ + Clientset: kubeClient, + StatefulSetLister: statefulSets.Lister(), + }, ) cc.recorder = recorder return cc @@ -228,7 +242,16 @@ func (e *CassandraController) sync(key string) (err error) { ) return err } - return e.control.Sync(cass.DeepCopy()) + status, err := e.control.Sync(cass) + updateErr := e.updateStatus(cass, status) + return utilerrors.NewAggregate([]error{err, updateErr}) +} + +func (e *CassandraController) updateStatus(c *v1alpha1.CassandraCluster, status v1alpha1.CassandraClusterStatus) error { + copy := c.DeepCopy() + copy.Status = status + _, err := e.navigatorClient.NavigatorV1alpha1().CassandraClusters(c.Namespace).UpdateStatus(copy) + return err } func (e *CassandraController) enqueueCassandraCluster(obj interface{}) { diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index f9bc0be47..3464cfc5c 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -6,6 +6,8 @@ import ( "k8s.io/client-go/tools/record" v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" + "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" "github.com/jetstack/navigator/pkg/controllers/cassandra/pilot" "github.com/jetstack/navigator/pkg/controllers/cassandra/role" @@ -27,11 +29,12 @@ const ( MessageErrorSyncService = "Error syncing service: %s" MessageErrorSyncNodePools = "Error syncing node pools: %s" MessageErrorSyncPilots = "Error syncing pilots: %s" + MessageErrorSync = "Error syncing: %s" MessageSuccessSync = "Successfully synced CassandraCluster" ) type ControlInterface interface { - Sync(*v1alpha1.CassandraCluster) error + Sync(*v1alpha1.CassandraCluster) (v1alpha1.CassandraClusterStatus, error) } var _ ControlInterface = &defaultCassandraClusterControl{} @@ -45,6 +48,7 @@ type defaultCassandraClusterControl struct { roleControl role.Interface roleBindingControl rolebinding.Interface recorder record.EventRecorder + state *controllers.State } func NewControl( @@ -56,6 +60,7 @@ func NewControl( roleControl role.Interface, roleBindingControl rolebinding.Interface, recorder record.EventRecorder, + state *controllers.State, ) ControlInterface { return &defaultCassandraClusterControl{ seedProviderServiceControl: seedProviderServiceControl, @@ -66,10 +71,12 @@ func NewControl( roleControl: roleControl, roleBindingControl: roleBindingControl, recorder: recorder, + state: state, } } -func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) error { +func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) (v1alpha1.CassandraClusterStatus, error) { + c = c.DeepCopy() glog.V(4).Infof("defaultCassandraClusterControl.Sync") err := e.seedProviderServiceControl.Sync(c) if err != nil { @@ -80,7 +87,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSyncService, err, ) - return err + return c.Status, err } err = e.cqlServiceControl.Sync(c) if err != nil { @@ -91,7 +98,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSyncService, err, ) - return err + return c.Status, err } err = e.nodepoolControl.Sync(c) if err != nil { @@ -102,7 +109,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSyncNodePools, err, ) - return err + return c.Status, err } err = e.pilotControl.Sync(c) if err != nil { @@ -113,7 +120,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSyncPilots, err, ) - return err + return c.Status, err } err = e.serviceAccountControl.Sync(c) if err != nil { @@ -124,7 +131,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSyncServiceAccount, err, ) - return err + return c.Status, err } err = e.roleControl.Sync(c) if err != nil { @@ -135,7 +142,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSyncRole, err, ) - return err + return c.Status, err } err = e.roleBindingControl.Sync(c) if err != nil { @@ -146,13 +153,51 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro MessageErrorSyncRoleBinding, err, ) - return err + return c.Status, err } + + a := NextAction(c) + if a != nil { + err = a.Execute(e.state) + if err != nil { + e.recorder.Eventf( + c, + apiv1.EventTypeWarning, + ErrorSync, + MessageErrorSync, + err, + ) + return c.Status, err + } + } + e.recorder.Event( c, apiv1.EventTypeNormal, SuccessSync, MessageSuccessSync, ) + return c.Status, nil +} + +func NextAction(c *v1alpha1.CassandraCluster) controllers.Action { + for _, np := range c.Spec.NodePools { + _, found := c.Status.NodePools[np.Name] + if !found { + return &actions.CreateNodePool{ + Cluster: c, + NodePool: &np, + } + } + } + for _, np := range c.Spec.NodePools { + nps := c.Status.NodePools[np.Name] + if np.Replicas > nps.ReadyReplicas { + return &actions.ScaleOut{ + Cluster: c, + NodePool: &np, + } + } + } return nil } diff --git a/pkg/controllers/cassandra/cluster_control_test.go b/pkg/controllers/cassandra/cluster_control_test.go new file mode 100644 index 000000000..dbbdb090b --- /dev/null +++ b/pkg/controllers/cassandra/cluster_control_test.go @@ -0,0 +1,92 @@ +package cassandra_test + +import ( + "testing" + "testing/quick" + + v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers/cassandra" + "github.com/jetstack/navigator/pkg/controllers/cassandra/actions" +) + +// func TestNextAction(t *testing.T) { +// cases := map[string]struct { +// c *v1alpha1.CassandraCluster +// a controllers.Action +// }{ +// "scale up": { +// c: &v1alpha1.CassandraCluster{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: "bar", +// Namespace: "foo", +// }, +// Spec: v1alpha1.CassandraClusterSpec{ +// NodePools: []v1alpha1.CassandraClusterNodePool{ +// { +// Name: "np1", +// Replicas: 2, +// }, +// }, +// }, +// Status: v1alpha1.CassandraClusterStatus{ +// NodePools: map[string]v1alpha1.CassandraClusterNodePoolStatus{ +// "np1": { +// ReadyReplicas: 1, +// }, +// }, +// }, +// }, +// a: &actions.ScaleOut{ +// Namespace: "foo", +// Cluster: "bar", +// NodePool: "np1", +// Replicas: 2, +// }, +// }, +// } + +// for title, test := range cases { +// t.Run( +// title, +// func(t *testing.T) { +// a := cassandra.NextAction(test.c) +// if !reflect.DeepEqual(test.a, a) { +// t.Errorf("Expected did not equal actual: %s", pretty.Diff(test.a, a)) +// } +// }, +// ) +// } +// } + +func TestQuick(t *testing.T) { + f := func(c v1alpha1.CassandraCluster) bool { + a := cassandra.NextAction(&c) + + switch action := a.(type) { + case *actions.CreateNodePool: + _, found := c.Status.NodePools[action.NodePool.Name] + if found { + t.Errorf("Unexpected attempt to create a nodepool when there's an existing status") + return false + } + case *actions.ScaleOut: + nps, found := c.Status.NodePools[action.NodePool.Name] + if !found { + t.Errorf("Unexpected attempt to scale up a nodepool without a status") + return false + } + if action.NodePool.Replicas <= nps.ReadyReplicas { + t.Errorf("Unexpected attempt to scale up a nodepool with >= ready replicas") + return false + } + } + return true + } + config := &quick.Config{ + MaxCount: 1000, + } + err := quick.Check(f, config) + if err != nil { + t.Errorf("quick check failure: %#v", err) + } +} diff --git a/pkg/controllers/cassandra/nodepool/nodepool.go b/pkg/controllers/cassandra/nodepool/nodepool.go index 8f4d34494..7690b863d 100644 --- a/pkg/controllers/cassandra/nodepool/nodepool.go +++ b/pkg/controllers/cassandra/nodepool/nodepool.go @@ -1,7 +1,8 @@ package nodepool import ( - k8sErrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/api/apps/v1beta1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" appslisters "k8s.io/client-go/listers/apps/v1beta1" "k8s.io/client-go/tools/record" @@ -34,76 +35,65 @@ func NewControl( } } -func (e *defaultCassandraClusterNodepoolControl) removeUnusedStatefulSets( +func (e *defaultCassandraClusterNodepoolControl) clusterStatefulSets( cluster *v1alpha1.CassandraCluster, -) error { - expectedStatefulSetNames := map[string]bool{} - for _, pool := range cluster.Spec.NodePools { - name := util.NodePoolResourceName(cluster, &pool) - expectedStatefulSetNames[name] = true - } - client := e.kubeClient.AppsV1beta1().StatefulSets(cluster.Namespace) +) (results map[string]*v1beta1.StatefulSet, err error) { + results = map[string]*v1beta1.StatefulSet{} lister := e.statefulSetLister.StatefulSets(cluster.Namespace) selector, err := util.SelectorForCluster(cluster) if err != nil { - return err + return nil, err } existingSets, err := lister.List(selector) if err != nil { - return err + return nil, err } for _, set := range existingSets { err := util.OwnerCheck(set, cluster) if err != nil { - return err - } - _, found := expectedStatefulSetNames[set.Name] - if !found { - err := client.Delete(set.Name, nil) - if err != nil { - return err - } + continue } + results[set.Name] = set } - return nil + return results, nil } -func (e *defaultCassandraClusterNodepoolControl) createOrUpdateStatefulSet( - cluster *v1alpha1.CassandraCluster, - nodePool *v1alpha1.CassandraClusterNodePool, -) error { - desiredSet := StatefulSetForCluster(cluster, nodePool) - client := e.kubeClient.AppsV1beta1().StatefulSets(cluster.Namespace) - lister := e.statefulSetLister.StatefulSets(desiredSet.Namespace) - existingSet, err := lister.Get(desiredSet.Name) - if k8sErrors.IsNotFound(err) { - _, err = client.Create(desiredSet) - return err +// Add a NodePoolStatus for each NodePool, only if a corresponding StatefulSet is found. +// Update the NodePoolStatus for each NodePool, using values from the corresponding StatefulSet. +// Remove the NodePoolStatus for NodePools that do not have a StatefulSet +// (the statefulset has been deleted unexpectedly) +// Remove the NodePoolStatus if there is no corresponding NodePool. +// (the statefulset has been removed by a DeleteNodePool action - not yet implemented) +func (e *defaultCassandraClusterNodepoolControl) Sync(cluster *v1alpha1.CassandraCluster) error { + if cluster.Status.NodePools == nil { + cluster.Status.NodePools = map[string]v1alpha1.CassandraClusterNodePoolStatus{} } + ssList, err := e.clusterStatefulSets(cluster) if err != nil { return err } - err = util.OwnerCheck(existingSet, cluster) - if err != nil { - return err + nodePoolNames := sets.NewString() + for _, np := range cluster.Spec.NodePools { + nodePoolNames.Insert(np.Name) + ssName := util.NodePoolResourceName(cluster, &np) + ss, setFound := ssList[ssName] + nps, npsFound := cluster.Status.NodePools[np.Name] + if setFound { + if !npsFound { + cluster.Status.NodePools[np.Name] = nps + } + if nps.ReadyReplicas != ss.Status.ReadyReplicas { + nps.ReadyReplicas = ss.Status.ReadyReplicas + cluster.Status.NodePools[np.Name] = nps + } + } else { + delete(cluster.Status.NodePools, np.Name) + } } - _, err = client.Update(desiredSet) - return err -} - -func (e *defaultCassandraClusterNodepoolControl) syncStatefulSets( - cluster *v1alpha1.CassandraCluster, -) error { - for _, pool := range cluster.Spec.NodePools { - err := e.createOrUpdateStatefulSet(cluster, &pool) - if err != nil { - return err + for npName := range cluster.Status.NodePools { + if !nodePoolNames.Has(npName) { + delete(cluster.Status.NodePools, npName) } } - err := e.removeUnusedStatefulSets(cluster) - return err -} - -func (e *defaultCassandraClusterNodepoolControl) Sync(cluster *v1alpha1.CassandraCluster) error { - return e.syncStatefulSets(cluster) + return nil } diff --git a/pkg/controllers/cassandra/nodepool/nodepool_test.go b/pkg/controllers/cassandra/nodepool/nodepool_test.go index cd61ae996..daf83d9ac 100644 --- a/pkg/controllers/cassandra/nodepool/nodepool_test.go +++ b/pkg/controllers/cassandra/nodepool/nodepool_test.go @@ -13,12 +13,15 @@ func TestNodePoolControlSync(t *testing.T) { "create a statefulset", func(t *testing.T) { f := casstesting.NewFixture(t) - f.Run() + status := f.Run() f.AssertStatefulSetsLength(1) + if len(status.NodePools) != 0 { + t.Errorf("Expected no nodepool status. Found: %#v", status.NodePools) + } }, ) t.Run( - "ignore existing statefulset", + "add NodePoolStatus if a matching StatefulSet exists", func(t *testing.T) { f := casstesting.NewFixture(t) f.AddObjectK( @@ -27,73 +30,127 @@ func TestNodePoolControlSync(t *testing.T) { &f.Cluster.Spec.NodePools[0], ), ) - f.Run() + status := f.Run() f.AssertStatefulSetsLength(1) + if len(status.NodePools) != 1 { + t.Errorf("Expected one nodepool status. Found: %#v", status.NodePools) + } }, ) t.Run( - "update statefulset", + "update NodePoolStatus.ReadyReplicas to match StatefulSet.ReadyReplicas", func(t *testing.T) { f := casstesting.NewFixture(t) - unsyncedSet := nodepool.StatefulSetForCluster( + np := &f.Cluster.Spec.NodePools[0] + ss := nodepool.StatefulSetForCluster( f.Cluster, - &f.Cluster.Spec.NodePools[0], + np, ) - unsyncedSet.SetLabels(map[string]string{}) - f.AddObjectK(unsyncedSet) - f.Run() + ss.Status.ReadyReplicas = np.Replicas + f.AddObjectK(ss) + status := f.Run() f.AssertStatefulSetsLength(1) - sets := f.StatefulSets() - set := sets.Items[0] - labels := set.GetLabels() - if len(labels) == 0 { - t.Log(set) - t.Error("StatefulSet was not updated") + if np.Replicas != status.NodePools[np.Name].ReadyReplicas { + t.Errorf( + "Unexpected NodePoolStatus.ReadyReplicas: %d != %d", + np.Replicas, + status.NodePools[np.Name].ReadyReplicas, + ) } }, ) t.Run( - "error on update foreign statefulset", + "remove NodePoolStatus if no matching StatefulSet exists", func(t *testing.T) { f := casstesting.NewFixture(t) - foreignUnsyncedSet := nodepool.StatefulSetForCluster( - f.Cluster, - &f.Cluster.Spec.NodePools[0], - ) - foreignUnsyncedSet.SetLabels(map[string]string{}) - foreignUnsyncedSet.OwnerReferences = nil - f.AddObjectK(foreignUnsyncedSet) - f.RunExpectError() + np := f.Cluster.Spec.NodePools[0] + f.Cluster.Status.NodePools = map[string]v1alpha1.CassandraClusterNodePoolStatus{ + np.Name: {}, + } + status := f.Run() + if _, found := status.NodePools[np.Name]; found { + t.Error("Orphan NodePoolStatus was not deleted:", status) + } }, ) + t.Run( - "delete statefulset without nodepool", + "remove NodePoolStatus if no matching NodePool exists", func(t *testing.T) { f := casstesting.NewFixture(t) - f.AddObjectK( - nodepool.StatefulSetForCluster( - f.Cluster, - &f.Cluster.Spec.NodePools[0], - ), - ) - f.Cluster.Spec.NodePools = []v1alpha1.CassandraClusterNodePool{} - f.Run() - f.AssertStatefulSetsLength(0) + f.Cluster.Status.NodePools = map[string]v1alpha1.CassandraClusterNodePoolStatus{ + "orphan-status-1234": {}, + } + status := f.Run() + f.AssertStatefulSetsLength(1) + if _, found := status.NodePools["orphan-status-1234"]; found { + t.Error("Orphan NodePoolStatus was not deleted:", status) + } }, ) - t.Run( - "do not delete foreign owned stateful sets", - func(t *testing.T) { - f := casstesting.NewFixture(t) - foreignStatefulSet := nodepool.StatefulSetForCluster( - f.Cluster, - &f.Cluster.Spec.NodePools[0], - ) - foreignStatefulSet.OwnerReferences = nil + // t.Run( + // "update statefulset", + // func(t *testing.T) { + // f := casstesting.NewFixture(t) + // unsyncedSet := nodepool.StatefulSetForCluster( + // f.Cluster, + // &f.Cluster.Spec.NodePools[0], + // ) + // unsyncedSet.SetLabels(map[string]string{}) + // f.AddObjectK(unsyncedSet) + // f.Run() + // f.AssertStatefulSetsLength(1) + // sets := f.StatefulSets() + // set := sets.Items[0] + // labels := set.GetLabels() + // if len(labels) == 0 { + // t.Log(set) + // t.Error("StatefulSet was not updated") + // } + // }, + // ) + // t.Run( + // "error on update foreign statefulset", + // func(t *testing.T) { + // f := casstesting.NewFixture(t) + // foreignUnsyncedSet := nodepool.StatefulSetForCluster( + // f.Cluster, + // &f.Cluster.Spec.NodePools[0], + // ) + // foreignUnsyncedSet.SetLabels(map[string]string{}) + // foreignUnsyncedSet.OwnerReferences = nil + // f.AddObjectK(foreignUnsyncedSet) + // f.RunExpectError() + // }, + // ) + // t.Run( + // "delete statefulset without nodepool", + // func(t *testing.T) { + // f := casstesting.NewFixture(t) + // f.AddObjectK( + // nodepool.StatefulSetForCluster( + // f.Cluster, + // &f.Cluster.Spec.NodePools[0], + // ), + // ) + // f.Cluster.Spec.NodePools = []v1alpha1.CassandraClusterNodePool{} + // f.Run() + // f.AssertStatefulSetsLength(0) + // }, + // ) + // t.Run( + // "do not delete foreign owned stateful sets", + // func(t *testing.T) { + // f := casstesting.NewFixture(t) + // foreignStatefulSet := nodepool.StatefulSetForCluster( + // f.Cluster, + // &f.Cluster.Spec.NodePools[0], + // ) + // foreignStatefulSet.OwnerReferences = nil - f.AddObjectK(foreignStatefulSet) - f.Cluster.Spec.NodePools = []v1alpha1.CassandraClusterNodePool{} - f.RunExpectError() - }, - ) + // f.AddObjectK(foreignStatefulSet) + // f.Cluster.Spec.NodePools = []v1alpha1.CassandraClusterNodePool{} + // f.RunExpectError() + // }, + // ) } diff --git a/pkg/controllers/cassandra/nodepool/resource.go b/pkg/controllers/cassandra/nodepool/resource.go index 415c43fb5..d27a28af1 100644 --- a/pkg/controllers/cassandra/nodepool/resource.go +++ b/pkg/controllers/cassandra/nodepool/resource.go @@ -35,7 +35,6 @@ func StatefulSetForCluster( cluster *v1alpha1.CassandraCluster, np *v1alpha1.CassandraClusterNodePool, ) *apps.StatefulSet { - statefulSetName := util.NodePoolResourceName(cluster, np) seedProviderServiceName := util.SeedProviderServiceName(cluster) nodePoolLabels := util.NodePoolLabels(cluster, np.Name) @@ -58,7 +57,7 @@ func StatefulSetForCluster( OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)}, }, Spec: apps.StatefulSetSpec{ - Replicas: util.Int32Ptr(int32(np.Replicas)), + Replicas: util.Int32Ptr(int32(0)), ServiceName: seedProviderServiceName, Selector: &metav1.LabelSelector{ MatchLabels: nodePoolLabels, diff --git a/pkg/controllers/cassandra/testing/testing.go b/pkg/controllers/cassandra/testing/testing.go index 5b7415b94..dd9ebdcb7 100644 --- a/pkg/controllers/cassandra/testing/testing.go +++ b/pkg/controllers/cassandra/testing/testing.go @@ -6,6 +6,7 @@ import ( rbacv1 "k8s.io/api/rbac/v1beta1" navinformers "github.com/jetstack/navigator/pkg/client/informers/externalversions" + "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" "github.com/jetstack/navigator/pkg/controllers/cassandra" @@ -78,7 +79,7 @@ func (f *Fixture) AddObjectN(o runtime.Object) { f.naviObjects = append(f.naviObjects, o) } -func (f *Fixture) setupAndSync() error { +func (f *Fixture) setupAndSync() (v1alpha1.CassandraClusterStatus, error) { recorder := record.NewFakeRecorder(0) finished := make(chan struct{}) defer func() { @@ -159,6 +160,10 @@ func (f *Fixture) setupAndSync() error { f.RoleControl, f.RoleBindingControl, recorder, + &controllers.State{ + Clientset: f.k8sClient, + StatefulSetLister: statefulSets, + }, ) stopCh := make(chan struct{}) defer close(stopCh) @@ -179,18 +184,20 @@ func (f *Fixture) setupAndSync() error { return c.Sync(f.Cluster) } -func (f *Fixture) Run() { - err := f.setupAndSync() +func (f *Fixture) Run() v1alpha1.CassandraClusterStatus { + status, err := f.setupAndSync() if err != nil { f.t.Error(err) } + return status } -func (f *Fixture) RunExpectError() { - err := f.setupAndSync() +func (f *Fixture) RunExpectError() v1alpha1.CassandraClusterStatus { + status, err := f.setupAndSync() if err == nil { f.t.Error("Sync was expected to return an error. Got nil.") } + return status } func (f *Fixture) Services() *v1.ServiceList {