Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
65755aa
Rename impl: field to name:
PhilippMatthes Jan 21, 2026
e1c6f8c
Use filters:, weighers:, and detectors: (wip)
PhilippMatthes Jan 21, 2026
a307312
Use Filter and Weigher instead of BaseStep
PhilippMatthes Jan 21, 2026
3c202ba
WIP: I want to separate filters and weighers in code
PhilippMatthes Jan 21, 2026
9672580
Split filter and weigher in implementation
PhilippMatthes Jan 22, 2026
4dadf0d
Rename PrepareResult -> IncludeAllHostsFromRequest
PhilippMatthes Jan 22, 2026
7f7b5a8
WIP
PhilippMatthes Jan 22, 2026
1d424d7
Ran into the wrong direction
PhilippMatthes Jan 22, 2026
7cbeb3d
Revert "Ran into the wrong direction"
PhilippMatthes Jan 22, 2026
75f45e0
xRevert "WIP"
PhilippMatthes Jan 22, 2026
9c68706
Simplify spec again
PhilippMatthes Jan 23, 2026
8b0fab0
Use critical and non critical errors during pipeline initialization
PhilippMatthes Jan 23, 2026
8e8bd2a
Moving first knowledge checking logic to step (wip)
PhilippMatthes Jan 23, 2026
29cd127
Apply pattern to weighers
PhilippMatthes Jan 26, 2026
bf7f821
Fix linting issues from last commit
PhilippMatthes Jan 26, 2026
2abff2e
Add multiplier to apply to weighers
PhilippMatthes Jan 26, 2026
0878bdb
Improve code structuring in scheduling/lib
PhilippMatthes Jan 26, 2026
8db7422
Multiplier should be nilable
PhilippMatthes Jan 26, 2026
a5ff244
Rename opts to params
PhilippMatthes Jan 26, 2026
efc143c
Merge branch 'main' into refine-pipeline-crd
PhilippMatthes Jan 26, 2026
ca17468
Adjust pipeline yaml specs
PhilippMatthes Jan 26, 2026
a3ef2ef
Remove unnecessary createDecisions helm value
PhilippMatthes Jan 26, 2026
a507a03
Cleanup: enable kvm features in local dev setup [skip ci]
PhilippMatthes Jan 26, 2026
04c9d59
Set all steps ready condition when nothing fails
PhilippMatthes Jan 26, 2026
8bb5479
Fix: reconcile knowledge when rawlength is 0
PhilippMatthes Jan 26, 2026
8c13d81
Fix: reconcile datasource when object count is 0
PhilippMatthes Jan 26, 2026
3e4647a
Split into weigher, filter, detector impl WIP
PhilippMatthes Jan 26, 2026
a61b457
Fix descheduler code
PhilippMatthes Jan 27, 2026
4885dc9
Rename descheduler/step -> descheduler/detector
PhilippMatthes Jan 27, 2026
8b1159f
Pull descheduler code into lib for reusability and better alignment w…
PhilippMatthes Jan 27, 2026
01f7b80
Rename cycle detector -> cycle breaker to avoid confusion with detectors
PhilippMatthes Jan 27, 2026
e536331
Split supported_steps -> supported_[filters|weighers|detectors]
PhilippMatthes Jan 27, 2026
729ecd6
Fuse nova scheduling into scheduling/nova
PhilippMatthes Jan 27, 2026
fc2c1cf
Fuse manila scheduling into scheduling/manila
PhilippMatthes Jan 27, 2026
483b571
Fuse cinder scheduling into scheduling/cinder
PhilippMatthes Jan 27, 2026
b449721
machines + pods + explanation
PhilippMatthes Jan 27, 2026
7cbe704
e2e checks into scheduling domains
PhilippMatthes Jan 27, 2026
6fd38d9
Adjust metric names
PhilippMatthes Jan 27, 2026
e657359
Improve coverage in scheduling/nova
PhilippMatthes Jan 27, 2026
2a81d7c
Improve coverage in scheduling/lib
PhilippMatthes Jan 27, 2026
9816fc1
More tests
PhilippMatthes Jan 27, 2026
4c534e2
Add substatus for filters, weighers, and detectors
PhilippMatthes Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion api/delegation/cinder/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package api

