diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index 81085b72a7b15..53ed83e1d34c8 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -91,8 +91,8 @@ func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.Node
 	}
 	return &predicateMetadata{
 		podBestEffort:             isPodBestEffort(pod),
-		podRequest:                getResourceRequest(pod),
-		podPorts:                  getUsedPorts(pod),
+		podRequest:                GetResourceRequest(pod),
+		podPorts:                  GetUsedPorts(pod),
 		matchingAntiAffinityTerms: matchingTerms,
 	}
 }
@@ -417,7 +417,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *
 	return true, nil, nil
 }
 
-func getResourceRequest(pod *api.Pod) *schedulercache.Resource {
+func GetResourceRequest(pod *api.Pod) *schedulercache.Resource {
 	result := schedulercache.Resource{}
 	for _, container := range pod.Spec.Containers {
 		requests := container.Resources.Requests
@@ -459,7 +459,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
 		podRequest = predicateMeta.podRequest
 	} else {
 		// We couldn't parse metadata - fallback to computing it.
-		podRequest = getResourceRequest(pod)
+		podRequest = GetResourceRequest(pod)
 	}
 	if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 {
 		return len(predicateFails) == 0, predicateFails, nil
@@ -702,14 +702,14 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
 		wantPorts = predicateMeta.podPorts
 	} else {
 		// We couldn't parse metadata - fallback to computing it.
-		wantPorts = getUsedPorts(pod)
+		wantPorts = GetUsedPorts(pod)
 	}
 	if len(wantPorts) == 0 {
 		return true, nil, nil
 	}
 
 	// TODO: Aggregate it at the NodeInfo level.
-	existingPorts := getUsedPorts(nodeInfo.Pods()...)
+	existingPorts := GetUsedPorts(nodeInfo.Pods()...)
 	for wport := range wantPorts {
 		if wport != 0 && existingPorts[wport] {
 			return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
@@ -718,7 +718,7 @@
 	return true, nil, nil
 }
 
-func getUsedPorts(pods ...*api.Pod) map[int]bool {
+func GetUsedPorts(pods ...*api.Pod) map[int]bool {
 	ports := make(map[int]bool)
 	for _, pod := range pods {
 		for j := range pod.Spec.Containers {
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
index 4e222bf3c6f05..7c308e7ae7782 100755
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -467,7 +467,7 @@ func TestGetUsedPorts(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		ports := getUsedPorts(test.pods...)
+		ports := GetUsedPorts(test.pods...)
 		if !reflect.DeepEqual(test.ports, ports) {
 			t.Errorf("%s: expected %v, got %v", "test get used ports", test.ports, ports)
 		}
diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go
index 3997c09a9b436..3cc793a054ea7 100644
--- a/plugin/pkg/scheduler/factory/factory.go
+++ b/plugin/pkg/scheduler/factory/factory.go
@@ -78,6 +78,10 @@ type ConfigFactory struct {
 
 	scheduledPodPopulator *cache.Controller
 	nodePopulator         *cache.Controller
+	pvPopulator           *cache.Controller
+	pvcPopulator          *cache.Controller
+	servicePopulator      *cache.Controller
+	controllerPopulator   *cache.Controller
 
 	schedulerCache schedulercache.Cache
 
@@ -93,6 +97,9 @@
 
 	// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
 	FailureDomains string
+
+	// Equivalence class cache
+	EquivalencePodCache *scheduler.EquivalenceCache
 }
 
 // Initializes the factory.
@@ -147,15 +154,48 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
 		},
 	)
 
+	// TODO(harryz) need to fill all the handlers here and below for equivalence cache
+	c.PVLister.Store, c.pvPopulator = cache.NewInformer(
+		c.createPersistentVolumeLW(),
+		&api.PersistentVolume{},
+		0,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	c.PVCLister.Store, c.pvcPopulator = cache.NewInformer(
+		c.createPersistentVolumeClaimLW(),
+		&api.PersistentVolumeClaim{},
+		0,
+		cache.ResourceEventHandlerFuncs{},
+	)
+
+	c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
+		c.createServiceLW(),
+		&api.Service{},
+		0,
+		cache.ResourceEventHandlerFuncs{},
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+	)
+
+	c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
+		c.createControllerLW(),
+		&api.ReplicationController{},
+		0,
+		cache.ResourceEventHandlerFuncs{},
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+	)
+
 	return c
 }
 
+// TODO(harryz) need to update all the handlers here and below for equivalence cache
 func (c *ConfigFactory) addPodToCache(obj interface{}) {
 	pod, ok := obj.(*api.Pod)
 	if !ok {
 		glog.Errorf("cannot convert to *api.Pod: %v", obj)
 		return
 	}
+
 	if err := c.schedulerCache.AddPod(pod); err != nil {
 		glog.Errorf("scheduler cache AddPod failed: %v", err)
 	}
@@ -172,6 +212,7 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
 		glog.Errorf("cannot convert newObj to *api.Pod: %v", newObj)
 		return
 	}
+
 	if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
 		glog.Errorf("scheduler cache UpdatePod failed: %v", err)
 	}
@@ -204,6 +245,7 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) {
 		glog.Errorf("cannot convert to *api.Node: %v", obj)
 		return
 	}
+
 	if err := c.schedulerCache.AddNode(node); err != nil {
 		glog.Errorf("scheduler cache AddNode failed: %v", err)
 	}
@@ -220,6 +262,7 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
 		glog.Errorf("cannot convert newObj to *api.Node: %v", newObj)
 		return
 	}
+
 	if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
 		glog.Errorf("scheduler cache UpdateNode failed: %v", err)
 	}
@@ -407,20 +450,15 @@ func (f *ConfigFactory) Run() {
 	// Begin populating nodes.
 	go f.nodePopulator.Run(f.StopEverything)
 
-	// Watch PVs & PVCs
-	// They may be listed frequently for scheduling constraints, so provide a local up-to-date cache.
-	cache.NewReflector(f.createPersistentVolumeLW(), &api.PersistentVolume{}, f.PVLister.Store, 0).RunUntil(f.StopEverything)
-	cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything)
+	// Begin populating pv & pvc
+	go f.pvPopulator.Run(f.StopEverything)
+	go f.pvcPopulator.Run(f.StopEverything)
 
-	// Watch and cache all service objects. Scheduler needs to find all pods
-	// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
-	// Cache this locally.
-	cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything)
+	// Begin populating services
+	go f.servicePopulator.Run(f.StopEverything)
 
-	// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
-	// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
-	// Cache this locally.
-	cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Indexer, 0).RunUntil(f.StopEverything)
+	// Begin populating controllers
+	go f.controllerPopulator.Run(f.StopEverything)
 
 	// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
 	// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
diff --git a/plugin/pkg/scheduler/generic_scheduler.go b/plugin/pkg/scheduler/generic_scheduler.go
index 8f2e25e17b632..3a3156871122d 100644
--- a/plugin/pkg/scheduler/generic_scheduler.go
+++ b/plugin/pkg/scheduler/generic_scheduler.go
@@ -71,6 +71,8 @@ type genericScheduler struct {
 	lastNodeIndex     uint64
 
 	cachedNodeInfoMap map[string]*schedulercache.NodeInfo
+
+	equivalenceCache *EquivalenceCache
 }
 
 // Schedule tries to schedule the given pod to one of node in the node list.
@@ -99,6 +101,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
 		return "", err
 	}
 
+	// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here
+
 	trace.Step("Computing predicates")
 	filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders)
 	if err != nil {
diff --git a/plugin/pkg/scheduler/generic_scheduler_test.go b/plugin/pkg/scheduler/generic_scheduler_test.go
index b9ffd87b63102..9c651f4751098 100644
--- a/plugin/pkg/scheduler/generic_scheduler_test.go
+++ b/plugin/pkg/scheduler/generic_scheduler_test.go
@@ -300,6 +300,7 @@ func TestGenericScheduler(t *testing.T) {
 		for _, name := range test.nodes {
 			cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
 		}
+
 		scheduler := NewGenericScheduler(
 			cache, test.predicates, algorithm.EmptyMetadataProducer, test.prioritizers, []algorithm.SchedulerExtender{})
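
Side note, not part of the diff: `getResourceRequest` and `getUsedPorts` are promoted to exported names so that code outside the `predicates` package, such as the planned equivalence cache, can derive a pod's "equivalence" inputs from the same values the predicates consume. The sketch below only illustrates that idea under assumed usage; the helper name `equivalenceKeyForPod` and the FNV-based hashing are hypothetical and are not the `EquivalencePodCache` / `EquivalenceCache` implementation referenced by the TODOs above.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

// equivalenceKeyForPod (hypothetical) folds the pod's aggregate resource
// request and its requested host ports into one hash. Two pods with equal
// keys would be candidates for sharing cached predicate results.
func equivalenceKeyForPod(pod *api.Pod) uint64 {
	h := fnv.New64a()

	// The same totals PodFitsResources consumes via GetResourceRequest.
	req := predicates.GetResourceRequest(pod)
	fmt.Fprintf(h, "cpu=%d;mem=%d;gpu=%d;", req.MilliCPU, req.Memory, req.NvidiaGPU)

	// The same host ports PodFitsHostPorts consumes via GetUsedPorts,
	// sorted so the key does not depend on map iteration order.
	ports := []int{}
	for port := range predicates.GetUsedPorts(pod) {
		ports = append(ports, port)
	}
	sort.Ints(ports)
	for _, port := range ports {
		fmt.Fprintf(h, "port=%d;", port)
	}

	return h.Sum64()
}

func main() {
	// An empty pod spec yields a zero resource request and no host ports.
	pod := &api.Pod{}
	fmt.Printf("equivalence key: %x\n", equivalenceKeyForPod(pod))
}
```

Whether the real cache keys on exactly these inputs is decided by the `EquivalenceCache` type introduced elsewhere in this change; the point here is only that exporting the two helpers makes such a computation possible outside the `predicates` package.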