Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Add CreateNodePool and ScaleOut actions for Cassandra.
Browse files Browse the repository at this point in the history
These become the only changes supported by the Cassandra controller until ScaleIn and  CassandraUpgrade actions are implemented in followup branches.

Fixes: #253
  • Loading branch information
wallrj committed Feb 27, 2018
1 parent 63ba777 commit 42f02a1
Show file tree
Hide file tree
Showing 19 changed files with 834 additions and 126 deletions.
2 changes: 1 addition & 1 deletion contrib/charts/navigator/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ items:
name: "{{ template "fullname" . }}:controller"
rules:
- apiGroups: ["navigator.jetstack.io"]
resources: ["elasticsearchclusters", "pilots", "elasticsearchclusters/status", "pilots/status", "cassandraclusters"]
resources: ["elasticsearchclusters", "pilots", "elasticsearchclusters/status", "pilots/status", "cassandraclusters", "cassandraclusters/status"]
verbs: ["*"]
- apiGroups: [""]
resources: ["services", "configmaps", "serviceaccounts", "pods"]
Expand Down
45 changes: 45 additions & 0 deletions internal/test/util/generate/generate.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package generate

import (
"testing"

"github.com/coreos/go-semver/semver"
apps "k8s.io/api/apps/v1beta1"
core "k8s.io/api/core/v1"
Expand Down Expand Up @@ -118,3 +120,46 @@ func StatefulSet(c StatefulSetConfig) *apps.StatefulSet {
},
}
}

func AssertStatefulSetMatches(t *testing.T, expected StatefulSetConfig, actual *apps.StatefulSet) {
if actual.Name != expected.Name {
t.Errorf("Name %q != %q", expected.Name, actual.Name)
}
if actual.Namespace != expected.Namespace {
t.Errorf("Namespace %q != %q", expected.Namespace, actual.Namespace)
}
if expected.Replicas != nil {
if actual.Spec.Replicas == nil {
t.Errorf("Replicas %d != %v", *expected.Replicas, nil)
} else {
if *actual.Spec.Replicas != *expected.Replicas {
t.Errorf("Replicas %d != %d", *expected.Replicas, *actual.Spec.Replicas)
}
}
}
}

type CassandraClusterConfig struct {
Name, Namespace string
}

func CassandraCluster(c CassandraClusterConfig) *v1alpha1.CassandraCluster {
return &v1alpha1.CassandraCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.Name,
Namespace: c.Namespace,
},
}
}

type CassandraClusterNodePoolConfig struct {
Name string
Replicas int32
}

func CassandraClusterNodePool(c CassandraClusterNodePoolConfig) *v1alpha1.CassandraClusterNodePool {
return &v1alpha1.CassandraClusterNodePool{
Name: c.Name,
Replicas: c.Replicas,
}
}
2 changes: 1 addition & 1 deletion pkg/apis/navigator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type CassandraClusterStatus struct {
}