import "log/slog"
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

// Host object from the Cinder scheduler pipeline.
type ExternalSchedulerHost struct {
Expand Down Expand Up @@ -46,6 +50,16 @@ func (r ExternalSchedulerRequest) GetTraceLogArgs() []slog.Attr {
slog.String("project", r.Context.ProjectID),
}
}
func (r ExternalSchedulerRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
filteredHosts := make([]ExternalSchedulerHost, 0, len(includedSubjects))
for _, host := range r.Hosts {
if _, exists := includedSubjects[host.VolumeHost]; exists {
filteredHosts = append(filteredHosts, host)
}
}
r.Hosts = filteredHosts
return r
}

// Response generated by cortex for the Cinder scheduler.
// Cortex returns an ordered list of hosts that the share should be scheduled on.
Expand Down
11 changes: 11 additions & 0 deletions api/delegation/ironcore/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"

ironcorev1alpha1 "github.com/cobaltcore-dev/cortex/api/delegation/ironcore/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

type MachinePipelineRequest struct {
Expand All @@ -31,3 +32,13 @@ func (r MachinePipelineRequest) GetWeights() map[string]float64 {
func (r MachinePipelineRequest) GetTraceLogArgs() []slog.Attr {
return []slog.Attr{}
}
func (r MachinePipelineRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
filteredPools := make([]ironcorev1alpha1.MachinePool, 0, len(includedSubjects))
for _, pool := range r.Pools {
if _, exists := includedSubjects[pool.Name]; exists {
filteredPools = append(filteredPools, pool)
}
}
r.Pools = filteredPools
return r
}
16 changes: 15 additions & 1 deletion api/delegation/manila/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@

package api

import "log/slog"
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

// Host object from the Manila scheduler pipeline.
type ExternalSchedulerHost struct {
Expand Down Expand Up @@ -46,6 +50,16 @@ func (r ExternalSchedulerRequest) GetTraceLogArgs() []slog.Attr {
slog.String("project", r.Context.ProjectID),
}
}
func (r ExternalSchedulerRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
filteredHosts := make([]ExternalSchedulerHost, 0, len(includedSubjects))
for _, host := range r.Hosts {
if _, exists := includedSubjects[host.ShareHost]; exists {
filteredHosts = append(filteredHosts, host)
}
}
r.Hosts = filteredHosts
return r
}

// Response generated by cortex for the Manila scheduler.
// Cortex returns an ordered list of hosts that the share should be scheduled on.
Expand Down
12 changes: 12 additions & 0 deletions api/delegation/nova/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"errors"
"fmt"
"log/slog"

"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
)

// Host object from the Nova scheduler pipeline.
Expand Down Expand Up @@ -69,6 +71,16 @@ func (r ExternalSchedulerRequest) GetTraceLogArgs() []slog.Attr {
slog.String("project", r.Context.ProjectID),
}
}
func (r ExternalSchedulerRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
filteredHosts := make([]ExternalSchedulerHost, 0, len(includedSubjects))
for _, host := range r.Hosts {
if _, exists := includedSubjects[host.ComputeHost]; exists {
filteredHosts = append(filteredHosts, host)
}
}
r.Hosts = filteredHosts
return r
}

// Response generated by cortex for the Nova scheduler.
// Cortex returns an ordered list of hosts that the VM should be scheduled on.
Expand Down
11 changes: 11 additions & 0 deletions api/delegation/pods/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pods
import (
"log/slog"

"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -33,3 +34,13 @@ func (r PodPipelineRequest) GetWeights() map[string]float64 {
func (r PodPipelineRequest) GetTraceLogArgs() []slog.Attr {
return []slog.Attr{}
}
func (r PodPipelineRequest) FilterSubjects(includedSubjects map[string]float64) lib.FilterWeigherPipelineRequest {
filteredNodes := make([]corev1.Node, 0, len(includedSubjects))
for _, node := range r.Nodes {
if _, exists := includedSubjects[node.Name]; exists {
filteredNodes = append(filteredNodes, node)
}
}
r.Nodes = filteredNodes
return r
}
176 changes: 122 additions & 54 deletions api/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,63 +4,59 @@
package v1alpha1

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)

