Merge pull request kubernetes#34685 from resouer/eclass-2
Automatic merge from submit-queue

Refactor scheduler to enable switch on equivalence cache

Part 2 of kubernetes#30844 

Refactoring to make it easier to switch on the equivalence cache.
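For readers coming to this merge without the rest of kubernetes#30844: the equivalence cache memoizes predicate results so that pods which are scheduling-equivalent (same resource requests, ports, affinity terms, and so on) do not re-run every predicate against every node. The concrete EquivalenceCache type is introduced elsewhere in that series, not in this diff, so the following is only a minimal sketch of the idea; every name in it (equivClassCache, predicateResult, the uint64 hash key) is hypothetical.

package sketch

import "sync"

// predicateResult is the cached outcome of one predicate on one node.
type predicateResult struct {
	Fit     bool
	Reasons []string
}

// equivClassCache maps equivalence-class hash -> node name -> predicate name -> result.
type equivClassCache struct {
	mu    sync.RWMutex
	cache map[uint64]map[string]map[string]predicateResult
}

func newEquivClassCache() *equivClassCache {
	return &equivClassCache{cache: make(map[uint64]map[string]map[string]predicateResult)}
}

// lookup returns a cached result for an equivalent pod, if one exists.
func (c *equivClassCache) lookup(hash uint64, node, predicate string) (predicateResult, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	r, ok := c.cache[hash][node][predicate]
	return r, ok
}

// store records a freshly computed result so later equivalent pods can skip the predicate.
func (c *equivClassCache) store(hash uint64, node, predicate string, r predicateResult) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cache[hash] == nil {
		c.cache[hash] = make(map[string]map[string]predicateResult)
	}
	if c.cache[hash][node] == nil {
		c.cache[hash][node] = make(map[string]predicateResult)
	}
	c.cache[hash][node][predicate] = r
}

Such a cache is only correct if it is invalidated when cluster state changes, which is why this refactor also wires up informers for PVs, PVCs, services and replication controllers below and leaves TODOs where the invalidation handlers will go.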
Kubernetes Submit Queue authored Oct 18, 2016
2 parents 27a7c01 + 50eaeaa commit bcbdcd1
Showing 5 changed files with 63 additions and 20 deletions.
14 changes: 7 additions & 7 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -91,8 +91,8 @@ func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.Node
}
return &predicateMetadata{
podBestEffort: isPodBestEffort(pod),
podRequest: getResourceRequest(pod),
podPorts: getUsedPorts(pod),
podRequest: GetResourceRequest(pod),
podPorts: GetUsedPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
}
}
@@ -417,7 +417,7 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, meta interface{}, nodeInfo *
return true, nil, nil
}

func getResourceRequest(pod *api.Pod) *schedulercache.Resource {
func GetResourceRequest(pod *api.Pod) *schedulercache.Resource {
result := schedulercache.Resource{}
for _, container := range pod.Spec.Containers {
requests := container.Resources.Requests
@@ -459,7 +459,7 @@ func PodFitsResources(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
podRequest = predicateMeta.podRequest
} else {
// We couldn't parse metadata - fallback to computing it.
podRequest = getResourceRequest(pod)
podRequest = GetResourceRequest(pod)
}
if podRequest.MilliCPU == 0 && podRequest.Memory == 0 && podRequest.NvidiaGPU == 0 {
return len(predicateFails) == 0, predicateFails, nil
@@ -702,14 +702,14 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
wantPorts = predicateMeta.podPorts
} else {
// We couldn't parse metadata - fallback to computing it.
wantPorts = getUsedPorts(pod)
wantPorts = GetUsedPorts(pod)
}
if len(wantPorts) == 0 {
return true, nil, nil
}

// TODO: Aggregate it at the NodeInfo level.
existingPorts := getUsedPorts(nodeInfo.Pods()...)
existingPorts := GetUsedPorts(nodeInfo.Pods()...)
for wport := range wantPorts {
if wport != 0 && existingPorts[wport] {
return false, []algorithm.PredicateFailureReason{ErrPodNotFitsHostPorts}, nil
@@ -718,7 +718,7 @@ func PodFitsHostPorts(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.N
return true, nil, nil
}

func getUsedPorts(pods ...*api.Pod) map[int]bool {
func GetUsedPorts(pods ...*api.Pod) map[int]bool {
ports := make(map[int]bool)
for _, pod := range pods {
for j := range pod.Spec.Containers {
2 changes: 1 addition & 1 deletion plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -467,7 +467,7 @@ func TestGetUsedPorts(t *testing.T) {
}

for _, test := range tests {
ports := getUsedPorts(test.pods...)
ports := GetUsedPorts(test.pods...)
if !reflect.DeepEqual(test.ports, ports) {
t.Errorf("%s: expected %v, got %v", "test get used ports", test.ports, ports)
}
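The only functional change in predicates.go and its test is exporting getResourceRequest and getUsedPorts as GetResourceRequest and GetUsedPorts so that code outside the predicates package (such as the equivalence-class machinery) can reuse them. Below is a minimal sketch of such a caller, assuming the scheduler's 2016-era import paths; podFootprint is a hypothetical helper, not code from this PR.

package sketch

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
)

// podFootprint gathers the pieces of a pod that equivalence hashing would care
// about, using the helpers this PR exports (previously the package-private
// getResourceRequest and getUsedPorts).
func podFootprint(pod *api.Pod) (milliCPU, memory int64, ports []int) {
	req := predicates.GetResourceRequest(pod)
	for port := range predicates.GetUsedPorts(pod) {
		ports = append(ports, port)
	}
	return req.MilliCPU, req.Memory, ports
}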
62 changes: 50 additions & 12 deletions plugin/pkg/scheduler/factory/factory.go
@@ -78,6 +78,10 @@ type ConfigFactory struct {

scheduledPodPopulator *cache.Controller
nodePopulator *cache.Controller
pvPopulator *cache.Controller
pvcPopulator *cache.Controller
servicePopulator *cache.Controller
controllerPopulator *cache.Controller

schedulerCache schedulercache.Cache

@@ -93,6 +97,9 @@ type ConfigFactory struct {

// Indicate the "all topologies" set for empty topologyKey when it's used for PreferredDuringScheduling pod anti-affinity.
FailureDomains string

// Equivalence class cache
EquivalencePodCache *scheduler.EquivalenceCache
}

// Initializes the factory.
@@ -147,15 +154,48 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA
},
)

// TODO(harryz) need to fill all the handlers here and below for equivalence cache
c.PVLister.Store, c.pvPopulator = cache.NewInformer(
c.createPersistentVolumeLW(),
&api.PersistentVolume{},
0,
cache.ResourceEventHandlerFuncs{},
)

c.PVCLister.Store, c.pvcPopulator = cache.NewInformer(
c.createPersistentVolumeClaimLW(),
&api.PersistentVolumeClaim{},
0,
cache.ResourceEventHandlerFuncs{},
)

c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer(
c.createServiceLW(),
&api.Service{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

c.ControllerLister.Indexer, c.controllerPopulator = cache.NewIndexerInformer(
c.createControllerLW(),
&api.ReplicationController{},
0,
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)

return c
}
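All four informers above are registered with empty cache.ResourceEventHandlerFuncs{}, and the TODO(harryz) comment marks where equivalence-cache handlers still have to be filled in. The sketch below shows what one such handler could grow into; the invalidate callback is an assumption, since no invalidation API exists in this PR.

package sketch

import (
	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
)

// serviceHandlers sketches handlers for the service informer: when a service
// changes or disappears, cached predicate results that depend on service
// spreading would have to be dropped via the supplied invalidate callback.
func serviceHandlers(invalidate func(svc *api.Service)) cache.ResourceEventHandlerFuncs {
	return cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			svc, ok := newObj.(*api.Service)
			if !ok {
				glog.Errorf("cannot convert to *api.Service: %v", newObj)
				return
			}
			invalidate(svc)
		},
		DeleteFunc: func(obj interface{}) {
			// A real handler would also unwrap cache.DeletedFinalStateUnknown tombstones.
			if svc, ok := obj.(*api.Service); ok {
				invalidate(svc)
			}
		},
	}
}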

// TODO(harryz) need to update all the handlers here and below for equivalence cache
func (c *ConfigFactory) addPodToCache(obj interface{}) {
pod, ok := obj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod: %v", obj)
return
}

if err := c.schedulerCache.AddPod(pod); err != nil {
glog.Errorf("scheduler cache AddPod failed: %v", err)
}
@@ -172,6 +212,7 @@ func (c *ConfigFactory) updatePodInCache(oldObj, newObj interface{}) {
glog.Errorf("cannot convert newObj to *api.Pod: %v", newObj)
return
}

if err := c.schedulerCache.UpdatePod(oldPod, newPod); err != nil {
glog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
@@ -204,6 +245,7 @@ func (c *ConfigFactory) addNodeToCache(obj interface{}) {
glog.Errorf("cannot convert to *api.Node: %v", obj)
return
}

if err := c.schedulerCache.AddNode(node); err != nil {
glog.Errorf("scheduler cache AddNode failed: %v", err)
}
@@ -220,6 +262,7 @@ func (c *ConfigFactory) updateNodeInCache(oldObj, newObj interface{}) {
glog.Errorf("cannot convert newObj to *api.Node: %v", newObj)
return
}

if err := c.schedulerCache.UpdateNode(oldNode, newNode); err != nil {
glog.Errorf("scheduler cache UpdateNode failed: %v", err)
}
@@ -407,20 +450,15 @@ func (f *ConfigFactory) Run() {
// Begin populating nodes.
go f.nodePopulator.Run(f.StopEverything)

// Watch PVs & PVCs
// They may be listed frequently for scheduling constraints, so provide a local up-to-date cache.
cache.NewReflector(f.createPersistentVolumeLW(), &api.PersistentVolume{}, f.PVLister.Store, 0).RunUntil(f.StopEverything)
cache.NewReflector(f.createPersistentVolumeClaimLW(), &api.PersistentVolumeClaim{}, f.PVCLister.Store, 0).RunUntil(f.StopEverything)
// Begin populating pv & pvc
go f.pvPopulator.Run(f.StopEverything)
go f.pvcPopulator.Run(f.StopEverything)

// Watch and cache all service objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createServiceLW(), &api.Service{}, f.ServiceLister.Indexer, 0).RunUntil(f.StopEverything)
// Begin populating services
go f.servicePopulator.Run(f.StopEverything)

// Watch and cache all ReplicationController objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
// Cache this locally.
cache.NewReflector(f.createControllerLW(), &api.ReplicationController{}, f.ControllerLister.Indexer, 0).RunUntil(f.StopEverything)
// Begin populating controllers
go f.controllerPopulator.Run(f.StopEverything)

// Watch and cache all ReplicaSet objects. Scheduler needs to find all pods
// created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly.
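Since the PV, PVC, service and controller informers are now constructed in NewConfigFactory, Run() only has to start their populator goroutines instead of spinning up ad-hoc reflectors, which keeps construction and (eventually) the event handlers in one place. A minimal sketch of that informer pattern, reduced to a single resource: startPVPopulator is illustrative only, and the ListerWatcher would come from something like the factory's createPersistentVolumeLW().

package sketch

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/client/cache"
)

// startPVPopulator shows the pattern used in the factory: NewInformer returns a
// store plus a controller ("populator"), and Run keeps the store in sync with
// the API server until stopCh is closed.
func startPVPopulator(lw cache.ListerWatcher, stopCh <-chan struct{}) cache.Store {
	store, populator := cache.NewInformer(
		lw,                                // list/watch source for PersistentVolumes
		&api.PersistentVolume{},           // expected object type
		0,                                 // resync period; 0 disables periodic resync
		cache.ResourceEventHandlerFuncs{}, // handlers are still empty in this PR
	)
	go populator.Run(stopCh)
	return store
}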
4 changes: 4 additions & 0 deletions plugin/pkg/scheduler/generic_scheduler.go
@@ -71,6 +71,8 @@ type genericScheduler struct {
lastNodeIndex uint64

cachedNodeInfoMap map[string]*schedulercache.NodeInfo

equivalenceCache *EquivalenceCache
}

// Schedule tries to schedule the given pod to one of node in the node list.
@@ -99,6 +101,8 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
return "", err
}

// TODO(harryz) Check if equivalenceCache is enabled and call scheduleWithEquivalenceClass here

trace.Step("Computing predicates")
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders)
if err != nil {
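The TODO above marks where a scheduleWithEquivalenceClass fast path would plug in: before running the full predicate set through findNodesThatFit, consult the equivalence cache for results computed for an equivalent pod and only fall back to full predicate evaluation on a miss. That code does not exist in this PR, so the sketch below stays deliberately generic; every parameter in it is hypothetical.

package sketch

// filterWithEquivalence sketches the intended flow: reuse a cached fit/no-fit
// verdict per (equivalence hash, node) when available, otherwise run the
// predicates and record the result for later equivalent pods.
func filterWithEquivalence(
	podHash uint64,
	nodes []string,
	cached func(hash uint64, node string) (fit, ok bool),
	runPredicates func(node string) bool,
	record func(hash uint64, node string, fit bool),
) []string {
	var fits []string
	for _, node := range nodes {
		fit, ok := cached(podHash, node)
		if !ok {
			fit = runPredicates(node) // slow path: evaluate every predicate on this node
			record(podHash, node, fit)
		}
		if fit {
			fits = append(fits, node)
		}
	}
	return fits
}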
1 change: 1 addition & 0 deletions plugin/pkg/scheduler/generic_scheduler_test.go
@@ -300,6 +300,7 @@ func TestGenericScheduler(t *testing.T) {
for _, name := range test.nodes {
cache.AddNode(&api.Node{ObjectMeta: api.ObjectMeta{Name: name}})
}

scheduler := NewGenericScheduler(
cache, test.predicates, algorithm.EmptyMetadataProducer,
test.prioritizers, []algorithm.SchedulerExtender{})
