Allow users to configure shared memory for IPC #1756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged (7 commits) on Jan 4, 2021
3 changes: 3 additions & 0 deletions docs/workloads/batch/configuration.md
@@ -14,6 +14,7 @@
image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
env: <string: string> # dictionary of environment variables
log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
networking:
endpoint: <string> # the endpoint for the API (default: <api_name>)
api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -50,6 +51,7 @@
tensorflow_serving_image: <string> # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
env: <string: string> # dictionary of environment variables
log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
networking:
endpoint: <string> # the endpoint for the API (default: <api_name>)
api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
@@ -80,6 +82,7 @@
image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
env: <string: string> # dictionary of environment variables
log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
networking:
endpoint: <string> # the endpoint for the API (default: <api_name>)
api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide)
3 changes: 3 additions & 0 deletions docs/workloads/realtime/configuration.md
@@ -28,6 +28,7 @@
image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/python-predictor-cpu:master or quay.io/cortexlabs/python-predictor-gpu:master based on compute)
env: <string: string> # dictionary of environment variables
log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
networking:
endpoint: <string> # the endpoint for the API (default: <api_name>)
api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -85,6 +86,7 @@
tensorflow_serving_image: <string> # docker image to use for the TensorFlow Serving container (default: quay.io/cortexlabs/tensorflow-serving-gpu:master or quay.io/cortexlabs/tensorflow-serving-cpu:master based on compute)
env: <string: string> # dictionary of environment variables
log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
networking:
endpoint: <string> # the endpoint for the API (default: <api_name>)
api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
@@ -136,6 +138,7 @@
image: <string> # docker image to use for the Predictor (default: quay.io/cortexlabs/onnx-predictor-gpu:master or quay.io/cortexlabs/onnx-predictor-cpu:master based on compute)
env: <string: string> # dictionary of environment variables
log_level: <string> # log level that can be "debug", "info", "warning" or "error" (default: "info")
shm_size: <string> # size of shared memory (/dev/shm) for sharing data between multiple processes, e.g. 64Mi or 1Gi (default: Null)
networking:
endpoint: <string> # the endpoint for the API (default: <api_name>)
api_gateway: public | none # whether to create a public API Gateway endpoint for this API (if not, the API will still be accessible via the load balancer) (default: public, unless disabled cluster-wide) (aws only)
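Taken together, the docs changes above mean a user can request a larger `/dev/shm` directly in their API configuration. A minimal sketch of such a spec is shown below — the API name, predictor path, and size values are illustrative, not taken from the repo's docs:

```yaml
# cortex.yaml (illustrative) — requests a 512Mi /dev/shm for the predictor processes
- name: my-api              # hypothetical API name
  kind: RealtimeAPI
  predictor:
    type: python
    path: predictor.py      # hypothetical predictor module
    shm_size: 512Mi         # new field added by this PR
  compute:
    cpu: 1
    mem: 2Gi                # shm_size may not exceed this (see the validation change below)
```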
57 changes: 53 additions & 4 deletions pkg/operator/operator/k8s.go
@@ -140,7 +140,6 @@ func PythonPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume
apiPodResourceList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
apiPodResourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
}

} else {
volumes = append(volumes, kcore.Volume{
Name: "neuron-sock",
Expand Down Expand Up @@ -173,6 +172,22 @@ func PythonPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume
containers = append(containers, neuronContainer)
}

if api.Predictor.ShmSize != nil {
volumes = append(volumes, kcore.Volume{
Name: "dshm",
VolumeSource: kcore.VolumeSource{
EmptyDir: &kcore.EmptyDirVolumeSource{
Medium: kcore.StorageMediumMemory,
SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
},
},
})
apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
Name: "dshm",
MountPath: "/dev/shm",
})
}

containers = append(containers, kcore.Container{
Name: APIContainerName,
Image: api.Predictor.Image,
@@ -262,6 +277,22 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Vo
containers = append(containers, neuronContainer)
}

if api.Predictor.ShmSize != nil {
volumes = append(volumes, kcore.Volume{
Name: "dshm",
VolumeSource: kcore.VolumeSource{
EmptyDir: &kcore.EmptyDirVolumeSource{
Medium: kcore.StorageMediumMemory,
SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
},
},
})
volumeMounts = append(volumeMounts, kcore.VolumeMount{
Name: "dshm",
MountPath: "/dev/shm",
})
}

containers = append(containers, kcore.Container{
Name: APIContainerName,
Image: api.Predictor.Image,
@@ -294,9 +325,11 @@ func TensorFlowPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Vo
return containers, volumes
}

func ONNXPredictorContainers(api *spec.API) []kcore.Container {
func ONNXPredictorContainers(api *spec.API) ([]kcore.Container, []kcore.Volume) {
resourceList := kcore.ResourceList{}
resourceLimitsList := kcore.ResourceList{}
apiPodVolumeMounts := defaultVolumeMounts()
volumes := DefaultVolumes()
containers := []kcore.Container{}

if api.Compute.CPU != nil {
@@ -316,13 +349,29 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
resourceLimitsList["nvidia.com/gpu"] = *kresource.NewQuantity(api.Compute.GPU, kresource.DecimalSI)
}

if api.Predictor.ShmSize != nil {
volumes = append(volumes, kcore.Volume{
Name: "dshm",
VolumeSource: kcore.VolumeSource{
EmptyDir: &kcore.EmptyDirVolumeSource{
Medium: kcore.StorageMediumMemory,
SizeLimit: k8s.QuantityPtr(api.Predictor.ShmSize.Quantity),
},
},
})
apiPodVolumeMounts = append(apiPodVolumeMounts, kcore.VolumeMount{
Name: "dshm",
MountPath: "/dev/shm",
})
}

containers = append(containers, kcore.Container{
Name: APIContainerName,
Image: api.Predictor.Image,
ImagePullPolicy: kcore.PullAlways,
Env: getEnvVars(api, APIContainerName),
EnvFrom: baseEnvVars(),
VolumeMounts: defaultVolumeMounts(),
VolumeMounts: apiPodVolumeMounts,
ReadinessProbe: FileExistsProbe(_apiReadinessFile),
LivenessProbe: _apiLivenessProbe,
Lifecycle: nginxGracefulStopper(api.Kind),
@@ -338,7 +387,7 @@ func ONNXPredictorContainers(api *spec.API) []kcore.Container {
},
})

return containers
return containers, volumes
}

func getEnvVars(api *spec.API, container string) []kcore.EnvVar {
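For reference, when `shm_size` is set, the containers built in k8s.go end up with an in-memory `emptyDir` volume mounted at `/dev/shm`. A sketch of the resulting pod spec fragment follows — the size value and container name are illustrative; the volume name `dshm`, the `Memory` medium, and the mount path come from the diff above:

```yaml
# Illustrative pod spec fragment produced when predictor.shm_size is set
volumes:
  - name: dshm
    emptyDir:
      medium: Memory        # tmpfs-backed emptyDir
      sizeLimit: 512Mi      # taken from predictor.shm_size
containers:
  - name: api               # hypothetical container name
    volumeMounts:
      - name: dshm
        mountPath: /dev/shm
```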
4 changes: 2 additions & 2 deletions pkg/operator/resources/batchapi/k8s_specs.go
@@ -144,7 +144,7 @@ func tensorFlowPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, erro
}

func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
containers := operator.ONNXPredictorContainers(api)
containers, volumes := operator.ONNXPredictorContainers(api)

for i, container := range containers {
if container.Name == operator.APIContainerName {
@@ -186,7 +186,7 @@ func onnxPredictorJobSpec(api *spec.API, job *spec.Job) (*kbatch.Job, error) {
"workload": "true",
},
Tolerations: operator.Tolerations,
Volumes: operator.DefaultVolumes(),
Volumes: volumes,
ServiceAccountName: "default",
},
},
4 changes: 2 additions & 2 deletions pkg/operator/resources/realtimeapi/k8s_specs.go
@@ -152,7 +152,7 @@ func pythonAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deplo
}

func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deployment {
containers := operator.ONNXPredictorContainers(api)
containers, volumes := operator.ONNXPredictorContainers(api)

if config.Provider == types.AWSProviderType {
containers = append(containers, operator.RequestMonitorContainer(api))
@@ -196,7 +196,7 @@ func onnxAPISpec(api *spec.API, prevDeployment *kapps.Deployment) *kapps.Deploym
"workload": "true",
},
Tolerations: operator.Tolerations,
Volumes: operator.DefaultVolumes(),
Volumes: volumes,
ServiceAccountName: "default",
},
},
10 changes: 10 additions & 0 deletions pkg/types/spec/errors.go
@@ -23,6 +23,7 @@ import (
"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/files"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
libmath "github.com/cortexlabs/cortex/pkg/lib/math"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
@@ -52,6 +53,8 @@

ErrModelCachingNotSupportedWhenMultiprocessingEnabled = "spec.model_caching_not_supported_when_multiprocessing_enabled"

ErrShmSizeCannotExceedMem = "spec.shm_size_cannot_exceed_mem"

ErrFileNotFound = "spec.file_not_found"
ErrDirIsEmpty = "spec.dir_is_empty"
ErrMustBeRelativeProjectPath = "spec.must_be_relative_project_path"
@@ -228,6 +231,13 @@ func ErrorSurgeAndUnavailableBothZero() error {
})
}

func ErrorShmSizeCannotExceedMem(shmSize k8s.Quantity, mem k8s.Quantity) error {
return errors.WithStack(&errors.Error{
Kind: ErrShmSizeCannotExceedMem,
Message: fmt.Sprintf("predictor.shm_size (%s) cannot exceed compute.mem (%s)", shmSize.UserString, mem.UserString),
})
}

func ErrorModelCachingNotSupportedWhenMultiprocessingEnabled(desiredProcesses int32) error {
const maxNumProcesses int32 = 1
return errors.WithStack(&errors.Error{
14 changes: 14 additions & 0 deletions pkg/types/spec/validations.go
@@ -203,6 +203,14 @@ func predictorValidation() *cr.StructFieldValidation {
GreaterThanOrEqualTo: pointer.Int32(1),
},
},
{
StructField: "ShmSize",
StringPtrValidation: &cr.StringPtrValidation{
Default: nil,
AllowExplicitNull: true,
},
Parser: k8s.QuantityParser(&k8s.QuantityValidation{}),
},
{
StructField: "LogLevel",
StringValidation: &cr.StringValidation{
@@ -795,6 +803,12 @@ func ValidateAPI(
}
}

if api.Predictor != nil && api.Predictor.ShmSize != nil && api.Compute.Mem != nil {
if api.Predictor.ShmSize.Cmp(api.Compute.Mem.Quantity) > 0 {
return ErrorShmSizeCannotExceedMem(*api.Predictor.ShmSize, *api.Compute.Mem)
}
}

return nil
}

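The validation added here rejects any spec whose `shm_size` exceeds `compute.mem`. A configuration like the following (illustrative values) would fail at deploy time with `predictor.shm_size (4Gi) cannot exceed compute.mem (2Gi)`:

```yaml
# Rejected by the new check in ValidateAPI: shm_size > compute.mem
predictor:
  type: python
  shm_size: 4Gi
compute:
  mem: 2Gi
```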
10 changes: 10 additions & 0 deletions pkg/types/userconfig/api.go
@@ -53,6 +53,7 @@ type Predictor struct {
ServerSideBatching *ServerSideBatching `json:"server_side_batching" yaml:"server_side_batching"`
ProcessesPerReplica int32 `json:"processes_per_replica" yaml:"processes_per_replica"`
ThreadsPerProcess int32 `json:"threads_per_process" yaml:"threads_per_process"`
ShmSize *k8s.Quantity `json:"shm_size" yaml:"shm_size"`
PythonPath *string `json:"python_path" yaml:"python_path"`
LogLevel LogLevel `json:"log_level" yaml:"log_level"`
Image string `json:"image" yaml:"image"`
@@ -370,6 +371,10 @@ func (predictor *Predictor) UserStr() string {
sb.WriteString(fmt.Sprintf("%s: %s\n", ProcessesPerReplicaKey, s.Int32(predictor.ProcessesPerReplica)))
sb.WriteString(fmt.Sprintf("%s: %s\n", ThreadsPerProcessKey, s.Int32(predictor.ThreadsPerProcess)))

if predictor.ShmSize != nil {
sb.WriteString(fmt.Sprintf("%s: %s\n", ShmSize, predictor.ShmSize.UserString))
}

if len(predictor.Config) > 0 {
sb.WriteString(fmt.Sprintf("%s:\n", ConfigKey))
d, _ := yaml.Marshal(&predictor.Config)
@@ -603,6 +608,11 @@ func (api *API) TelemetryEvent(provider types.ProviderType) map[string]interface
event["predictor.type"] = api.Predictor.Type
event["predictor.processes_per_replica"] = api.Predictor.ProcessesPerReplica
event["predictor.threads_per_process"] = api.Predictor.ThreadsPerProcess

if api.Predictor.ShmSize != nil {
event["predictor.shm_size"] = api.Predictor.ShmSize.String()
}

event["predictor.log_level"] = api.Predictor.LogLevel

if api.Predictor.PythonPath != nil {
1 change: 1 addition & 0 deletions pkg/types/userconfig/config_key.go
@@ -39,6 +39,7 @@
TensorFlowServingImageKey = "tensorflow_serving_image"
ProcessesPerReplicaKey = "processes_per_replica"
ThreadsPerProcessKey = "threads_per_process"
ShmSize = "shm_size"
LogLevelKey = "log_level"
ConfigKey = "config"
EnvKey = "env"