type DisabledValidationsSpec struct {
// Whether to validate that no subjects are removed or added from the scheduler
// step. This should only be disabled for scheduler steps that remove subjects.
// Thus, if no value is provided, the default is false.
SameSubjectNumberInOut bool `json:"sameSubjectNumberInOut,omitempty"`
// Whether to validate that, after running the step, there are remaining subjects.
// This should only be disabled for scheduler steps that are expected to
// remove all subjects.
SomeSubjectsRemain bool `json:"someSubjectsRemain,omitempty"`
}
type FilterSpec struct {
// The name of the scheduler step in the cortex implementation.
// Must match to a step implemented by the pipeline controller.
Name string `json:"name"`

type StepType string
// Additional configuration for the step that can be used
// +kubebuilder:validation:Optional
Params runtime.RawExtension `json:"params,omitempty"`

const (
// Step for assigning weights to hosts.
StepTypeWeigher StepType = "weigher"
// Step for filtering hosts.
StepTypeFilter StepType = "filter"
// Step for generating descheduling recommendations.
StepTypeDescheduler StepType = "descheduler"
)
// Additional description of the step which helps understand its purpose
// and decisions made by it.
// +kubebuilder:validation:Optional
Description string `json:"description,omitempty"`
}

type WeigherSpec struct {
// The validations to disable for this step. If none are provided, all
// applied validations are enabled.
// The name of the scheduler step in the cortex implementation.
// Must match to a step implemented by the pipeline controller.
Name string `json:"name"`

// Additional configuration for the step that can be used
// +kubebuilder:validation:Optional
DisabledValidations DisabledValidationsSpec `json:"disabledValidations,omitempty"`
}
Params runtime.RawExtension `json:"params,omitempty"`

// Additional description of the step which helps understand its purpose
// and decisions made by it.
// +kubebuilder:validation:Optional
Description string `json:"description,omitempty"`

type StepSpec struct {
// The type of the scheduler step.
Type StepType `json:"type"`
// If the type is "weigher", this contains additional configuration for it.
// Optional multiplier to apply to the step's output.
// This can be used to increase or decrease the weight of a step
// relative to other steps in the same pipeline.
// +kubebuilder:validation:Optional
Weigher *WeigherSpec `json:"weigher,omitempty"`
Multiplier *float64 `json:"multiplier,omitempty"`
}

type DetectorSpec struct {
// The name of the scheduler step in the cortex implementation.
Impl string `json:"impl"`
// Additional configuration for the extractor that can be used
// +kubebuilder:validation:Optional
Opts runtime.RawExtension `json:"opts,omitempty"`
// Knowledges this step depends on to be ready.
// Must match to a step implemented by the pipeline controller.
Name string `json:"name"`

// Additional configuration for the step that can be used
// +kubebuilder:validation:Optional
Knowledges []corev1.ObjectReference `json:"knowledges,omitempty"`
Params runtime.RawExtension `json:"params,omitempty"`

// Additional description of the step which helps understand its purpose
// and decisions made by it.
// +kubebuilder:validation:Optional
Description string `json:"description,omitempty"`

// Whether this step is mandatory for the pipeline to be runnable.
// +kubebuilder:default=true
Mandatory bool `json:"mandatory"`
}

type PipelineType string
Expand All @@ -69,41 +65,113 @@ const (
// Pipeline containing filter-weigher steps for initial placement,
// migration, etc. of instances.
PipelineTypeFilterWeigher PipelineType = "filter-weigher"
// Pipeline containing descheduler steps for generating descheduling
// Pipeline containing detector steps, e.g. for generating descheduling
// recommendations.
PipelineTypeDescheduler PipelineType = "descheduler"
PipelineTypeDetector PipelineType = "detector"
)

