Add controller expectations for Deployer controller and make sure it won't start the deployer pod more than once.

tnozicka committed Oct 6, 2017
1 parent d47d1a1 commit 5da6d84
Showing 2 changed files with 71 additions and 27 deletions.
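
At its core, the fix is the standard controller-expectations pattern: record what you expect to observe (here, the eventual deletion of the deployer pod you just created) and refuse to act again until the informer has actually delivered that event. Below is a minimal sketch of that lifecycle, assuming the same k8s.io/kubernetes/pkg/controller package the diff imports; the fakeController type and function names are illustrative, not part of this commit.

package main

import (
	"fmt"

	kcontroller "k8s.io/kubernetes/pkg/controller"
)

type fakeController struct {
	expectations *kcontroller.ControllerExpectations
}

func (c *fakeController) sync(key string) {
	// Act only when no earlier create/delete is still unobserved. This is the
	// gate that stops a stale informer cache from producing a second deployer pod.
	if !c.expectations.SatisfiedExpectations(key) {
		fmt.Printf("%s: expectations pending, skipping\n", key)
		return
	}
	// ... create the deployer pod here ...
	// Record that one deletion of that pod must be observed before this key
	// may create again.
	if err := c.expectations.ExpectDeletions(key, 1); err != nil {
		fmt.Printf("%s: %v\n", key, err)
	}
	fmt.Printf("%s: created deployer pod\n", key)
}

// deletePod would be wired to the pod informer's DeleteFunc; observing the
// deletion satisfies the expectation and unblocks the next sync.
func (c *fakeController) deletePod(key string) {
	c.expectations.DeletionObserved(key)
}

func main() {
	c := &fakeController{expectations: kcontroller.NewControllerExpectations()}
	c.sync("default/my-app-1")      // creates the pod, expects 1 deletion
	c.sync("default/my-app-1")      // skipped: the deletion was not observed yet
	c.deletePod("default/my-app-1") // informer reports the deletion
	c.sync("default/my-app-1")      // allowed to act again
}
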
73 changes: 49 additions & 24 deletions pkg/apps/controller/deployer/deployer_controller.go
@@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
kcoreclient "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
kcontroller "k8s.io/kubernetes/pkg/controller"

deployapi "github.com/openshift/origin/pkg/apps/apis/apps"
deployutil "github.com/openshift/origin/pkg/apps/util"
@@ -69,6 +70,8 @@ type DeploymentController struct {

// queue contains replication controllers that need to be synced.
queue workqueue.RateLimitingInterface
// A TTLCache of deployer pod creates and deletes.
expectations *kcontroller.ControllerExpectations

// rcLister can list/get replication controllers from a shared informer's cache
rcLister kcorelisters.ReplicationControllerLister
@@ -97,6 +100,12 @@ type DeploymentController struct {
// to a terminal deployment status. Since this controller started using caches,
// the provided rc MUST be deep-copied beforehand (see work() in factory.go).
func (c *DeploymentController) handle(deployment *v1.ReplicationController, willBeDropped bool) error {
key, err := getKeyForReplicationController(deployment)
if err != nil {
utilruntime.HandleError(err)
return nil
}

// Copy all the annotations from the deployment.
updatedAnnotations := make(map[string]string)
for key, value := range deployment.Annotations {
@@ -109,7 +118,7 @@ func (c *DeploymentController) handle(deployment *v1.ReplicationController, will
deployerPodName := deployutil.DeployerPodNameForDeployment(deployment.Name)
deployer, deployerErr := c.podLister.Pods(deployment.Namespace).Get(deployerPodName)
if deployerErr == nil {
nextStatus = c.nextStatus(deployer, deployment, updatedAnnotations)
nextStatus = c.nextStatus(currentStatus, deployer, deployment, updatedAnnotations)
}

switch currentStatus {
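
The lookup above works because the deployer pod name is a pure function of the RC name, so the controller can fetch it straight from the lister instead of listing pods by label. A stub of that convention for illustration (OpenShift names deployer pods "<rc-name>-deploy"); the real helper is deployutil.DeployerPodNameForDeployment in pkg/apps/util.

package deployerdemo

// deployerPodNameFor mirrors OpenShift's deterministic deployer pod naming;
// use deployutil.DeployerPodNameForDeployment in real code.
func deployerPodNameFor(rcName string) string {
	return rcName + "-deploy"
}
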
@@ -134,27 +143,34 @@ func (c *DeploymentController) handle(deployment *v1.ReplicationController, will
return nil
}

// Generate a deployer pod spec.
deployerPod, err := c.makeDeployerPod(deployment)
if err != nil {
return fatalError(fmt.Sprintf("couldn't make deployer pod for %q: %v", deployutil.LabelForDeploymentV1(deployment), err))
}
// Create the deployer pod.
deploymentPod, err := c.pn.Pods(deployment.Namespace).Create(deployerPod)
// Retry on error.
if err != nil {
// If we cannot create a deployer pod (e.g. lack of quota), match the normal
// replica set experience and emit an event.
c.emitDeploymentEvent(deployment, v1.EventTypeWarning, "FailedCreate", fmt.Sprintf("Error creating deployer pod: %v", err))
return actionableError(fmt.Sprintf("couldn't create deployer pod for %q: %v", deployutil.LabelForDeploymentV1(deployment), err))
}
updatedAnnotations[deployapi.DeploymentPodAnnotation] = deploymentPod.Name
updatedAnnotations[deployapi.DeployerPodCreatedAtAnnotation] = deploymentPod.CreationTimestamp.String()
if deploymentPod.Status.StartTime != nil {
updatedAnnotations[deployapi.DeployerPodStartedAtAnnotation] = deploymentPod.Status.StartTime.String()
// We need to check the expectations before creating the deployer pod:
// if the caches are not yet synced, we could see this RC in state New again
// after its deployer pod was deleted (by the user or otherwise) and would
// recreate the pod. A newer deployment might already be running by then,
// leaving two active deployer pods.
if c.expectations.SatisfiedExpectations(key) {
// Generate a deployer pod spec.
deployerPod, err := c.makeDeployerPod(deployment)
if err != nil {
return fatalError(fmt.Sprintf("couldn't make deployer pod for %q: %v", deployutil.LabelForDeploymentV1(deployment), err))
}
// Create the deployer pod.
deployerPod, err = c.pn.Pods(deployment.Namespace).Create(deployerPod)
// Retry on error.
if err != nil {
// If we cannot create a deployer pod (e.g. lack of quota), match the normal
// replica set experience and emit an event.
c.emitDeploymentEvent(deployment, v1.EventTypeWarning, "FailedCreate", fmt.Sprintf("Error creating deployer pod: %v", err))
return actionableError(fmt.Sprintf("couldn't create deployer pod for %q: %v", deployutil.LabelForDeploymentV1(deployment), err))
}
c.expectations.ExpectDeletions(key, 1)
updatedAnnotations[deployapi.DeploymentPodAnnotation] = deployerPod.Name
updatedAnnotations[deployapi.DeployerPodCreatedAtAnnotation] = deployerPod.CreationTimestamp.String()
if deployerPod.Status.StartTime != nil {
updatedAnnotations[deployapi.DeployerPodStartedAtAnnotation] = deployerPod.Status.StartTime.String()
}
nextStatus = deployapi.DeploymentStatusPending
glog.V(4).Infof("Created deployer pod %q for %q (RV: %s)", deployerPod.Name, deployutil.LabelForDeploymentV1(deployment), deployment.ResourceVersion)
}
nextStatus = deployapi.DeploymentStatusPending
glog.V(4).Infof("Created deployer pod %q for %q", deploymentPod.Name, deployutil.LabelForDeploymentV1(deployment))

// Most likely dead code since we never get an error different from 404 back from the cache.
case deployerErr != nil:
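
The New case distinguishes two error classes: a bad pod template is fatal (the rollout is marked Failed and not retried), while a rejected create, e.g. for quota, is actionable (the RC is requeued and the create retried). The types are re-sketched below from the package's conventions; the concrete definitions and handling live elsewhere in this controller, and the exact message text here is illustrative.

package deployerdemo

// fatalError aborts the rollout: handle() returns it and the work loop
// transitions the deployment to Failed without requeueing.
type fatalError string

func (e fatalError) Error() string { return "fatal error handling deployment: " + string(e) }

// actionableError requeues the RC with rate limiting so the failed action
// (such as the deployer pod create above) is retried.
type actionableError string

func (e actionableError) Error() string { return string(e) }
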
@@ -173,7 +189,8 @@ func (c *DeploymentController) handle(deployment *v1.ReplicationController, will
// to ensure that changes to 'unrelated' pods don't result in updates to
// the deployment. So, the image check will have to be done in other areas
// of the code as well.
if deployutil.DeploymentNameFor(deployer) != deployment.Name {
controllerRef := kcontroller.GetControllerOf(deployer)
if deployutil.DeploymentNameFor(deployer) != deployment.Name || (controllerRef != nil && controllerRef.UID != deployment.UID) {
nextStatus = deployapi.DeploymentStatusFailed
updatedAnnotations[deployapi.DeploymentStatusReasonAnnotation] = deployapi.DeploymentFailedUnrelatedDeploymentExists
c.emitDeploymentEvent(deployment, v1.EventTypeWarning, "FailedCreate", fmt.Sprintf("Error creating deployer pod since another pod with the same name (%q) exists", deployer.Name))
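
Before this hunk, a pod that merely carried the expected deployment-name annotation could be mistaken for our deployer; also comparing the controller owner reference's UID makes the check robust against name reuse. A sketch of the predicate, assuming the imports already used in deployer_controller.go; ownedByDeployment is an illustrative name, not a function from this commit.

package deployerdemo

import (
	"k8s.io/kubernetes/pkg/api/v1"
	kcontroller "k8s.io/kubernetes/pkg/controller"
)

// ownedByDeployment treats a pod occupying the deployer name as ours only if
// its controller owner (when one is set) is this RC. GetControllerOf returns
// the OwnerReference with Controller=true, or nil if there is none.
func ownedByDeployment(pod *v1.Pod, rc *v1.ReplicationController) bool {
	ref := kcontroller.GetControllerOf(pod)
	if ref == nil {
		// No controller ref: rely on the annotation-based name check above.
		return true
	}
	return ref.UID == rc.UID
}
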
@@ -231,6 +248,8 @@ func (c *DeploymentController) handle(deployment *v1.ReplicationController, will
}

case deployapi.DeploymentStatusFailed:
c.expectations.ExpectDeletions(key, 0)

// Try once more to clean up a cancelled deployment in case hook pods
// were created just after we issued the first cleanup request.
if deployutil.IsDeploymentCancelled(deployment) {
@@ -246,6 +265,8 @@ func (c *DeploymentController) handle(deployment *v1.ReplicationController, will
}

case deployapi.DeploymentStatusComplete:
c.expectations.ExpectDeletions(key, 0)

if err := c.cleanupDeployerPods(deployment); err != nil {
return err
}
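
In both terminal branches the call is ExpectDeletions(key, 0): an expectation of zero outstanding deletions is fulfilled immediately, so once a rollout reaches Failed or Complete no stale expectation can block the next sync of this RC. A small demonstration of that property, assuming the same expectations API as above.

package deployerdemo

import (
	kcontroller "k8s.io/kubernetes/pkg/controller"
)

// resetExpectations shows the effect of ExpectDeletions(key, 0): the
// expectation is fulfilled from the start, so the next sync is never blocked.
func resetExpectations(exp *kcontroller.ControllerExpectations, key string) bool {
	_ = exp.ExpectDeletions(key, 0)       // error ignored here, as in the diff
	return exp.SatisfiedExpectations(key) // always true: nothing left to observe
}
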
@@ -282,7 +303,7 @@ func (c *DeploymentController) handle(deployment *v1.ReplicationController, will
return nil
}

func (c *DeploymentController) nextStatus(pod *v1.Pod, deployment *v1.ReplicationController, updatedAnnotations map[string]string) deployapi.DeploymentStatus {
func (c *DeploymentController) nextStatus(current deployapi.DeploymentStatus, pod *v1.Pod, deployment *v1.ReplicationController, updatedAnnotations map[string]string) deployapi.DeploymentStatus {
switch pod.Status.Phase {
case v1.PodPending:
return deployapi.DeploymentStatusPending
@@ -315,8 +336,12 @@ func (c *DeploymentController) nextStatus(pod *v1.Pod, deployment *v1.Replicatio
updatedAnnotations[deployapi.DeployerPodCompletedAtAnnotation] = completedTimestamp.String()
}
return deployapi.DeploymentStatusFailed

case v1.PodUnknown:
fallthrough
default:
return current
}
return deployapi.DeploymentStatusNew
}

// getPodTerminatedTimestamp gets the first terminated container in a pod and
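The nextStatus change above is why the function now takes the current status: previously the switch fell off the end and returned DeploymentStatusNew, so a pod briefly reported as Unknown (e.g. during a node outage) would regress a Running rollout back to New. A toy version of the corrected mapping; the deployapi constants are stubbed as strings so the example stands alone, and the real function also handles cancellation and timestamp annotations.

package statusdemo

const (
	statusPending  = "Pending"
	statusRunning  = "Running"
	statusComplete = "Complete"
	statusFailed   = "Failed"
)

// nextStatus maps the deployer pod phase to a deployment status, preserving
// the current status for PodUnknown instead of regressing to New.
func nextStatus(current, podPhase string) string {
	switch podPhase {
	case "Pending":
		return statusPending
	case "Running":
		return statusRunning
	case "Succeeded":
		return statusComplete
	case "Failed":
		return statusFailed
	default:
		// PodUnknown (or anything unexpected): keep what we had.
		return current
	}
}
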
25 changes: 22 additions & 3 deletions pkg/apps/controller/deployer/factory.go
@@ -48,7 +48,8 @@ func NewDeployerController(
rn: kubeClientset.Core(),
pn: kubeClientset.Core(),

queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
expectations: kcontroller.NewControllerExpectations(),

rcLister: rcInformer.Lister(),
rcListerSynced: rcInformer.Informer().HasSynced,
@@ -68,6 +69,7 @@
})

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addPod,
UpdateFunc: c.updatePod,
DeleteFunc: c.deletePod,
})
@@ -124,6 +126,15 @@ func (c *DeploymentController) updateReplicationController(old, cur interface{})
c.enqueueReplicationController(curRC)
}

func (c *DeploymentController) addPod(obj interface{}) {
pod := obj.(*v1.Pod)

rc, err := c.rcForDeployerPod(pod)
if err == nil && rc != nil {
c.enqueueReplicationController(rc)
}
}

func (c *DeploymentController) updatePod(old, cur interface{}) {
// A periodic relist will send update events for all known pods.
curPod := cur.(*v1.Pod)
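
The new AddFunc ensures the owning RC is re-enqueued as soon as its deployer pod shows up in the cache, so status transitions are driven promptly by pod events rather than by periodic relists. rcForDeployerPod itself is not shown in this diff; presumably it resolves the pod back to its RC through the deployment-name annotation and the shared lister. A hedged sketch of such a helper follows; the annotation key literal and function name are illustrative (the real code uses a deployapi constant).

package factorydemo

import (
	"k8s.io/kubernetes/pkg/api/v1"
	kcorelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
)

// deploymentAnnotation marks deployer pods with the name of the RC they act
// on; the literal here is illustrative.
const deploymentAnnotation = "openshift.io/deployment.name"

// rcForDeployerPodSketch approximates the helper the handlers above rely on:
// map a pod back to its owning RC via the annotation and the informer lister.
func rcForDeployerPodSketch(rcLister kcorelisters.ReplicationControllerLister, pod *v1.Pod) (*v1.ReplicationController, error) {
	rcName, ok := pod.Annotations[deploymentAnnotation]
	if !ok {
		return nil, nil // not a deployer pod
	}
	return rcLister.ReplicationControllers(pod.Namespace).Get(rcName)
}
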
@@ -157,10 +168,18 @@ func (c *DeploymentController) deletePod(obj interface{}) {
}
}

func (c *DeploymentController) enqueueReplicationController(rc *v1.ReplicationController) {
func getKeyForReplicationController(rc *v1.ReplicationController) (string, error) {
key, err := kcontroller.KeyFunc(rc)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", rc, err))
return key, fmt.Errorf("Couldn't get key for object %#v: %v", rc, err)
}
return key, nil
}

func (c *DeploymentController) enqueueReplicationController(rc *v1.ReplicationController) {
key, err := getKeyForReplicationController(rc)
if err != nil {
utilruntime.HandleError(err)
return
}
c.queue.Add(key)
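Factoring the key computation into getKeyForReplicationController lets handle() and the enqueue path share one key, which matters because the expectations cache and the workqueue must index the same RC by the same string. kcontroller.KeyFunc is cache.DeletionHandlingMetaNamespaceKeyFunc, so the key takes the familiar "namespace/name" form, as this standalone sketch shows.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
	kcontroller "k8s.io/kubernetes/pkg/controller"
)

func main() {
	rc := &v1.ReplicationController{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "my-app-1"},
	}
	key, err := kcontroller.KeyFunc(rc)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // default/my-app-1
}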