type CassandraClusterNodePoolStatus struct {
ReadyReplicas int64
ReadyReplicas int32
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
70 changes: 70 additions & 0 deletions pkg/apis/navigator/v1alpha1/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package v1alpha1

import (
"fmt"
"math/rand"
"reflect"
"testing/quick"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func (_ CassandraCluster) Generate(rand *rand.Rand, size int) reflect.Value {
o := CassandraCluster{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("cluster%d", rand.Intn(10)),
Namespace: "",
},
}
v, ok := quick.Value(reflect.TypeOf(CassandraClusterSpec{}), rand)
if ok {
o.Spec = v.Interface().(CassandraClusterSpec)
}
v, ok = quick.Value(reflect.TypeOf(CassandraClusterStatus{}), rand)
if ok {
o.Status = v.Interface().(CassandraClusterStatus)
}
return reflect.ValueOf(o)
}

func (_ CassandraClusterSpec) Generate(rand *rand.Rand, size int) reflect.Value {
nodepools := make([]CassandraClusterNodePool, rand.Intn(10))
for i := range nodepools {
v, ok := quick.Value(reflect.TypeOf(CassandraClusterNodePool{}), rand)
if ok {
nodepools[i] = v.Interface().(CassandraClusterNodePool)
}
}
o := CassandraClusterSpec{
CqlPort: rand.Int31n(10),
NodePools: nodepools,
}
return reflect.ValueOf(o)
}

func (_ CassandraClusterNodePool) Generate(rand *rand.Rand, size int) reflect.Value {
o := CassandraClusterNodePool{
Name: fmt.Sprintf("np%d", rand.Intn(10)),
Replicas: rand.Int31n(10),
}
return reflect.ValueOf(o)
}

func (_ CassandraClusterStatus) Generate(rand *rand.Rand, size int) reflect.Value {
o := CassandraClusterStatus{
NodePools: map[string]CassandraClusterNodePoolStatus{},
}
nodepools := make([]CassandraClusterNodePool, rand.Intn(10))
for i := range nodepools {
v, ok := quick.Value(reflect.TypeOf(CassandraClusterNodePool{}), rand)
if ok {
nodepools[i] = v.Interface().(CassandraClusterNodePool)
}
}
for _, np := range nodepools {
o.NodePools[np.Name] = CassandraClusterNodePoolStatus{
ReadyReplicas: np.Replicas,
}
}
return reflect.ValueOf(o)
}
4 changes: 2 additions & 2 deletions pkg/apis/navigator/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CassandraClusterSpec struct {
// CassandraClusterNodePool describes a node pool within a CassandraCluster.
type CassandraClusterNodePool struct {
Name string `json:"name"`
Replicas int64 `json:"replicas"`
Replicas int32 `json:"replicas"`

// Persistence specifies the configuration for persistent data for this
// node.
Expand All @@ -71,7 +71,7 @@ type CassandraClusterStatus struct {
}

type CassandraClusterNodePoolStatus struct {
ReadyReplicas int64 `json:"readyReplicas"`
ReadyReplicas int32 `json:"readyReplicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
28 changes: 24 additions & 4 deletions pkg/apis/navigator/v1alpha1/zz_generated.conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func Convert_navigator_CassandraClusterList_To_v1alpha1_CassandraClusterList(in

func autoConvert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNodePool(in *CassandraClusterNodePool, out *navigator.CassandraClusterNodePool, s conversion.Scope) error {
out.Name = in.Name
out.Replicas = in.Replicas
out.Replicas = int64(in.Replicas)
if err := Convert_v1alpha1_PersistenceConfig_To_navigator_PersistenceConfig(&in.Persistence, &out.Persistence, s); err != nil {
return err
}
Expand All @@ -178,7 +178,7 @@ func Convert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNode

func autoConvert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNodePool(in *navigator.CassandraClusterNodePool, out *CassandraClusterNodePool, s conversion.Scope) error {
out.Name = in.Name
out.Replicas = in.Replicas
out.Replicas = int32(in.Replicas)
if err := Convert_navigator_PersistenceConfig_To_v1alpha1_PersistenceConfig(&in.Persistence, &out.Persistence, s); err != nil {
return err
}
Expand Down Expand Up @@ -217,7 +217,17 @@ func autoConvert_v1alpha1_CassandraClusterSpec_To_navigator_CassandraClusterSpec
if err := Convert_v1alpha1_NavigatorClusterConfig_To_navigator_NavigatorClusterConfig(&in.NavigatorClusterConfig, &out.NavigatorClusterConfig, s); err != nil {
return err
}
out.NodePools = *(*[]navigator.CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools))
if in.NodePools != nil {
in, out := &in.NodePools, &out.NodePools
*out = make([]navigator.CassandraClusterNodePool, len(*in))
for i := range *in {
if err := Convert_v1alpha1_CassandraClusterNodePool_To_navigator_CassandraClusterNodePool(&(*in)[i], &(*out)[i], s); err != nil {
return err
}
}
} else {
out.NodePools = nil
}
out.Image = (*navigator.ImageSpec)(unsafe.Pointer(in.Image))
out.CqlPort = in.CqlPort
out.Version = in.Version
Expand All @@ -233,7 +243,17 @@ func autoConvert_navigator_CassandraClusterSpec_To_v1alpha1_CassandraClusterSpec
if err := Convert_navigator_NavigatorClusterConfig_To_v1alpha1_NavigatorClusterConfig(&in.NavigatorClusterConfig, &out.NavigatorClusterConfig, s); err != nil {
return err
}
out.NodePools = *(*[]CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools))
if in.NodePools != nil {
in, out := &in.NodePools, &out.NodePools
*out = make([]CassandraClusterNodePool, len(*in))
for i := range *in {
if err := Convert_navigator_CassandraClusterNodePool_To_v1alpha1_CassandraClusterNodePool(&(*in)[i], &(*out)[i], s); err != nil {
return err
}
}
} else {
out.NodePools = nil
}
out.Version = in.Version
out.Image = (*ImageSpec)(unsafe.Pointer(in.Image))
out.CqlPort = in.CqlPort
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/navigator/validation/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func ValidateCassandraClusterUpdate(old, new *navigator.CassandraCluster) field.
}
}
}
return allErrs
}

func ValidateCassandraCluster(c *navigator.CassandraCluster) field.ErrorList {
Expand Down
30 changes: 30 additions & 0 deletions pkg/controllers/cassandra/actions/create_nodepool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package actions

import (
k8sErrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
)

type CreateNodePool struct {
Cluster *v1alpha1.CassandraCluster
NodePool *v1alpha1.CassandraClusterNodePool
}

var _ controllers.Action = &CreateNodePool{}

func (a *CreateNodePool) Name() string {
return "CreateNodePool"
}

func (a *CreateNodePool) Execute(s *controllers.State) error {
ss := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool)
_, err := s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Create(ss)
// XXX: Should this be idempotent?
if k8sErrors.IsAlreadyExists(err) {
return nil
}
return err
}
95 changes: 95 additions & 0 deletions pkg/controllers/cassandra/actions/create_nodepool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package actions_test

import (
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

"github.com/jetstack/navigator/internal/test/unit/framework"
"github.com/jetstack/navigator/internal/test/util/generate"
"github.com/jetstack/navigator/pkg/controllers/cassandra/actions"
)

func TestCreateNodePool(t *testing.T) {
type testT struct {
kubeObjects []runtime.Object
navObjects []runtime.Object
cluster generate.CassandraClusterConfig
nodePool generate.CassandraClusterNodePoolConfig
expectedStatefulSet generate.StatefulSetConfig
expectedErr bool
}
tests := map[string]testT{
"A statefulset is created if one does not already exist": {
cluster: generate.CassandraClusterConfig{
Name: "cluster1",
Namespace: "ns1",
},
nodePool: generate.CassandraClusterNodePoolConfig{
Name: "pool1",
},
expectedStatefulSet: generate.StatefulSetConfig{
Name: "cass-cluster1-pool1",
Namespace: "ns1",
Replicas: int32Ptr(0),
},
},
"Idempotent: CreateNodePool can be executed again without error": {
kubeObjects: []runtime.Object{
generate.StatefulSet(
generate.StatefulSetConfig{
Name: "cass-cluster1-pool1",
Namespace: "ns1",
Replicas: int32Ptr(10),
},
),
},
cluster: generate.CassandraClusterConfig{Name: "cluster1", Namespace: "ns1"},
nodePool: generate.CassandraClusterNodePoolConfig{
Name: "pool1",
},
expectedStatefulSet: generate.StatefulSetConfig{
Name: "cass-cluster1-pool1",
Namespace: "ns1",
Replicas: int32Ptr(10),
},
expectedErr: false,
},
}

for name, test := range tests {
t.Run(
name,
func(t *testing.T) {
fixture := &framework.StateFixture{
T: t,
KubeObjects: test.kubeObjects,
NavigatorObjects: test.navObjects,
}
fixture.Start()
defer fixture.Stop()
state := fixture.State()
a := &actions.CreateNodePool{
Cluster: generate.CassandraCluster(test.cluster),
NodePool: generate.CassandraClusterNodePool(test.nodePool),
}
err := a.Execute(state)
if !test.expectedErr && err != nil {
t.Errorf("Unexpected error: %s", err)
}
if test.expectedErr && err == nil {
t.Errorf("Expected an error")
}
actualStatefulSet, err := fixture.KubeClient().
AppsV1beta1().
StatefulSets(test.expectedStatefulSet.Namespace).
Get(test.expectedStatefulSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Unexpected error retrieving statefulset: %v", err)
}
generate.AssertStatefulSetMatches(t, test.expectedStatefulSet, actualStatefulSet)
},
)
}
}
42 changes: 42 additions & 0 deletions pkg/controllers/cassandra/actions/scaleout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package actions

import (
"fmt"

"github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/controllers"
"github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool"
)

type ScaleOut struct {
Cluster *v1alpha1.CassandraCluster
NodePool *v1alpha1.CassandraClusterNodePool
}

var _ controllers.Action = &ScaleOut{}

func (a *ScaleOut) Name() string {
return "ScaleOut"
}

func (a *ScaleOut) Execute(s *controllers.State) error {
ss := nodepool.StatefulSetForCluster(a.Cluster, a.NodePool)
ss, err := s.StatefulSetLister.StatefulSets(ss.Namespace).Get(ss.Name)
if err != nil {
return err
}
ss = ss.DeepCopy()
if ss.Spec.Replicas == nil || *ss.Spec.Replicas < a.NodePool.Replicas {
ss.Spec.Replicas = &a.NodePool.Replicas
_, err = s.Clientset.AppsV1beta1().StatefulSets(ss.Namespace).Update(ss)
return err
}
if *ss.Spec.Replicas > a.NodePool.Replicas {
return fmt.Errorf(
"the NodePool.Replicas value (%d) "+
"is less than the existing StatefulSet.Replicas value (%d)",
a.NodePool.Replicas, *ss.Spec.Replicas,
)
}
return nil
}
Loading

0 comments on commit 42f02a1

Please sign in to comment.