type PipelineSpec struct {
// SchedulingDomain defines in which scheduling domain this pipeline
// is used (e.g., nova, cinder, manila).
SchedulingDomain SchedulingDomain `json:"schedulingDomain"`
// An optional description of the pipeline.

// An optional description of the pipeline, helping understand its purpose.
// +kubebuilder:validation:Optional
Description string `json:"description,omitempty"`

// If this pipeline should create decision objects.
// When this is false, the pipeline will still process requests.
// +kubebuilder:default=false
CreateDecisions bool `json:"createDecisions,omitempty"`
// The type of the pipeline.

// The type of the pipeline, used to differentiate between
// filter-weigher and detector pipelines within the same
// scheduling domain.
//
// If the type is filter-weigher, the filter and weigher attributes
// must be set. If the type is detector, the detectors attribute
// must be set.
//
// +kubebuilder:validation:Enum=filter-weigher;detector
Type PipelineType `json:"type"`
// The ordered list of steps that make up this pipeline.
Steps []StepSpec `json:"steps,omitempty"`

// Ordered list of filters to apply in a scheduling pipeline.
//
// This attribute is set only if the pipeline type is filter-weigher.
// Filters remove host candidates from an initial set, leaving
// valid candidates. Filters are run before weighers are applied.
// +kubebuilder:validation:Optional
Filters []FilterSpec `json:"filters,omitempty"`

// Ordered list of weighers to apply in a scheduling pipeline.
//
// This attribute is set only if the pipeline type is filter-weigher.
// These weighers are run after filters are applied.
// +kubebuilder:validation:Optional
Weighers []WeigherSpec `json:"weighers,omitempty"`

// Ordered list of detectors to apply in a descheduling pipeline.
//
// This attribute is set only if the pipeline type is detector.
// Detectors find candidates for descheduling (migration off current host).
// These detectors are run after weighers are applied.
// +kubebuilder:validation:Optional
Detectors []DetectorSpec `json:"detectors,omitempty"`
}

const (
FilterConditionReady = "Ready"
WeigherConditionReady = "Ready"
DetectorConditionReady = "Ready"
)

type FilterStatus struct {
// The name of the filter.
Name string `json:"name"`

// The current status conditions of the filter.
// +kubebuilder:validation:Optional
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

type WeigherStatus struct {
// The name of the weigher.
Name string `json:"name"`

// The current status conditions of the weigher.
// +kubebuilder:validation:Optional
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

type DetectorStatus struct {
// The name of the detector.
Name string `json:"name"`

// The current status conditions of the detector.
// +kubebuilder:validation:Optional
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
}

const (
// The pipeline is ready to be used.
PipelineConditionReady = "Ready"
// All steps in the pipeline are ready.
PipelineConditionAllStepsReady = "AllStepsReady"
)

type PipelineStatus struct {
// The total number of steps configured in the pipeline.
TotalSteps int `json:"totalSteps"`
// The number of steps that are ready.
ReadySteps int `json:"readySteps"`
// An overview of the readiness of the steps in the pipeline.
// Format: "ReadySteps / TotalSteps steps ready".
StepsReadyFrac string `json:"stepsReadyFrac,omitempty"`
// List of statuses for each filter in the pipeline.
// +kubebuilder:validation:Optional
Filters []FilterStatus `json:"filters,omitempty"`

// List of statuses for each weigher in the pipeline.
// +kubebuilder:validation:Optional
Weighers []WeigherStatus `json:"weighers,omitempty"`

// List of statuses for each detector in the pipeline.
// +kubebuilder:validation:Optional
Detectors []DetectorStatus `json:"detectors,omitempty"`

// The current status conditions of the pipeline.
// +kubebuilder:validation:Optional
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`
Expand All @@ -115,8 +183,8 @@ type PipelineStatus struct {
// +kubebuilder:printcolumn:name="Created",type="date",JSONPath=".metadata.creationTimestamp"
// +kubebuilder:printcolumn:name="Domain",type="string",JSONPath=".spec.schedulingDomain"
// +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".spec.type"
// +kubebuilder:printcolumn:name="Steps",type="string",JSONPath=".status.stepsReadyFrac"
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status"
// +kubebuilder:printcolumn:name="All Steps Ready",type="string",JSONPath=".status.conditions[?(@.type=='AllStepsReady')].status"
// +kubebuilder:printcolumn:name="Pipeline Ready",type="string",JSONPath=".status.conditions[?(@.type=='Ready')].status"

// Pipeline is the Schema for the decisions API
type Pipeline struct {
Expand Down
Loading