Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pod monitoring #9

Merged
merged 10 commits into from
Feb 26, 2018
Merged
Next Next commit
Adding pod monitoring
  • Loading branch information
ewoutp committed Feb 16, 2018
commit 012e7cf82a21df11b170fea6d7720f427900af29
60 changes: 60 additions & 0 deletions pkg/apis/arangodb/v1alpha/deployment_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,55 @@ func (ds DeploymentStatusMembers) ContainsID(id string) bool {
ds.SyncWorkers.ContainsID(id)
}

// MemberStatusByPodName returns a reference to the element in the given set of lists that has the given pod name.
// If no such element exists, nil is returned.
func (ds DeploymentStatusMembers) MemberStatusByPodName(podName string) (MemberStatus, ServerGroup, bool) {
if result, found := ds.Single.ElementByPodName(podName); found {
return result, ServerGroupSingle, true
}
if result, found := ds.Agents.ElementByPodName(podName); found {
return result, ServerGroupAgents, true
}
if result, found := ds.DBServers.ElementByPodName(podName); found {
return result, ServerGroupDBServers, true
}
if result, found := ds.Coordinators.ElementByPodName(podName); found {
return result, ServerGroupCoordinators, true
}
if result, found := ds.SyncMasters.ElementByPodName(podName); found {
return result, ServerGroupSyncMasters, true
}
if result, found := ds.SyncWorkers.ElementByPodName(podName); found {
return result, ServerGroupSyncWorkers, true
}
return MemberStatus{}, 0, false
}

// UpdateMemberStatus updates the given status in the given group.
func (ds *DeploymentStatusMembers) UpdateMemberStatus(status MemberStatus, group ServerGroup) error {
var err error
switch group {
case ServerGroupSingle:
err = ds.Single.Update(status)
case ServerGroupAgents:
err = ds.Agents.Update(status)
case ServerGroupDBServers:
err = ds.DBServers.Update(status)
case ServerGroupCoordinators:
err = ds.Coordinators.Update(status)
case ServerGroupSyncMasters:
err = ds.SyncMasters.Update(status)
case ServerGroupSyncWorkers:
err = ds.SyncWorkers.Update(status)
default:
return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
}
if err != nil {
return maskAny(err)
}
return nil
}

// MemberStatusList is a list of MemberStatus entries
type MemberStatusList []MemberStatus

Expand All @@ -99,6 +148,17 @@ func (l MemberStatusList) ContainsID(id string) bool {
return false
}

// ElementByPodName returns the element in the given list that has the given pod name and true.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

searches for first MemberStatus whose name matches a given string. If such a status it found a (status, true) is returned, otherwise (MemberStatus{},false) is returned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed text a bit different.

// If no such element exists, false is returned.
func (l MemberStatusList) ElementByPodName(podName string) (MemberStatus, bool) {
for i, x := range l {
if x.PodName == podName {
return l[i], true
}
}
return MemberStatus{}, false
}

// Add a member to the list.
// Returns an AlreadyExistsError if the ID of the given member already exists.
func (l *MemberStatusList) Add(m MemberStatus) error {
Expand Down
30 changes: 24 additions & 6 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ type Dependencies struct {
type deploymentEventType string

const (
eventModifyDeployment deploymentEventType = "Modify"
eventArangoDeploymentUpdated deploymentEventType = "ArangoDeploymentUpdated"
eventPodAdded deploymentEventType = "PodAdded"
eventPodUpdated deploymentEventType = "PodUpdated"
eventPodDeleted deploymentEventType = "PodDeleted"
)

// deploymentEvent holds an event passed from the controller to the deployment.
type deploymentEvent struct {
Type deploymentEventType
Deployment *api.ArangoDeployment
Pod *v1.Pod
}

const (
Expand Down Expand Up @@ -98,6 +102,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
}

go d.run()
go d.listenForPodEvents()

return d, nil
}
Expand All @@ -106,7 +111,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
// This sends an update event in the deployment event queue.
func (d *Deployment) Update(apiObject *api.ArangoDeployment) {
d.send(&deploymentEvent{
Type: eventModifyDeployment,
Type: eventArangoDeploymentUpdated,
Deployment: apiObject,
})
}
Expand Down Expand Up @@ -178,20 +183,24 @@ func (d *Deployment) run() {
case event := <-d.eventCh:
// Got event from event queue
switch event.Type {
case eventModifyDeployment:
if err := d.handleUpdateEvent(event); err != nil {
case eventArangoDeploymentUpdated:
if err := d.handleArangoDeploymentUpdatedEvent(event); err != nil {
d.failOnError(err, "Failed to handle deployment update")
return
}
case eventPodAdded, eventPodUpdated, eventPodDeleted:
if err := d.inspectPods(); err != nil {
d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
}
default:
panic("unknown event type" + event.Type)
}
}
}
}

// handleUpdateEvent processes the given event coming from the deployment event queue.
func (d *Deployment) handleUpdateEvent(event *deploymentEvent) error {
// handleArangoDeploymentUpdatedEvent is called when the deployment is updated by the user.
func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent) error {
// TODO
return nil
}
Expand Down Expand Up @@ -269,3 +278,12 @@ func (d *Deployment) reportFailedStatus() {

retry.Retry(op, time.Hour*24*365)
}

