Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
johnugeorge committed Dec 17, 2018
1 parent a4b9016 commit 6d2b2ec
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 43 deletions.
34 changes: 19 additions & 15 deletions pkg/controller.v1beta1/pytorch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,11 @@ func (pc *PyTorchController) Run(threadiness int, stopCh <-chan struct{}) error

// Wait for the caches to be synced before starting workers.
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced); !ok {
return fmt.Errorf("failed to wait for job caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.PodInformerSynced); !ok {
return fmt.Errorf("failed to wait for pod caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for service caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced,
pc.PodInformerSynced, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Infof("Starting %v workers", threadiness)
// Launch workers to process PyTorchJob resources.
for i := 0; i < threadiness; i++ {
Expand All @@ -213,15 +206,26 @@ func (pc *PyTorchController) runWorker() {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (pc *PyTorchController) processNextWorkItem() bool {
key, quit := pc.WorkQueue.Get()
obj, quit := pc.WorkQueue.Get()
if quit {
return false
}
defer pc.WorkQueue.Done(key)
defer pc.WorkQueue.Done(obj)

var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
pc.WorkQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return true
}

logger := pylogger.LoggerForKey(key.(string))
logger := pylogger.LoggerForKey(key)

pytorchJob, err := pc.getPyTorchJobFromKey(key.(string))
pytorchJob, err := pc.getPyTorchJobFromKey(key)
if err != nil {
if err == errNotExists {
logger.Infof("PyTorchJob has been deleted: %v", key)
Expand All @@ -240,7 +244,7 @@ func (pc *PyTorchController) processNextWorkItem() bool {
}

// Sync PyTorchJob to mapch the actual state to this desired state.
forget, err := pc.syncHandler(key.(string))
forget, err := pc.syncHandler(key)
if err == nil {
if forget {
pc.WorkQueue.Forget(key)
Expand Down
6 changes: 1 addition & 5 deletions pkg/controller.v1beta1/pytorch/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func (pc *PyTorchController) getPyTorchJobFromKey(key string) (*v1beta1.PyTorchJ
return nil, errNotExists
}

job, err := jobFromUnstructured(obj)
if err != nil {
return nil, err
}
return job, nil
return jobFromUnstructured(obj)
}

func jobFromUnstructured(obj interface{}) (*v1beta1.PyTorchJob, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v1beta1/pytorch/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

const (
failedMarshalPyTorchJobReason = "FailedInvalidPyTorchJobSpec"
failedMarshalPyTorchJobReason = "InvalidPyTorchJobSpec"
)

// When a pod is added, set the defaults and enqueue the current pytorchjob.
Expand Down
35 changes: 19 additions & 16 deletions pkg/controller.v2/pytorch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,11 @@ func (pc *PyTorchController) Run(threadiness int, stopCh <-chan struct{}) error

// Wait for the caches to be synced before starting workers.
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced); !ok {
return fmt.Errorf("failed to wait for job caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.PodInformerSynced); !ok {
return fmt.Errorf("failed to wait for pod caches to sync")
}

if ok := cache.WaitForCacheSync(stopCh, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for service caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.jobInformerSynced,
pc.PodInformerSynced, pc.ServiceInformerSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

log.Infof("Starting %v workers", threadiness)
// Launch workers to process PyTorchJob resources.
for i := 0; i < threadiness; i++ {
Expand All @@ -213,15 +206,25 @@ func (pc *PyTorchController) runWorker() {
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (pc *PyTorchController) processNextWorkItem() bool {
key, quit := pc.WorkQueue.Get()
obj, quit := pc.WorkQueue.Get()
if quit {
return false
}
defer pc.WorkQueue.Done(key)

logger := pylogger.LoggerForKey(key.(string))
defer pc.WorkQueue.Done(obj)

var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
pc.WorkQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return true
}
logger := pylogger.LoggerForKey(key)

pytorchJob, err := pc.getPyTorchJobFromKey(key.(string))
pytorchJob, err := pc.getPyTorchJobFromKey(key)
if err != nil {
if err == errNotExists {
logger.Infof("PyTorchJob has been deleted: %v", key)
Expand All @@ -240,7 +243,7 @@ func (pc *PyTorchController) processNextWorkItem() bool {
}

// Sync PyTorchJob to mapch the actual state to this desired state.
forget, err := pc.syncHandler(key.(string))
forget, err := pc.syncHandler(key)
if err == nil {
if forget {
pc.WorkQueue.Forget(key)
Expand Down
6 changes: 1 addition & 5 deletions pkg/controller.v2/pytorch/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,7 @@ func (pc *PyTorchController) getPyTorchJobFromKey(key string) (*v1alpha2.PyTorch
return nil, errNotExists
}

job, err := jobFromUnstructured(obj)
if err != nil {
return nil, err
}
return job, nil
return jobFromUnstructured(obj)
}

func jobFromUnstructured(obj interface{}) (*v1alpha2.PyTorchJob, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller.v2/pytorch/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

const (
failedMarshalPyTorchJobReason = "FailedInvalidPyTorchJobSpec"
failedMarshalPyTorchJobReason = "InvalidPyTorchJobSpec"
)

// When a pod is added, set the defaults and enqueue the current pytorchjob.
Expand Down

0 comments on commit 6d2b2ec

Please sign in to comment.