skip creation later to improve visibility of errors #1013

Merged 3 commits on Jun 17, 2020
6 changes: 3 additions & 3 deletions pkg/apis/acid.zalan.do/v1/marshal.go
@@ -102,7 +102,7 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error {
 	}
 
 	tmp.Error = err.Error()
-	tmp.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}
+	tmp.Status.PostgresClusterStatus = ClusterStatusInvalid
 
 	*p = Postgresql(tmp)
 
@@ -112,10 +112,10 @@ func (p *Postgresql) UnmarshalJSON(data []byte) error {
 
 	if clusterName, err := extractClusterName(tmp2.ObjectMeta.Name, tmp2.Spec.TeamID); err != nil {
 		tmp2.Error = err.Error()
-		tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}
+		tmp2.Status.PostgresClusterStatus = ClusterStatusInvalid
 	} else if err := validateCloneClusterDescription(&tmp2.Spec.Clone); err != nil {
 		tmp2.Error = err.Error()
-		tmp2.Status = PostgresStatus{PostgresClusterStatus: ClusterStatusInvalid}
+		tmp2.Status.PostgresClusterStatus = ClusterStatusInvalid
 	} else {
 		tmp2.Spec.ClusterName = clusterName
 	}
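
The marshal.go change replaces a wholesale struct assignment with a single-field write. With today's one-field PostgresStatus the two are equivalent, but the field write stays correct if the status struct ever grows. A minimal, self-contained sketch of the difference; the extra field is purely illustrative and not part of the operator:

package main

import "fmt"

// Stand-in for acidv1.PostgresStatus; LastError is a hypothetical
// second field added only to illustrate the difference.
type PostgresStatus struct {
	PostgresClusterStatus string
	LastError             string
}

func main() {
	s := PostgresStatus{PostgresClusterStatus: "Running", LastError: "none"}

	// Old style: replacing the whole struct zeroes every other field.
	s = PostgresStatus{PostgresClusterStatus: "Invalid"}
	fmt.Printf("%q\n", s.LastError) // ""

	s = PostgresStatus{PostgresClusterStatus: "Running", LastError: "none"}

	// New style: the single-field write leaves the rest untouched.
	s.PostgresClusterStatus = "Invalid"
	fmt.Printf("%q\n", s.LastError) // "none"
}
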
41 changes: 6 additions & 35 deletions pkg/cluster/cluster.go
@@ -5,7 +5,6 @@ package cluster
 import (
 	"context"
 	"database/sql"
-	"encoding/json"
 	"fmt"
 	"reflect"
 	"regexp"
@@ -181,34 +180,6 @@ func (c *Cluster) GetReference() *v1.ObjectReference {
 	return ref
 }
 
-// SetStatus of Postgres cluster
-// TODO: eventually switch to updateStatus() for kubernetes 1.11 and above
-func (c *Cluster) setStatus(status string) {
-	var pgStatus acidv1.PostgresStatus
-	pgStatus.PostgresClusterStatus = status
-
-	patch, err := json.Marshal(struct {
-		PgStatus interface{} `json:"status"`
-	}{&pgStatus})
-
-	if err != nil {
-		c.logger.Errorf("could not marshal status: %v", err)
-	}
-
-	// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
-	// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
-	// we should take advantage of it.
-	newspec, err := c.KubeClient.AcidV1ClientSet.AcidV1().Postgresqls(c.clusterNamespace()).Patch(
-		context.TODO(), c.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
-	if err != nil {
-		c.logger.Errorf("could not update status: %v", err)
-		// return as newspec is empty, see PR654
-		return
-	}
-	// update the spec, maintaining the new resourceVersion.
-	c.setSpec(newspec)
-}
-
 func (c *Cluster) isNewCluster() bool {
 	return c.Status.Creating()
 }
@@ -257,13 +228,13 @@ func (c *Cluster) Create() error {
 
 	defer func() {
 		if err == nil {
-			c.setStatus(acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning) //TODO: are you sure it's running?
 		} else {
-			c.setStatus(acidv1.ClusterStatusAddFailed)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusAddFailed)
 		}
 	}()
 
-	c.setStatus(acidv1.ClusterStatusCreating)
+	c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusCreating)
 	c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Create", "Started creation of new cluster resources")
 
 	if err = c.enforceMinResourceLimits(&c.Spec); err != nil {
@@ -630,14 +601,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	c.setStatus(acidv1.ClusterStatusUpdating)
+	c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating)
 	c.setSpec(newSpec)
 
 	defer func() {
 		if updateFailed {
-			c.setStatus(acidv1.ClusterStatusUpdateFailed)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdateFailed)
 		} else {
-			c.setStatus(acidv1.ClusterStatusRunning)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
 		}
 	}()
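
Unlike the removed setStatus, the shared helper returns the patched object and an error instead of swallowing failures in its own logger; the call sites above discard both, but a caller can now react. A hedged sketch of such a caller (not part of this PR), assuming the operator's Cluster fields c.KubeClient, c.logger, c.clusterName(), and c.setSpec():

// Hypothetical call site inside a Cluster method.
if pg, err := c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusUpdating); err != nil {
	c.logger.Warningf("could not set cluster status: %v", err)
} else {
	// keep the in-memory spec on the new resourceVersion, as the
	// removed setStatus used to do via c.setSpec(newspec)
	c.setSpec(pg)
}
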
4 changes: 2 additions & 2 deletions pkg/cluster/sync.go
@@ -32,9 +32,9 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error {
 	defer func() {
 		if err != nil {
 			c.logger.Warningf("error while syncing cluster state: %v", err)
-			c.setStatus(acidv1.ClusterStatusSyncFailed)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusSyncFailed)
 		} else if !c.Status.Running() {
-			c.setStatus(acidv1.ClusterStatusRunning)
+			c.KubeClient.SetPostgresCRDStatus(c.clusterName(), acidv1.ClusterStatusRunning)
 		}
 	}()
11 changes: 11 additions & 0 deletions pkg/controller/controller.go
@@ -25,6 +25,7 @@ import (
 	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/reference"
 )
 
 // Controller represents operator controller
@@ -442,6 +443,16 @@ func (c *Controller) getEffectiveNamespace(namespaceFromEnvironment, namespaceFr
 	return namespace
 }
 
+// GetReference of Postgres CR object
+// i.e. required to emit events to this resource
+func (c *Controller) GetReference(postgresql *acidv1.Postgresql) *v1.ObjectReference {
+	ref, err := reference.GetReference(scheme.Scheme, postgresql)
+	if err != nil {
+		c.logger.Errorf("could not get reference for Postgresql CR %v/%v: %v", postgresql.Namespace, postgresql.Name, err)
+	}
+	return ref
+}
+
 // hasOwnership returns true if the controller is the "owner" of the postgresql.
 // Whether it's owner is determined by the value of 'acid.zalan.do/controller'
 // annotation. If the value matches the controllerID then it owns it, or if the
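
Cluster already has a GetReference of its own (see cluster.go above); this controller-level twin exists because an invalid manifest is now rejected in queueClusterEvent before any Cluster object is built, so the ObjectReference must come straight from the informer's Postgresql object. A minimal usage sketch, mirroring the call sites in postgresql.go below:

// pg comes straight from the informer; no Cluster has been created yet.
// On a reference error GetReference logs and returns nil, so a guard is
// prudent even though the call sites below omit it.
if ref := c.GetReference(pg); ref != nil {
	c.eventRecorder.Eventf(ref, v1.EventTypeWarning, "Create", "%v", clusterError)
}
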
19 changes: 15 additions & 4 deletions pkg/controller/postgresql.go
@@ -421,14 +421,25 @@ func (c *Controller) queueClusterEvent(informerOldSpec, informerNewSpec *acidv1.
 	}
 
 	if clusterError != "" && eventType != EventDelete {
-		c.logger.
-			WithField("cluster-name", clusterName).
-			Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
+		c.logger.WithField("cluster-name", clusterName).Debugf("skipping %q event for the invalid cluster: %s", eventType, clusterError)
+
+		switch eventType {
+		case EventAdd:
+			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusAddFailed)
+			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Create", "%v", clusterError)
+		case EventUpdate:
+			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusUpdateFailed)
+			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Update", "%v", clusterError)
+		default:
+			c.KubeClient.SetPostgresCRDStatus(clusterName, acidv1.ClusterStatusSyncFailed)
+			c.eventRecorder.Eventf(c.GetReference(informerNewSpec), v1.EventTypeWarning, "Sync", "%v", clusterError)
+		}
+
 		return
 	}
 
 	// Don't pass the spec directly from the informer, since subsequent modifications of it would be reflected
-	// in the informer internal state, making it incohherent with the actual Kubernetes object (and, as a side
+	// in the informer internal state, making it incoherent with the actual Kubernetes object (and, as a side
 	// effect, the modified state will be returned together with subsequent events).
 
 	workerID := c.clusterWorkerID(clusterName)
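
The switch reads as a simple event-to-failure-status mapping, with sync (and any future event type) caught by the default branch. A self-contained restatement; the types and string values here are assumptions standing in for the operator's own EventType and status constants:

package main

import "fmt"

// Stand-ins for the operator's event types.
type EventType string

const (
	EventAdd    EventType = "ADD"
	EventUpdate EventType = "UPDATE"
	EventSync   EventType = "SYNC"
)

// failureStatus mirrors the switch in queueClusterEvent above.
func failureStatus(ev EventType) string {
	switch ev {
	case EventAdd:
		return "CreateFailed"
	case EventUpdate:
		return "UpdateFailed"
	default: // EventSync and anything else
		return "SyncFailed"
	}
}

func main() {
	fmt.Println(failureStatus(EventSync)) // SyncFailed
}
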
30 changes: 30 additions & 0 deletions pkg/util/k8sutil/k8sutil.go
@@ -6,10 +6,13 @@ import (
 	"reflect"
 
 	b64 "encoding/base64"
+	"encoding/json"
 
 	batchv1beta1 "k8s.io/api/batch/v1beta1"
 	clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
 
+	acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
+	"github.com/zalando/postgres-operator/pkg/spec"
 	apiappsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	policybeta1 "k8s.io/api/policy/v1beta1"
@@ -156,6 +159,33 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
 	return kubeClient, nil
 }
 
+// SetPostgresCRDStatus of Postgres cluster
+func (client *KubernetesClient) SetPostgresCRDStatus(clusterName spec.NamespacedName, status string) (*acidv1.Postgresql, error) {
+	var pg *acidv1.Postgresql
+	var pgStatus acidv1.PostgresStatus
+	pgStatus.PostgresClusterStatus = status
+
+	patch, err := json.Marshal(struct {
+		PgStatus interface{} `json:"status"`
+	}{&pgStatus})
+
+	if err != nil {
+		return pg, fmt.Errorf("could not marshal status: %v", err)
+	}
+
+	// we cannot do a full scale update here without fetching the previous manifest (as the resourceVersion may differ),
+	// however, we could do patch without it. In the future, once /status subresource is there (starting Kubernetes 1.11)
+	// we should take advantage of it.
+	pg, err = client.AcidV1ClientSet.AcidV1().Postgresqls(clusterName.Namespace).Patch(
+		context.TODO(), clusterName.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "status")
+	if err != nil {
+		return pg, fmt.Errorf("could not update status: %v", err)
+	}
+
+	// update the spec, maintaining the new resourceVersion.
+	return pg, nil
+}
+
 // SameService compares the Services
 func SameService(cur, new *v1.Service) (match bool, reason string) {
 	//TODO: improve comparison
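
For reference, the merge patch assembled above serializes to a small JSON document; with types.MergePatchType only the listed fields are touched. A runnable sketch, assuming PostgresStatus carries the `json:"PostgresClusterStatus"` tag it has in the acid.zalan.do/v1 types:

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for acidv1.PostgresStatus with its assumed json tag.
type PostgresStatus struct {
	PostgresClusterStatus string `json:"PostgresClusterStatus"`
}

func main() {
	// Same anonymous wrapper as in SetPostgresCRDStatus above.
	patch, err := json.Marshal(struct {
		PgStatus interface{} `json:"status"`
	}{&PostgresStatus{PostgresClusterStatus: "SyncFailed"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch))
	// Output: {"status":{"PostgresClusterStatus":"SyncFailed"}}
}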