// isOwnerOf returns true if the given object belong to this deployment.
func (d *Deployment) isOwnerOf(obj metav1.Object) bool {
ownerRefs := obj.GetOwnerReferences()
if len(ownerRefs) < 1 {
return false
}
return ownerRefs[0].UID != d.apiObject.UID
}
1 change: 0 additions & 1 deletion pkg/deployment/pods.go → pkg/deployment/pod_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"strconv"

api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"

"github.com/arangodb/k8s-operator/pkg/util/arangod"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
Expand Down
80 changes: 80 additions & 0 deletions pkg/deployment/pod_informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)

// listenForPodEvents keep listening for changes in pod until the given channel is closed.
func (d *Deployment) listenForPodEvents() {
source := cache.NewListWatchFromClient(
d.deps.KubeCli.CoreV1().RESTClient(),
"pods",
d.apiObject.GetNamespace(),
fields.Everything())

getPod := func(obj interface{}) (*v1.Pod, bool) {
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pod, ok = tombstone.Obj.(*v1.Pod)
return pod, ok
}
return pod, true
}

_, informer := cache.NewIndexerInformer(source, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.send(&deploymentEvent{
Type: eventPodAdded,
Pod: p,
})
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.send(&deploymentEvent{
Type: eventPodUpdated,
Pod: p,
})
}
},
DeleteFunc: func(obj interface{}) {
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.send(&deploymentEvent{
Type: eventPodDeleted,
Pod: p,
})
}
},
}, cache.Indexers{})

informer.Run(d.stopCh)
}
74 changes: 74 additions & 0 deletions pkg/deployment/pod_inspector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//

package deployment

import (
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"

api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"
)

// inspectPods lists all pods that belong to the given deployment and updates
// the member status of the deployment accordingly.
func (d *Deployment) inspectPods() error {
log := d.deps.Log

log.Debug().Msg("inspecting pods")
pods, err := d.deps.KubeCli.CoreV1().Pods(d.apiObject.GetNamespace()).List(k8sutil.DeploymentListOpt(d.apiObject.GetName()))
if err != nil {
log.Debug().Err(err).Msg("Failed to list pods")
return maskAny(err)
}

// Update member status from all pods found
for _, p := range pods.Items {
// Check ownership
if !d.isOwnerOf(&p) {
continue
}

// Find member status
memberStatus, group, found := d.status.Members.MemberStatusByPodName(p.GetName())
if !found {
continue
}

// Update state
log.Debug().Str("pod-name", p.GetName()).Msg("found member status for pod")
if memberStatus.State == api.MemberStateCreating {
if k8sutil.IsPodReady(&p) {
memberStatus.State = api.MemberStateReady
if err := d.status.Members.UpdateMemberStatus(memberStatus, group); err != nil {
return maskAny(err)
}
log.Debug().Str("pod-name", p.GetName()).Msg("updated member status for pod to ready")
}
}
}

// Save status
if err := d.updateCRStatus(); err != nil {
return maskAny(err)
}
return nil
}
13 changes: 11 additions & 2 deletions pkg/util/k8sutil/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,24 @@ func NewMemberAddEvent(memberName, role string, apiObject APIObject) *v1.Event {
return event
}

// MemberRemoveEvent creates an event indicating that an existing member was removed.
func MemberRemoveEvent(memberName, role string, apiObject APIObject) *v1.Event {
// NewMemberRemoveEvent creates an event indicating that an existing member was removed.
func NewMemberRemoveEvent(memberName, role string, apiObject APIObject) *v1.Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = fmt.Sprintf("%s Removed", strings.Title(role))
event.Message = fmt.Sprintf("Existing %s %s removed from the deployment", role, memberName)
return event
}

// NewErrorEvent creates an even of type error.
func NewErrorEvent(reason string, err error, apiObject APIObject) *v1.Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeWarning
event.Reason = strings.Title(reason)
event.Message = err.Error()
return event
}

// newDeploymentEvent creates a new event for the given api object & owner.
func newDeploymentEvent(apiObject APIObject) *v1.Event {
t := time.Now()
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/k8sutil/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ const (
ArangodVolumeMountDir = "/data"
)

// IsPodReady returns true if the PodReady condition on
// the given pod is set to true.
func IsPodReady(pod *v1.Pod) bool {
condition := getPodReadyCondition(&pod.Status)
return condition != nil && condition.Status == v1.ConditionTrue
}

// getPodReadyCondition returns the PodReady condition in the given status.
// If not found, nil is returned.
func getPodReadyCondition(status *v1.PodStatus) *v1.PodCondition {
for i := range status.Conditions {
if status.Conditions[i].Type == v1.PodReady {
return &status.Conditions[i]
}
}
return nil
}

// CreatePodName returns the name of the pod for a member with
// a given id in a deployment with a given name.
func CreatePodName(deploymentName, role, id string) string {
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/k8sutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package k8sutil

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

// addOwnerRefToObject adds given owner reference to given object
Expand All @@ -42,3 +43,10 @@ func LabelsForDeployment(deploymentName, role string) map[string]string {
}
return l
}

// DeploymentListOpt creates a ListOptions matching all labels for the given deployment name.
func DeploymentListOpt(deploymentName string) metav1.ListOptions {
return metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(LabelsForDeployment(deploymentName, "")).String(),
}
}