Skip to content

Commit

Permalink
Evenly spread volumes of a StatefulSet across nodes based on topology
Browse files Browse the repository at this point in the history
Signed-off-by: Deep Debroy <ddebroy@docker.com>
  • Loading branch information
ddebroy committed Oct 26, 2018
1 parent 7be433e commit 14cd3e4
Show file tree
Hide file tree
Showing 3 changed files with 352 additions and 12 deletions.
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ func (p *csiProvisioner) Provision(options controller.VolumeOptions) (*v1.Persis
p.client,
p.csiAPIClient,
driverState.driverName,
options.PVC.Name,
options.AllowedTopologies,
options.SelectedNode)
if err != nil {
Expand Down
87 changes: 75 additions & 12 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
"hash/fnv"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiclientset "k8s.io/csi-api/pkg/client/clientset/versioned"
"math/rand"
"sort"
"strconv"
"strings"
)

Expand Down Expand Up @@ -68,6 +70,7 @@ func GenerateAccessibilityRequirements(
kubeClient kubernetes.Interface,
csiAPIClient csiclientset.Interface,
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}
Expand Down Expand Up @@ -96,7 +99,15 @@ func GenerateAccessibilityRequirements(
requirement.Requisite = toCSITopology(requisiteTerms)

/* Preferred */
if selectedNode != nil {
var preferredTerms []topologyTerm
if selectedNode == nil {
// no node selected therefore ensure even spreading of StatefulSet volumes by sorting
// requisiteTerms and shifting the sorted terms based on hash of pvcName and replica index suffix
hash, index := getPVCNameHashAndIndexOffset(pvcName)
i := (hash + index) % uint32(len(requisiteTerms))
preferredTerms = sortAndShift(requisiteTerms, nil, i)
} else {
// selectedNode is set so use topology from that node to populate preferredTerms
// TODO (verult) reuse selected node info from aggregateTopologies
// TODO (verult) retry
nodeInfo, err := csiAPIClient.CsiV1alpha1().CSINodeInfos().Get(selectedNode.Name, metav1.GetOptions{})
Expand All @@ -110,7 +121,7 @@ func GenerateAccessibilityRequirements(
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINodeInfo %v", selectedNode.Labels, topologyKeys)
}

preferredTerms := sortAndShift(requisiteTerms, selectedTopology)
preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
Expand All @@ -119,10 +130,8 @@ func GenerateAccessibilityRequirements(
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite", selectedTopology, selectedNode.Name)
}

requirement.Preferred = toCSITopology(preferredTerms)
}

requirement.Preferred = toCSITopology(preferredTerms)
return requirement, nil
}

Expand Down Expand Up @@ -267,17 +276,21 @@ func deduplicate(terms []topologyTerm) []topologyTerm {
}

// Sort the given terms in place,
// then return a new list of terms equivalent to the sorted terms, but shifted so that the primary
// term is the first in the list.
func sortAndShift(terms []topologyTerm, primary topologyTerm) []topologyTerm {
// then return a new list of terms equivalent to the sorted terms, but shifted so that
// either the primary term (if specified) or term at shiftIndex is the first in the list.
func sortAndShift(terms []topologyTerm, primary topologyTerm, shiftIndex uint32) []topologyTerm {
var preferredTerms []topologyTerm
sort.Slice(terms, func(i, j int) bool {
return terms[i].less(terms[j])
})
for i, t := range terms {
if t.equal(primary) {
preferredTerms = append(terms[i:], terms[:i]...)
break
if primary == nil {
preferredTerms = append(terms[shiftIndex:], terms[:shiftIndex]...)
} else {
for i, t := range terms {
if t.equal(primary) {
preferredTerms = append(terms[i:], terms[:i]...)
break
}
}
}
return preferredTerms
Expand Down Expand Up @@ -367,3 +380,53 @@ func toCSITopology(terms []topologyTerm) []*csi.Topology {
}
return out
}

// getPVCNameHashAndIndexOffset derives a (hash, index) pair from a PVC name.
// It mirrors the logic of the in-tree helper of the same name in
// pkg/volume/util/util.go
// [https://github.com/kubernetes/kubernetes/blob/master/pkg/volume/util/util.go].
//
// Naming heuristic (StatefulSet PVCs are currently named
// ClaimName-StatefulSetName-Id, where Id is an integer):
//   - If the text after the final '-' parses as an unsigned 32-bit decimal,
//     it is returned as index, and only the text before that dash is
//     considered for hashing.
//   - If that remaining prefix itself contains a '-', only the portion after
//     its last dash is hashed. For `ClaimName-StatefulSetName-Id` this means
//     every claim of the same StatefulSet member gets the same hash, so a
//     pod with several claims is not split across zones (which would make it
//     unschedulable). Names with embedded dashes just hash their last segment,
//     which only costs some spreading quality.
//   - Plain `Name-Id` names are round-robined the same way, which helps users
//     building StatefulSet-like workloads by hand.
//   - Any other name is hashed in full and index stays 0.
//
// hash is the 32-bit FNV hash (fnv.New32) of the selected string. An empty
// pvcName is not expected; in that case hash is random and index is 0.
func getPVCNameHashAndIndexOffset(pvcName string) (hash uint32, index uint32) {
	if pvcName == "" {
		// We should always be called with a name; this shouldn't happen.
		return rand.Uint32(), 0
	}

	// Start from the full name and narrow it only when a numeric suffix is found.
	base := pvcName
	if cut := strings.LastIndexByte(pvcName, '-'); cut != -1 {
		if ordinal, err := strconv.ParseUint(pvcName[cut+1:], 10, 32); err == nil {
			// The numeric suffix becomes the round-robin offset.
			index = uint32(ordinal)

			// Hash only the prefix, and within it only the segment after the
			// final dash (the StatefulSetName part of ClaimName-StatefulSetName).
			base = pvcName[:cut]
			if inner := strings.LastIndexByte(base, '-'); inner != -1 {
				base = base[inner+1:]
			}
		}
	}

	// Hashing the base name avoids biasing every workload towards the first N zones.
	hasher := fnv.New32()
	hasher.Write([]byte(base))
	return hasher.Sum32(), index
}
Loading

0 comments on commit 14cd3e4

Please sign in to comment.