Commit 2481e88
Merge pull request kubernetes#93473 from Nordix/sched-fix-mael
Change nodeInfoList building logic in scheduler
k8s-ci-robot authored Sep 7, 2020
2 parents 93eec92 + fc5edb8 commit 2481e88
Showing 4 changed files with 145 additions and 150 deletions.
8 changes: 5 additions & 3 deletions pkg/scheduler/internal/cache/cache.go
@@ -279,9 +279,11 @@ func (cache *schedulerCache) updateNodeInfoSnapshotList(snapshot *Snapshot, upda
if updateAll {
// Take a snapshot of the nodes order in the tree
snapshot.nodeInfoList = make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
cache.nodeTree.resetExhausted()
for i := 0; i < cache.nodeTree.numNodes; i++ {
nodeName := cache.nodeTree.next()
nodesList, err := cache.nodeTree.list()
if err != nil {
klog.Error(err)
}
for _, nodeName := range nodesList {
if n := snapshot.nodeInfoMap[nodeName]; n != nil {
snapshot.nodeInfoList = append(snapshot.nodeInfoList, n)
if len(n.PodsWithAffinity) > 0 {
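For orientation, here is a minimal sketch of the reshaped loop above, outside the real schedulerCache: a hypothetical nodeLister interface stands in for the nodeTree, and a trimmed-down nodeInfo stands in for framework.NodeInfo (the havePodsWithAffinity bookkeeping is omitted). The point is only the control flow: one list() call supplies the zone-interleaved ordering, an error from it is logged rather than aborting the snapshot, and names without a snapshot entry are skipped.

package main

import (
	"fmt"
	"log"
)

// nodeLister is a hypothetical stand-in for the nodeTree touched by this
// commit: one call returns the full zone-interleaved node ordering.
type nodeLister interface {
	list() ([]string, error)
}

// nodeInfo is a trimmed-down placeholder for framework.NodeInfo.
type nodeInfo struct {
	name string
}

// staticLister is a toy nodeLister over a fixed slice of names.
type staticLister []string

func (s staticLister) list() ([]string, error) { return s, nil }

// buildSnapshotList mirrors the shape of the updated updateNodeInfoSnapshotList
// loop: fetch the ordering once via list(), log (but tolerate) an error, then
// keep only the names that have an entry in the snapshot map.
func buildSnapshotList(tree nodeLister, nodeInfoMap map[string]*nodeInfo) []*nodeInfo {
	out := make([]*nodeInfo, 0, len(nodeInfoMap))
	names, err := tree.list()
	if err != nil {
		log.Print(err) // the real code logs via klog.Error and carries on
	}
	for _, name := range names {
		if n := nodeInfoMap[name]; n != nil {
			out = append(out, n)
		}
	}
	return out
}

func main() {
	infos := map[string]*nodeInfo{"node-a": {name: "node-a"}, "node-b": {name: "node-b"}}
	got := buildSnapshotList(staticLister{"node-a", "node-b", "node-gone"}, infos)
	fmt.Println(len(got)) // 2: "node-gone" has no snapshot entry, so it is skipped
}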
37 changes: 26 additions & 11 deletions pkg/scheduler/internal/cache/cache_test.go
@@ -1090,7 +1090,11 @@ func TestNodeOperators(t *testing.T) {
if !found {
t.Errorf("Failed to find node %v in internalcache.", node.Name)
}
if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
nodesList, err := cache.nodeTree.list()
if err != nil {
t.Fatal(err)
}
if cache.nodeTree.numNodes != 1 || nodesList[len(nodesList)-1] != node.Name {
t.Errorf("cache.nodeTree is not updated correctly after adding node: %v", node.Name)
}

@@ -1134,7 +1138,11 @@ func TestNodeOperators(t *testing.T) {
t.Errorf("Failed to update node in schedulertypes:\n got: %+v \nexpected: %+v", got, expected)
}
// Check nodeTree after update
if cache.nodeTree.numNodes != 1 || cache.nodeTree.next() != node.Name {
nodesList, err = cache.nodeTree.list()
if err != nil {
t.Fatal(err)
}
if cache.nodeTree.numNodes != 1 || nodesList[len(nodesList)-1] != node.Name {
t.Errorf("unexpected cache.nodeTree after updating node: %v", node.Name)
}

@@ -1147,8 +1155,12 @@ func TestNodeOperators(t *testing.T) {
} else if n != nil {
t.Errorf("The node object for %v should be nil", node.Name)
}
// Check node is removed from nodeTree.
if cache.nodeTree.numNodes != 0 || cache.nodeTree.next() != "" {
// Check node is removed from nodeTree as well.
nodesList, err = cache.nodeTree.list()
if err != nil {
t.Fatal(err)
}
if cache.nodeTree.numNodes != 0 || len(nodesList) != 0 {
t.Errorf("unexpected cache.nodeTree after removing node: %v", node.Name)
}
// Pods are still in the pods cache.
@@ -1306,7 +1318,7 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
updateSnapshot := func() operation {
return func() {
cache.UpdateSnapshot(snapshot)
if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
t.Error(err)
}
}
@@ -1487,14 +1499,14 @@ func TestSchedulerCache_UpdateSnapshot(t *testing.T) {
if err := cache.UpdateSnapshot(snapshot); err != nil {
t.Error(err)
}
if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
t.Error(err)
}
})
}
}

func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *Snapshot) error {
func compareCacheWithNodeInfoSnapshot(t *testing.T, cache *schedulerCache, snapshot *Snapshot) error {
// Compare the map.
if len(snapshot.nodeInfoMap) != len(cache.nodes) {
return fmt.Errorf("unexpected number of nodes in the snapshot. Expected: %v, got: %v", len(cache.nodes), len(snapshot.nodeInfoMap))
@@ -1512,8 +1524,11 @@ func compareCacheWithNodeInfoSnapshot(cache *schedulerCache, snapshot *Snapshot)

expectedNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
expectedHavePodsWithAffinityNodeInfoList := make([]*framework.NodeInfo, 0, cache.nodeTree.numNodes)
for i := 0; i < cache.nodeTree.numNodes; i++ {
nodeName := cache.nodeTree.next()
nodesList, err := cache.nodeTree.list()
if err != nil {
t.Fatal(err)
}
for _, nodeName := range nodesList {
if n := snapshot.nodeInfoMap[nodeName]; n != nil {
expectedNodeInfoList = append(expectedNodeInfoList, n)
if len(n.PodsWithAffinity) > 0 {
@@ -1576,7 +1591,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {

updateSnapshot := func(t *testing.T) {
cache.updateNodeInfoSnapshotList(snapshot, true)
if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
t.Error(err)
}
}
@@ -1672,7 +1687,7 @@ func TestSchedulerCache_updateNodeInfoSnapshotList(t *testing.T) {

// Always update the snapshot at the end of operations and compare it.
cache.updateNodeInfoSnapshotList(snapshot, true)
if err := compareCacheWithNodeInfoSnapshot(cache, snapshot); err != nil {
if err := compareCacheWithNodeInfoSnapshot(t, cache, snapshot); err != nil {
t.Error(err)
}
nodeNames := make([]string, len(snapshot.nodeInfoList))
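The test-side changes follow the same pattern: every call site that previously advanced the iterator with next() now fetches the whole listing and asserts against it, which is why compareCacheWithNodeInfoSnapshot grows a *testing.T parameter (so it can t.Fatal when list() fails). A rough sketch of the new assertion shape, re-declaring the hypothetical nodeLister from the earlier sketch so the snippet stands alone:

package cache

import "testing"

// nodeLister is the same hypothetical stand-in used in the earlier sketch.
type nodeLister interface {
	list() ([]string, error)
}

// assertSingleNode illustrates the assertion shape after the change: fetch the
// whole list once, fail fast on a listing error, then check both the node
// count and that the expected name closes the ordering.
func assertSingleNode(t *testing.T, tree nodeLister, want string) {
	t.Helper()
	names, err := tree.list()
	if err != nil {
		t.Fatal(err)
	}
	if len(names) != 1 || names[len(names)-1] != want {
		t.Errorf("unexpected node list %v, want exactly %q", names, want)
	}
}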
87 changes: 30 additions & 57 deletions pkg/scheduler/internal/cache/node_tree.go
@@ -17,9 +17,10 @@ limitations under the License.
package cache

import (
"errors"
"fmt"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
utilnode "k8s.io/kubernetes/pkg/util/node"
)
@@ -29,37 +30,15 @@ import (
// NodeTree is NOT thread-safe, any concurrent updates/reads from it must be synchronized by the caller.
// It is used only by schedulerCache, and should stay as such.
type nodeTree struct {
tree map[string]*nodeArray // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
zoneIndex int
numNodes int
}

// nodeArray is a struct that has nodes that are in a zone.
// We use a slice (as opposed to a set/map) to store the nodes because iterating over the nodes is
// a lot more frequent than searching them by name.
type nodeArray struct {
nodes []string
lastIndex int
}

func (na *nodeArray) next() (nodeName string, exhausted bool) {
if len(na.nodes) == 0 {
klog.Error("The nodeArray is empty. It should have been deleted from NodeTree.")
return "", false
}
if na.lastIndex >= len(na.nodes) {
return "", true
}
nodeName = na.nodes[na.lastIndex]
na.lastIndex++
return nodeName, false
tree map[string][]string // a map from zone (region-zone) to an array of nodes in the zone.
zones []string // a list of all the zones in the tree (keys)
numNodes int
}

// newNodeTree creates a NodeTree from nodes.
func newNodeTree(nodes []*v1.Node) *nodeTree {
nt := &nodeTree{
tree: make(map[string]*nodeArray),
tree: make(map[string][]string),
}
for _, n := range nodes {
nt.addNode(n)
@@ -72,16 +51,16 @@ func newNodeTree(nodes []*v1.Node) *nodeTree {
func (nt *nodeTree) addNode(n *v1.Node) {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for _, nodeName := range na.nodes {
for _, nodeName := range na {
if nodeName == n.Name {
klog.Warningf("node %q already exist in the NodeTree", n.Name)
return
}
}
na.nodes = append(na.nodes, n.Name)
nt.tree[zone] = append(na, n.Name)
} else {
nt.zones = append(nt.zones, zone)
nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0}
nt.tree[zone] = []string{n.Name}
}
klog.V(2).Infof("Added node %q in group %q to NodeTree", n.Name, zone)
nt.numNodes++
@@ -91,10 +70,10 @@ func (nt *nodeTree) addNode(n *v1.Node) {
func (nt *nodeTree) removeNode(n *v1.Node) error {
zone := utilnode.GetZoneKey(n)
if na, ok := nt.tree[zone]; ok {
for i, nodeName := range na.nodes {
for i, nodeName := range na {
if nodeName == n.Name {
na.nodes = append(na.nodes[:i], na.nodes[i+1:]...)
if len(na.nodes) == 0 {
nt.tree[zone] = append(na[:i], na[i+1:]...)
if len(nt.tree[zone]) == 0 {
nt.removeZone(zone)
}
klog.V(2).Infof("Removed node %q in group %q from NodeTree", n.Name, zone)
@@ -135,36 +114,30 @@ func (nt *nodeTree) updateNode(old, new *v1.Node) {
nt.addNode(new)
}

func (nt *nodeTree) resetExhausted() {
for _, na := range nt.tree {
na.lastIndex = 0
}
nt.zoneIndex = 0
}

// next returns the name of the next node. NodeTree iterates over zones and in each zone iterates
// list returns the list of node names. NodeTree iterates over zones and in each zone iterates
// over nodes in a round robin fashion.
func (nt *nodeTree) next() string {
func (nt *nodeTree) list() ([]string, error) {
if len(nt.zones) == 0 {
return ""
return nil, nil
}
nodesList := make([]string, 0, nt.numNodes)
numExhaustedZones := 0
for {
if nt.zoneIndex >= len(nt.zones) {
nt.zoneIndex = 0
nodeIndex := 0
for len(nodesList) < nt.numNodes {
if numExhaustedZones >= len(nt.zones) { // all zones are exhausted.
return nodesList, errors.New("all zones exhausted before reaching count of nodes expected")
}
zone := nt.zones[nt.zoneIndex]
nt.zoneIndex++
// We do not check the exhausted zones before calling next() on the zone. This ensures
// that if more nodes are added to a zone after it is exhausted, we iterate over the new nodes.
nodeName, exhausted := nt.tree[zone].next()
if exhausted {
numExhaustedZones++
if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset.
nt.resetExhausted()
for zoneIndex := 0; zoneIndex < len(nt.zones); zoneIndex++ {
na := nt.tree[nt.zones[zoneIndex]]
if nodeIndex >= len(na) { // If the zone is exhausted, continue
if nodeIndex == len(na) { // If it is the first time the zone is exhausted
numExhaustedZones++
}
continue
}
} else {
return nodeName
nodesList = append(nodesList, na[nodeIndex])
}
nodeIndex++
}
return nodesList, nil
}
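Taken on its own, the round-robin interleaving that the new list() performs is easy to demonstrate. The sketch below uses throwaway zone and node names and skips the exhausted-zone counter and error path of the real function; it only shows how index 0 of every zone is emitted before index 1, so the front of the ordering alternates across zones. Replacing the stateful next()/resetExhausted() pair with a stateless list() that returns the whole ordering at once is also what lets the nodeArray type and its lastIndex cursor disappear from the tree.

package main

import "fmt"

// roundRobinList interleaves node names across zones the way the new list()
// does: it walks index 0 of every zone, then index 1, and so on, so nodes
// from different zones alternate at the front of the resulting ordering.
// Unlike the real list(), it has no exhausted-zone counter or error return.
func roundRobinList(zones []string, tree map[string][]string) []string {
	total := 0
	for _, zone := range zones {
		total += len(tree[zone])
	}
	out := make([]string, 0, total)
	for idx := 0; len(out) < total; idx++ {
		for _, zone := range zones {
			if idx < len(tree[zone]) {
				out = append(out, tree[zone][idx])
			}
		}
	}
	return out
}

func main() {
	zones := []string{"zone-a", "zone-b"}
	tree := map[string][]string{
		"zone-a": {"node-a1", "node-a2", "node-a3"},
		"zone-b": {"node-b1"},
	}
	// Prints [node-a1 node-b1 node-a2 node-a3]: zones alternate until the
	// smaller zone runs out, then the rest of the larger zone follows.
	fmt.Println(roundRobinList(zones, tree))
}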