Skip to content

Commit

Permalink
feat: extend h-sacle api to support 'scaleOut' and 'scaleIn' operation (
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei authored Jun 6, 2024
1 parent 29e0bf1 commit 3fa254d
Show file tree
Hide file tree
Showing 38 changed files with 9,043 additions and 7,292 deletions.
17 changes: 15 additions & 2 deletions apis/apps/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,8 +1651,11 @@ func (t *InstanceTemplate) GetName() string {
return t.Name
}

func (t *InstanceTemplate) GetReplicas() *int32 {
return t.Replicas
func (t *InstanceTemplate) GetReplicas() int32 {
if t.Replicas != nil {
return *t.Replicas
}
return defaultInstanceTemplateReplicas
}

// GetClusterUpRunningPhases returns Cluster running or partially running phases.
Expand Down Expand Up @@ -1697,3 +1700,13 @@ func GetComponentUpRunningPhase() []ClusterComponentPhase {
func ComponentPodsAreReady(podsAreReady *bool) bool {
return podsAreReady != nil && *podsAreReady
}

// GetInstanceTemplateName get the instance template name by instance name.
func GetInstanceTemplateName(clusterName, componentName, instanceName string) string {
workloadPrefix := fmt.Sprintf("%s-%s", clusterName, componentName)
compInsKey := instanceName[:strings.LastIndex(instanceName, "-")]
if compInsKey == workloadPrefix {
return ""
}
return strings.Replace(compInsKey, workloadPrefix+"-", "", 1)
}
112 changes: 86 additions & 26 deletions apis/apps/v1alpha1/opsrequest_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ type SpecificOpsRequest struct {
Upgrade *Upgrade `json:"upgrade,omitempty"`

// Lists HorizontalScaling objects, each specifying scaling requirements for a Component,
// including desired total replica counts, configurations for new instances, modifications for existing instances,
// and instance downscaling options.
// including desired replica changes, configurations for new instances, modifications for existing instances,
// and take offline/online the specified instances.
//
// +optional
// +patchMergeKey=componentName
Expand Down Expand Up @@ -360,7 +360,12 @@ type VerticalScaling struct {
corev1.ResourceRequirements `json:",inline"`

// Specifies the desired compute resources of the instance template that need to vertical scale.
Instances []InstanceResourceTemplate `json:"instances,omitempty"`
// +patchMergeKey=name
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=name
// +optional
Instances []InstanceResourceTemplate `json:"instances,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"name"`
}

type InstanceResourceTemplate struct {
Expand Down Expand Up @@ -403,7 +408,12 @@ type VolumeExpansion struct {
VolumeClaimTemplates []OpsRequestVolumeClaimTemplate `json:"volumeClaimTemplates" patchStrategy:"merge,retainKeys" patchMergeKey:"name"`

// Specifies the desired storage size of the instance template that need to volume expand.
Instances []InstanceVolumeClaimTemplate `json:"instances,omitempty"`
// +patchMergeKey=name
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=name
// +optional
Instances []InstanceVolumeClaimTemplate `json:"instances,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"name"`
}

type OpsRequestVolumeClaimTemplate struct {
Expand All @@ -425,34 +435,84 @@ type HorizontalScaling struct {
// Specifies the name of the Component.
ComponentOps `json:",inline"`

// Specifies the number of total replicas.
//
// +kubebuilder:validation:Required
// Deprecated: since v0.9, use scaleOut and scaleIn instead.
// Specifies the number of replicas for the component. Cannot be used with "scaleIn" and "scaleOut".
// +kubebuilder:deprecatedversion:warning="This field has been deprecated since 0.9.0"
// +kubebuilder:validation:Minimum=0
Replicas int32 `json:"replicas"`
// +optional
Replicas *int32 `json:"replicas,omitempty"`

// Contains a list of InstanceTemplate objects.
// Each InstanceTemplate object allows for modifying replica counts or specifying configurations for new instances during scaling.
//
// The field supports two main use cases:
//
// - Modifying replica count:
// Specify the desired replica count for existing instances with a particular configuration using Name and Replicas fields.
// To modify the replica count, the Name and Replicas fields of the InstanceTemplate object should be provided.
// Only these fields are used for matching and adjusting replicas; other fields are ignored.
// The Replicas value overrides any existing count.
// - Configuring new instances:
// Define the configuration for new instances added during scaling, including resource requirements, labels, annotations, etc.
// New instances are created based on the provided InstanceTemplate.
// Specifies the replica changes for scaling out components and instance templates,
// and brings offline instances back online. Can be used in conjunction with the "scaleIn" operation.
// Note: Any configuration that deletes instances is considered invalid.
//
// +optional
Instances []InstanceTemplate `json:"instances,omitempty"`
ScaleOut *ScaleOut `json:"scaleOut,omitempty"`

// Specifies the names of instances to be scaled down.
// This provides control over which specific instances are targeted for termination when reducing the replica count.
//
// Specifies the replica changes for scaling in components and instance templates,
// and takes specified instances offline. Can be used in conjunction with the "scaleOut" operation.
// Note: Any configuration that creates instances is considered invalid.
// +optional
OfflineInstances []string `json:"offlineInstances,omitempty"`
ScaleIn *ScaleIn `json:"scaleIn,omitempty"`
}

// ScaleOut defines the configuration for a scale-out operation.
type ScaleOut struct {

// Modifies the replicas of the component and instance templates.
ReplicaChanger `json:",inline"`

// Defines the configuration for new instances added during scaling, including resource requirements, labels, annotations, etc.
// New instances are created based on the provided instance templates.
// +optional
// +patchMergeKey=name
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=name
NewInstances []InstanceTemplate `json:"newInstances,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"name"`

// Specifies the instances in the offline list to bring back online.
// +optional
OfflineInstancesToOnline []string `json:"offlineInstancesToOnline,omitempty"`
}

// ScaleIn defines the configuration for a scale-in operation.
type ScaleIn struct {

// Modifies the replicas of the component and instance templates.
ReplicaChanger `json:",inline"`

// Specifies the instance names that need to be taken offline.
// +optional
OnlineInstancesToOffline []string `json:"onlineInstancesToOffline,omitempty"`
}

// ReplicaChanger defines the parameters for changing the number of replicas.
type ReplicaChanger struct {
// Specifies the replica changes for the component.
// +kubebuilder:validation:Minimum=0
ReplicaChanges *int32 `json:"replicaChanges,omitempty"`

// Modifies the desired replicas count for existing InstanceTemplate.
// if the inst
// +patchMergeKey=name
// +patchStrategy=merge,retainKeys
// +listType=map
// +listMapKey=name
// +optional
Instances []InstanceReplicasTemplate `json:"instances,omitempty" patchStrategy:"merge,retainKeys" patchMergeKey:"name"`
}

// InstanceReplicasTemplate defines the template for instance replicas.
type InstanceReplicasTemplate struct {
// Specifies the name of the instance template.
// +kubebuilder:validation:Required
Name string `json:"name"`

// Specifies the replica changes for the instance template.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Required
ReplicaChanges int32 `json:"replicaChanges"`
}

// Reconfigure defines the parameters for updating a Component's configuration.
Expand Down
129 changes: 127 additions & 2 deletions apis/apps/v1alpha1/opsrequest_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,137 @@ func (r *OpsRequest) validateHorizontalScaling(_ context.Context, _ client.Clien
if len(horizontalScalingList) == 0 {
return notEmptyError("spec.horizontalScaling")
}

compOpsList := make([]ComponentOps, len(horizontalScalingList))
hScaleMap := map[string]HorizontalScaling{}
for i, v := range horizontalScalingList {
compOpsList[i] = v.ComponentOps
hScaleMap[v.ComponentName] = horizontalScalingList[i]
}
return r.checkComponentExistence(cluster, compOpsList)
if err := r.checkComponentExistence(cluster, compOpsList); err != nil {
return err
}
for _, comSpec := range cluster.Spec.ComponentSpecs {
if hScale, ok := hScaleMap[comSpec.Name]; ok {
if err := r.validateHorizontalScalingSpec(hScale, comSpec, cluster.Name, false); err != nil {
return err
}
}
}
for _, shardingSpec := range cluster.Spec.ShardingSpecs {
if hScale, ok := hScaleMap[shardingSpec.Name]; ok {
if err := r.validateHorizontalScalingSpec(hScale, shardingSpec.Template, cluster.Name, true); err != nil {
return err
}
}
}
return nil
}

// CountOfflineOrOnlineInstances calculate the number of instances that need to be brought online and offline corresponding to the instance template name.
func (r *OpsRequest) CountOfflineOrOnlineInstances(clusterName, componentName string, hScaleInstanceNames []string) map[string]int32 {
offlineOrOnlineInsCountMap := map[string]int32{}
for _, insName := range hScaleInstanceNames {
insTplName := GetInstanceTemplateName(clusterName, componentName, insName)
offlineOrOnlineInsCountMap[insTplName]++
}
return offlineOrOnlineInsCountMap
}

func (r *OpsRequest) validateHorizontalScalingSpec(hScale HorizontalScaling, compSpec ClusterComponentSpec, clusterName string, isSharding bool) error {
scaleIn := hScale.ScaleIn
scaleOut := hScale.ScaleOut
if hScale.Replicas != nil && (scaleIn != nil || scaleOut != nil) {
return fmt.Errorf(`"replicas" has been deprecated and cannot be used with "scaleOut" and "scaleIn"`)
}
if hScale.Replicas != nil {
return nil
}
if lastCompConfiguration, ok := r.Status.LastConfiguration.Components[hScale.ComponentName]; ok {
// use last component configuration snapshot
compSpec.Instances = lastCompConfiguration.Instances
compSpec.Replicas = *lastCompConfiguration.Replicas
compSpec.OfflineInstances = lastCompConfiguration.OfflineInstances
}
compInsTplMap := map[string]int32{}
for _, v := range compSpec.Instances {
compInsTplMap[v.Name] = v.GetReplicas()
}
// Rules:
// 1. length of offlineInstancesToOnline or onlineInstancesToOffline can't greater than the configured replicaChanges for the component.
// 2. replicaChanges for component must greater than or equal to the sum of replicaChanges configured in instance templates.
validateHScaleOperation := func(replicaChanger ReplicaChanger, newInstances []InstanceTemplate, offlineOrOnlineInsNames []string, isScaleIn bool) error {
msgPrefix := "ScaleIn:"
hScaleInstanceFieldName := "onlineInstancesToOffline"
if !isScaleIn {
msgPrefix = "ScaleOut:"
hScaleInstanceFieldName = "offlineInstancesToOnline"
}
if isSharding && len(offlineOrOnlineInsNames) > 0 {
return fmt.Errorf(`cannot specify %s for a sharding component "%s"`, hScaleInstanceFieldName, hScale.ComponentName)
}
if replicaChanger.ReplicaChanges != nil && len(offlineOrOnlineInsNames) > int(*replicaChanger.ReplicaChanges) {
return fmt.Errorf(`the length of %s can't be greater than the "replicaChanges" for the component`, hScaleInstanceFieldName)
}
offlineOrOnlineInsCountMap := r.CountOfflineOrOnlineInstances(clusterName, hScale.ComponentName, offlineOrOnlineInsNames)
insTplChangeMap := map[string]int32{}
allReplicaChanges := int32(0)
for _, v := range replicaChanger.Instances {
compInsReplicas, ok := compInsTplMap[v.Name]
if !ok {
return fmt.Errorf(`%s cannot find the instance template "%s" in component "%s"`,
msgPrefix, v.Name, hScale.ComponentName)
}
if isScaleIn && v.ReplicaChanges > compInsReplicas {
return fmt.Errorf(`%s "replicaChanges" of instanceTemplate "%s" can't be greater than %d`,
msgPrefix, v.Name, compInsReplicas)
}
allReplicaChanges += v.ReplicaChanges
insTplChangeMap[v.Name] = v.ReplicaChanges
}
for insTplName, replicaCount := range offlineOrOnlineInsCountMap {
replicaChanges, ok := insTplChangeMap[insTplName]
if !ok {
allReplicaChanges += replicaCount
continue
}
if replicaChanges < replicaCount {
return fmt.Errorf(`"replicaChanges" can't be less than %d when %d instances of the instance template "%s" are configured in %s`,
replicaCount, replicaCount, insTplName, hScaleInstanceFieldName)
}
}
for _, insTpl := range newInstances {
if _, ok := compInsTplMap[insTpl.Name]; ok {
return fmt.Errorf(`new instance template "%s" already exists in component "%s"`, insTpl.Name, hScale.ComponentName)
}
allReplicaChanges += insTpl.GetReplicas()
}
if replicaChanger.ReplicaChanges != nil && allReplicaChanges > *replicaChanger.ReplicaChanges {
return fmt.Errorf(`%s "replicaChanges" can't be less than the sum of "replicaChanges" for specified instance templates`, msgPrefix)
}
return nil
}
if scaleIn != nil {
if err := validateHScaleOperation(scaleIn.ReplicaChanger, nil, scaleIn.OnlineInstancesToOffline, true); err != nil {
return err
}
if scaleIn.ReplicaChanges != nil && *scaleIn.ReplicaChanges > compSpec.Replicas {
return fmt.Errorf(`"scaleIn.replicaChanges" can't be greater than %d for component "%s"`, compSpec.Replicas, hScale.ComponentName)
}
}
if scaleOut != nil {
if err := validateHScaleOperation(scaleOut.ReplicaChanger, scaleOut.NewInstances, scaleOut.OfflineInstancesToOnline, false); err != nil {
return err
}
if len(scaleOut.OfflineInstancesToOnline) > 0 {
offlineInstanceSet := sets.New(compSpec.OfflineInstances...)
for _, offlineInsName := range scaleOut.OfflineInstancesToOnline {
if _, ok := offlineInstanceSet[offlineInsName]; !ok {
return fmt.Errorf(`cannot find the offline instance "%s" in component "%s" for scaleOut operation`, offlineInsName, hScale.ComponentName)
}
}
}
}
return nil
}

// validateVolumeExpansion validates volumeExpansion api when spec.type is VolumeExpansion
Expand Down
Loading

0 comments on commit 3fa254d

Please sign in to comment.