Add node-group selector list field to API specs #2098


Merged (14 commits) on Apr 19, 2021
1 change: 1 addition & 0 deletions docs/workloads/async/configuration.md
@@ -63,6 +63,7 @@ compute:
cpu: <string | int | float> # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m)
gpu: <int> # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0)
mem: <string> # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null)
node_groups: <list:string> # to select specific node groups (optional)
```

## Autoscaling
1 change: 1 addition & 0 deletions docs/workloads/batch/configuration.md
@@ -67,6 +67,7 @@ compute:
cpu: <string | int | float> # CPU request per worker. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m)
gpu: <int> # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0)
mem: <string> # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null)
node_groups: <list:string> # to select specific node groups (optional)
```

## Networking
1 change: 1 addition & 0 deletions docs/workloads/realtime/configuration.md
@@ -90,6 +90,7 @@ compute:
cpu: <string | int | float> # CPU request per replica. One unit of CPU corresponds to one virtual CPU; fractional requests are allowed, and can be specified as a floating point number or via the "m" suffix (default: 200m)
gpu: <int> # GPU request per replica. One unit of GPU corresponds to one virtual GPU (default: 0)
mem: <string> # memory request per replica. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null)
node_groups: <list:string> # to select specific node groups (optional)
```

## Autoscaling
1 change: 1 addition & 0 deletions docs/workloads/task/configuration.md
@@ -22,4 +22,5 @@
gpu: <int> # GPU request per worker. One unit of GPU corresponds to one virtual GPU (default: 0)
inf: <int> # Inferentia request per worker. One unit corresponds to one Inferentia ASIC with 4 NeuronCores and 8GB of cache memory. Each process will have one NeuronCore Group with (4 * inf / processes_per_replica) NeuronCores, so your model should be compiled to run on (4 * inf / processes_per_replica) NeuronCores. (default: 0)
mem: <string> # memory request per worker. One unit of memory is one byte and can be expressed as an integer or by using one of these suffixes: K, M, G, T (or their power-of two counterparts: Ki, Mi, Gi, Ti) (default: Null)
node_groups: <list:string> # to select specific node groups (optional)
```
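For illustration, a compute block that uses the new field might look like the sketch below; the node group names are hypothetical and must correspond to node groups defined in the cluster configuration.

```yaml
# illustrative compute block; ng-gpu-spot and ng-gpu-on-demand are hypothetical
# node group names defined in the cluster config
compute:
  cpu: 200m
  gpu: 1
  mem: 1Gi
  node_groups:
    - ng-gpu-spot
    - ng-gpu-on-demand
```

Omitting `node_groups` (or setting it to null) keeps the previous behavior, where Cortex decides automatically which node groups are eligible for the workload.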
37 changes: 24 additions & 13 deletions pkg/lib/configreader/string_list.go
@@ -20,22 +20,24 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/cast"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/slices"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
)

type StringListValidation struct {
Required bool
Default []string
AllowExplicitNull bool
AllowEmpty bool
CantBeSpecifiedErrStr *string
CastSingleItem bool
DisallowDups bool
MinLength int
MaxLength int
InvalidLengths []int
AllowCortexResources bool
RequireCortexResources bool
Validator func([]string) ([]string, error)
Required bool
Default []string
AllowExplicitNull bool
AllowEmpty bool
CantBeSpecifiedErrStr *string
CastSingleItem bool
DisallowDups bool
MinLength int
MaxLength int
InvalidLengths []int
AllowCortexResources bool
RequireCortexResources bool
ElementStringValidation *StringValidation // Required, Default, AllowEmpty, TreatNullAsEmpty & Validator fields not applicable here
Validator func([]string) ([]string, error)
}

func StringList(inter interface{}, v *StringListValidation) ([]string, error) {
@@ -129,6 +131,15 @@ func validateStringList(val []string, v *StringListValidation) ([]string, error)
}
}

if v.ElementStringValidation != nil {
for i, element := range val {
err := ValidateStringVal(element, v.ElementStringValidation)
if err != nil {
return nil, errors.Wrap(err, s.Index(i))
}
}
}

if v.Validator != nil {
return v.Validator(val)
}
53 changes: 47 additions & 6 deletions pkg/operator/operator/k8s.go
@@ -30,6 +30,7 @@ import (
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/urls"
"github.com/cortexlabs/cortex/pkg/operator/config"
"github.com/cortexlabs/cortex/pkg/types/clusterconfig"
"github.com/cortexlabs/cortex/pkg/types/spec"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
kcore "k8s.io/api/core/v1"
@@ -1270,18 +1271,35 @@ func GenerateResourceTolerations() []kcore.Toleration {
return tolerations
}

func GeneratePreferredNodeAffinities() []kcore.PreferredSchedulingTerm {
affinities := []kcore.PreferredSchedulingTerm{}
func GenerateNodeAffinities(apiNodeGroups []string) *kcore.Affinity {
// node groups are ordered according to how the cluster config node groups are ordered
var nodeGroups []*clusterconfig.NodeGroup
for _, clusterNodeGroup := range config.ManagedConfig.NodeGroups {
for _, apiNodeGroupName := range apiNodeGroups {
if clusterNodeGroup.Name == apiNodeGroupName {
nodeGroups = append(nodeGroups, clusterNodeGroup)
}
}
}

numNodeGroups := len(apiNodeGroups)
if apiNodeGroups == nil {
nodeGroups = config.ManagedConfig.NodeGroups
numNodeGroups = len(config.ManagedConfig.NodeGroups)
}

requiredNodeGroups := []string{}
preferredAffinities := []kcore.PreferredSchedulingTerm{}

numNodeGroups := len(config.ManagedConfig.NodeGroups)
for idx, nodeGroup := range config.ManagedConfig.NodeGroups {
for idx, nodeGroup := range nodeGroups {
var nodeGroupPrefix string
if nodeGroup.Spot {
nodeGroupPrefix = "cx-ws-"
} else {
nodeGroupPrefix = "cx-wd-"
}
affinities = append(affinities, kcore.PreferredSchedulingTerm{

preferredAffinities = append(preferredAffinities, kcore.PreferredSchedulingTerm{
Weight: int32(100 * (1 - float64(idx)/float64(numNodeGroups))),
Preference: kcore.NodeSelectorTerm{
MatchExpressions: []kcore.NodeSelectorRequirement{
@@ -1293,9 +1311,32 @@ func GeneratePreferredNodeAffinities() []kcore.PreferredSchedulingTerm {
},
},
})
requiredNodeGroups = append(requiredNodeGroups, nodeGroupPrefix+nodeGroup.Name)
}

return affinities
var requiredNodeSelector *kcore.NodeSelector
if apiNodeGroups != nil {
requiredNodeSelector = &kcore.NodeSelector{
NodeSelectorTerms: []kcore.NodeSelectorTerm{
{
MatchExpressions: []kcore.NodeSelectorRequirement{
{
Key: "alpha.eksctl.io/nodegroup-name",
Operator: kcore.NodeSelectorOpIn,
Values: requiredNodeGroups,
},
},
},
},
}
}

return &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: preferredAffinities,
RequiredDuringSchedulingIgnoredDuringExecution: requiredNodeSelector,
},
}
}

func K8sName(apiName string) string {
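As a sketch of what the reworked `GenerateNodeAffinities` produces, consider a hypothetical API that sets `node_groups: [ng-spot, ng-od]`, where `ng-spot` is a spot node group and `ng-od` is on-demand, listed in that order in the cluster config. Positions determine the preferred weights, and a required selector restricts scheduling to the selected groups; the label key on the preferred terms is assumed to be the same `alpha.eksctl.io/nodegroup-name` key used in the required selector, since those lines are collapsed in the diff above.

```yaml
# assumed pod-spec affinity for node_groups: [ng-spot, ng-od] (2 selected groups)
affinity:
  nodeAffinity:
    preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100           # 100 * (1 - 0/2) for the first selected group
        preference:
          matchExpressions:
            - key: alpha.eksctl.io/nodegroup-name
              operator: In
              values: [cx-ws-ng-spot]     # "cx-ws-" prefix for spot node groups
      - weight: 50            # 100 * (1 - 1/2) for the second selected group
        preference:
          matchExpressions:
            - key: alpha.eksctl.io/nodegroup-name
              operator: In
              values: [cx-wd-ng-od]       # "cx-wd-" prefix for on-demand node groups
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
        - matchExpressions:
            - key: alpha.eksctl.io/nodegroup-name
              operator: In
              values: [cx-ws-ng-spot, cx-wd-ng-od]
```

When `node_groups` is nil, the required selector is omitted and the preferred terms span all cluster node groups, which preserves the previous scheduling behavior.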
20 changes: 7 additions & 13 deletions pkg/operator/resources/asyncapi/k8s_specs.go
@@ -36,6 +36,7 @@ var _gatewayHPATargetMemUtilization int32 = 80 // percentage

func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL string) kapps.Deployment {
container := operator.AsyncGatewayContainers(api, queueURL)

return *k8s.Deployment(&k8s.DeploymentSpec{
Name: getGatewayK8sName(api.Name),
Replicas: 1,
@@ -71,11 +72,8 @@ func gatewayDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queue
Containers: []kcore.Container{container},
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(),
},
}, ServiceAccountName: operator.ServiceAccountName,
Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups),
ServiceAccountName: operator.ServiceAccountName,
},
},
})
@@ -207,14 +205,10 @@ func apiDeploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL
InitContainers: []kcore.Container{
operator.InitContainer(&api),
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(),
},
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups),
Volumes: volumes,
ServiceAccountName: operator.ServiceAccountName,
},
8 changes: 8 additions & 0 deletions pkg/operator/resources/errors.go
@@ -37,6 +37,7 @@ const (
ErrRealtimeAPIUsedByTrafficSplitter = "resources.realtime_api_used_by_traffic_splitter"
ErrAPIsNotDeployed = "resources.apis_not_deployed"
ErrGRPCNotSupportedForTrafficSplitter = "resources.grpc_not_supported_for_traffic_splitter"
ErrInvalidNodeGroupSelector = "resources.invalid_node_group_selector"
)

func ErrorOperationIsOnlySupportedForKind(resource operator.DeployedResource, supportedKind userconfig.Kind, supportedKinds ...userconfig.Kind) error {
@@ -116,3 +117,10 @@ func ErrorGRPCNotSupportedForTrafficSplitter(grpcAPIName string) error {
Message: fmt.Sprintf("api %s (of kind %s) is served using the grpc protocol and therefore, it cannot be used for the %s kind", grpcAPIName, userconfig.RealtimeAPIKind, userconfig.TrafficSplitterKind),
})
}

func ErrorInvalidNodeGroupSelector(selected string, availableNodeGroups []string) error {
return errors.WithStack(&errors.Error{
Kind: ErrInvalidNodeGroupSelector,
Message: fmt.Sprintf("node group %s doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)),
})
}
24 changes: 8 additions & 16 deletions pkg/operator/resources/job/batchapi/k8s_specs.go
@@ -86,14 +86,10 @@ func pythonPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job, err
InitContainers: []kcore.Container{
operator.InitContainer(api),
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(),
},
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups),
Volumes: volumes,
ServiceAccountName: operator.ServiceAccountName,
},
@@ -141,14 +137,10 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.BatchJob) (*kbatch.Job,
InitContainers: []kcore.Container{
operator.InitContainer(api),
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(),
},
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups),
Volumes: volumes,
ServiceAccountName: operator.ServiceAccountName,
},
12 changes: 4 additions & 8 deletions pkg/operator/resources/job/taskapi/k8s_specs.go
@@ -117,14 +117,10 @@ func k8sJobSpec(api *spec.API, job *spec.TaskJob) *kbatch.Job {
InitContainers: []kcore.Container{
operator.TaskInitContainer(api),
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(),
},
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups),
Volumes: volumes,
ServiceAccountName: operator.ServiceAccountName,
},
24 changes: 8 additions & 16 deletions pkg/operator/resources/realtimeapi/k8s_specs.go
@@ -87,14 +87,10 @@ func tensorflowAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.D
InitContainers: []kcore.Container{
operator.InitContainer(api),
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(),
},
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups),
Volumes: volumes,
ServiceAccountName: operator.ServiceAccountName,
},
@@ -149,14 +145,10 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo
InitContainers: []kcore.Container{
operator.InitContainer(api),
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: &kcore.Affinity{
NodeAffinity: &kcore.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: operator.GeneratePreferredNodeAffinities(),
},
},
Containers: containers,
NodeSelector: operator.NodeSelectors(),
Tolerations: operator.GenerateResourceTolerations(),
Affinity: operator.GenerateNodeAffinities(api.Compute.NodeGroups),
Volumes: volumes,
ServiceAccountName: operator.ServiceAccountName,
},
24 changes: 24 additions & 0 deletions pkg/operator/resources/validations.go
@@ -191,7 +191,31 @@ var _inferentiaMemReserve = kresource.MustParse("100Mi")
func validateK8sCompute(compute *userconfig.Compute, maxMemMap map[string]kresource.Quantity) error {
allErrors := []error{}
successfulLoops := 0

clusterNodeGroupNames := strset.New(config.ManagedConfig.GetNodeGroupNames()...)
apiNodeGroupNames := compute.NodeGroups

if apiNodeGroupNames != nil {
for _, ngName := range apiNodeGroupNames {
if !clusterNodeGroupNames.Has(ngName) {
return ErrorInvalidNodeGroupSelector(ngName, config.ManagedConfig.GetNodeGroupNames())
}
}
}

for _, instanceMetadata := range config.InstancesMetadata {
if apiNodeGroupNames != nil {
matchedNodeGroups := 0
for _, ngName := range apiNodeGroupNames {
if config.ManagedConfig.GetNodeGroupByName(ngName).InstanceType == instanceMetadata.Type {
matchedNodeGroups++
}
}
if matchedNodeGroups == 0 {
continue
}
}

maxMemLoop := maxMemMap[instanceMetadata.Type]
maxMemLoop.Sub(_cortexMemReserve)

20 changes: 20 additions & 0 deletions pkg/types/clusterconfig/cluster_config.go
@@ -1419,6 +1419,26 @@ func (mc *ManagedConfig) GetAllInstanceTypes() []string {
return allInstanceTypes.Slice()
}

func (mc *ManagedConfig) GetNodeGroupByName(name string) *NodeGroup {
for _, ng := range mc.NodeGroups {
if ng.Name == name {
matchedNodeGroup := *ng
return &matchedNodeGroup
}
}

return nil
}

func (mc *ManagedConfig) GetNodeGroupNames() []string {
allNodeGroupNames := []string{}
for _, ng := range mc.NodeGroups {
allNodeGroupNames = append(allNodeGroupNames, ng.Name)
}

return allNodeGroupNames
}

func validateClusterName(clusterName string) (string, error) {
if !_strictS3BucketRegex.MatchString(clusterName) {
return "", errors.Wrap(ErrorDidNotMatchStrictS3Regex(), clusterName)
12 changes: 12 additions & 0 deletions pkg/types/spec/validations.go
@@ -397,6 +397,18 @@ func computeValidation() *cr.StructFieldValidation {
GreaterThanOrEqualTo: pointer.Int64(0),
},
},
{
StructField: "NodeGroups",
StringListValidation: &cr.StringListValidation{
Required: false,
Default: nil,
AllowExplicitNull: true,
AllowEmpty: false,
ElementStringValidation: &cr.StringValidation{
AlphaNumericDashUnderscore: true,
},
},
},
},
},
}
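To make the new `NodeGroups` validation concrete, here is a hedged sketch of how a few values would be treated under this `StringListValidation` (names are illustrative; whether each node group actually exists in the cluster is checked separately in `validateK8sCompute`):

```yaml
# each line is an independent alternative for the same field
node_groups: null                  # accepted: AllowExplicitNull, Cortex places the API automatically
node_groups: []                    # rejected: AllowEmpty is false
node_groups: [ng-cpu, ng_gpu-1]    # accepted: each element passes AlphaNumericDashUnderscore
node_groups: ["ng gpu"]            # rejected: the space fails the per-element string validation
```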