Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Solace scaler config #5856

Merged
merged 5 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Update
Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>
  • Loading branch information
SpiritZhou committed Jun 4, 2024
commit 74f87696f77cd0847bb83857e30d2aef724c1458
220 changes: 61 additions & 159 deletions pkg/scalers/solace_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,30 +81,54 @@ type SolaceScaler struct {
}

type SolaceMetadata struct {
// Scaler index
triggerIndex int

SolaceMetaSempBaseURL string `keda:"name=messageVpn, order=triggerMetadata"`

// Full SEMP URL to target queue (CONSTRUCTED IN CODE)
endpointURL string
solaceSempURL string
EndpointURL string
SolaceSempURL string

// Solace Message VPN
messageVpn string
queueName string
MessageVpn string `keda:"name=messageVpn, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`

// Basic Auth Username
username string
Username string `keda:"name=username, order=authParams;triggerMetadata;resolvedEnv"`
// Basic Auth Password
password string
Password string `keda:"name=password, order=authParams;triggerMetadata;resolvedEnv"`

// Target Message Count
msgCountTarget int64
msgSpoolUsageTarget int64 // Spool Use Target in Megabytes
msgRxRateTarget int64 // Ingress Rate Target per consumer in msgs/second
MsgCountTarget int64 `keda:"name=messageCountTarget, order=triggerMetadata, optional"`
MsgSpoolUsageTarget int64 `keda:"name=messageSpoolUsageTarget, order=triggerMetadata, optional"` // Spool Use Target in Megabytes
MsgRxRateTarget int64 `keda:"name=messageReceiveRateTarget, order=triggerMetadata, optional"` // Ingress Rate Target per consumer in msgs/second

// Activation Target Message Count
activationMsgCountTarget int
activationMsgSpoolUsageTarget int // Spool Use Target in Megabytes
activationMsgRxRateTarget int // Ingress Rate Target per consumer in msgs/second
// Scaler index
triggerIndex int
ActivationMsgCountTarget int `keda:"name=activationMessageCountTarget, order=triggerMetadata, optional, default=0"`
ActivationMsgSpoolUsageTarget int `keda:"name=activationMessageSpoolUsageTarget, order=triggerMetadata, optional, default=0"` // Spool Use Target in Megabytes
ActivationMsgRxRateTarget int `keda:"name=activationMessageReceiveRateTarget, order=triggerMetadata, optional, default=0"` // Ingress Rate Target per consumer in msgs/second
}

func (s *SolaceMetadata) Validate() error {
if s.SolaceMetaSempBaseURL == "" {
return fmt.Errorf(solaceFoundMetaFalse, solaceMetaSempBaseURL)
}

if s.MessageVpn == "" {
return fmt.Errorf(solaceFoundMetaFalse, solaceMetaMsgVpn)
}

if s.QueueName == "" {
return fmt.Errorf(solaceFoundMetaFalse, solaceMetaQueueName)
}

// Check that we have at least one positive target value for the scaler
if s.MsgCountTarget < 1 && s.MsgSpoolUsageTarget < 1 && s.MsgRxRateTarget < 1 {
return fmt.Errorf("no target value found in the scaler configuration")
}

return nil
}

// SEMP API Response Root Struct
Expand Down Expand Up @@ -164,146 +188,24 @@ func NewSolaceScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {

// Called by constructor
func parseSolaceMetadata(config *scalersconfig.ScalerConfig) (*SolaceMetadata, error) {
meta := SolaceMetadata{}
// GET THE SEMP API ENDPOINT
if val, ok := config.TriggerMetadata[solaceMetaSempBaseURL]; ok && val != "" {
meta.solaceSempURL = val
} else {
return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaSempBaseURL)
}
// GET Message VPN
if val, ok := config.TriggerMetadata[solaceMetaMsgVpn]; ok && val != "" {
meta.messageVpn = val
} else {
return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaMsgVpn)
}
// GET Queue Name
if val, ok := config.TriggerMetadata[solaceMetaQueueName]; ok && val != "" {
meta.queueName = val
} else {
return nil, fmt.Errorf(solaceFoundMetaFalse, solaceMetaQueueName)
}

// GET METRIC TARGET VALUES
// GET msgCountTarget
meta.msgCountTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaMsgCountTarget]; ok && val != "" {
if msgCount, err := strconv.ParseInt(val, 10, 64); err == nil {
meta.msgCountTarget = msgCount
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaMsgCountTarget, err)
}
}
// GET msgSpoolUsageTarget
meta.msgSpoolUsageTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaMsgSpoolUsageTarget]; ok && val != "" {
if msgSpoolUsage, err := strconv.ParseInt(val, 10, 64); err == nil {
meta.msgSpoolUsageTarget = msgSpoolUsage * 1024 * 1024
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaMsgSpoolUsageTarget, err)
}
}
// GET msgRcvRateTarget
meta.msgRxRateTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaMsgRxRateTarget]; ok && val != "" {
if msgRcvRate, err := strconv.ParseInt(val, 10, 64); err == nil {
meta.msgRxRateTarget = msgRcvRate
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaMsgRxRateTarget, err)
}
}

// Check that we have at least one positive target value for the scaler
if meta.msgCountTarget < 1 && meta.msgSpoolUsageTarget < 1 && meta.msgRxRateTarget < 1 {
return nil, fmt.Errorf("no target value found in the scaler configuration")
}

// GET ACTIVATION METRIC TARGET VALUES
// GET activationMsgCountTarget
meta.activationMsgCountTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaActivationMsgCountTarget]; ok && val != "" {
if activationMsgCountTarget, err := strconv.Atoi(val); err == nil {
meta.activationMsgCountTarget = activationMsgCountTarget
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaActivationMsgCountTarget, err)
}
}
// GET activationMsgSpoolUsageTarget
meta.activationMsgSpoolUsageTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaActivationMsgSpoolUsageTarget]; ok && val != "" {
if activationMsgSpoolUsageTarget, err := strconv.Atoi(val); err == nil {
meta.activationMsgSpoolUsageTarget = activationMsgSpoolUsageTarget * 1024 * 1024
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaActivationMsgSpoolUsageTarget, err)
}
}
meta.activationMsgRxRateTarget = 0
if val, ok := config.TriggerMetadata[solaceMetaActivationMsgRxRateTarget]; ok && val != "" {
if activationMsgRxRateTarget, err := strconv.Atoi(val); err == nil {
meta.activationMsgRxRateTarget = activationMsgRxRateTarget
} else {
return nil, fmt.Errorf("can't parse [%s], not a valid integer: %w", solaceMetaActivationMsgRxRateTarget, err)
}
meta := &SolaceMetadata{}
if err := config.TypedConfig(meta); err != nil {
return nil, fmt.Errorf("error parsing prometheus metadata: %w", err)
}
meta.triggerIndex = config.TriggerIndex

// Format Solace SEMP Queue Endpoint (REST URL)
meta.endpointURL = fmt.Sprintf(
meta.EndpointURL = fmt.Sprintf(
solaceSempEndpointURLTemplate,
meta.solaceSempURL,
meta.SolaceSempURL,
solaceAPIName,
solaceAPIVersion,
meta.messageVpn,
meta.MessageVpn,
solaceAPIObjectTypeQueue,
url.QueryEscape(meta.queueName),
url.QueryEscape(meta.QueueName),
)

// Get Credentials
var e error
if meta.username, meta.password, e = getSolaceSempCredentials(config); e != nil {
return nil, e
}

meta.triggerIndex = config.TriggerIndex

return &meta, nil
}

func getSolaceSempCredentials(config *scalersconfig.ScalerConfig) (u string, p string, err error) {
// GET CREDENTIALS
// The username must be a valid broker ADMIN user identifier with read access to SEMP for the broker, VPN, and relevant objects
// The scaler will attempt to acquire username and then password independently. For each:
// - Search K8S Secret (Encoded)
// - Search environment variable specified by config at 'usernameFromEnv' / 'passwordFromEnv'
// - Search 'username' / 'password' fields (Clear Text)
// Get username
if usernameSecret, ok := config.AuthParams[solaceMetaUsername]; ok && usernameSecret != "" {
u = usernameSecret
} else if usernameFromEnv, ok := config.TriggerMetadata[solaceMetaUsernameFromEnv]; ok && usernameFromEnv != "" {
if resolvedUser, ok := config.ResolvedEnv[config.TriggerMetadata[solaceMetaUsernameFromEnv]]; ok && resolvedUser != "" {
u = resolvedUser
} else {
return "", "", fmt.Errorf("username could not be resolved from the environment variable: %s", usernameFromEnv)
}
} else if usernameClear, ok := config.TriggerMetadata[solaceMetaUsername]; ok && usernameClear != "" {
u = usernameClear
} else {
return "", "", fmt.Errorf("username is required and not found in K8Secret, environment, or clear text")
}
// Get Password
if passwordSecret, ok := config.AuthParams[solaceMetaPassword]; ok && passwordSecret != "" {
p = passwordSecret
} else if passwordEnv, ok := config.TriggerMetadata[solaceMetaPasswordFromEnv]; ok && passwordEnv != "" {
if resolvedPassword, ok := config.ResolvedEnv[config.TriggerMetadata[solaceMetaPasswordFromEnv]]; ok && resolvedPassword != "" {
p = resolvedPassword
} else {
return "", "", fmt.Errorf("password could not be resolved from the environment variable: %s", passwordEnv)
}
} else if passwordClear, ok := config.TriggerMetadata[solaceMetaPassword]; ok && passwordClear != "" {
p = passwordClear
} else {
return "", "", fmt.Errorf("password is required and not found in K8Secret, environment, or clear text")
}
return u, p, nil
return meta, nil
}

// INTERFACE METHOD
Expand All @@ -317,37 +219,37 @@ func getSolaceSempCredentials(config *scalersconfig.ScalerConfig) (u string, p s
func (s *SolaceScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
var metricSpecList []v2.MetricSpec
// Message Count Target Spec
if s.metadata.msgCountTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.queueName, solaceTriggermsgcount))
if s.metadata.MsgCountTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.QueueName, solaceTriggermsgcount))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.msgCountTarget),
Target: GetMetricTarget(s.metricType, s.metadata.MsgCountTarget),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: solaceExtMetricType}
metricSpecList = append(metricSpecList, metricSpec)
}
// Message Spool Usage Target Spec
if s.metadata.msgSpoolUsageTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.queueName, solaceTriggermsgspoolusage))
if s.metadata.MsgSpoolUsageTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.QueueName, solaceTriggermsgspoolusage))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.msgSpoolUsageTarget),
Target: GetMetricTarget(s.metricType, s.metadata.MsgSpoolUsageTarget),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: solaceExtMetricType}
metricSpecList = append(metricSpecList, metricSpec)
}
// Message Receive Rate Target Spec
if s.metadata.msgRxRateTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.queueName, solaceTriggermsgrxrate))
if s.metadata.MsgRxRateTarget > 0 {
metricName := kedautil.NormalizeString(fmt.Sprintf("solace-%s-%s", s.metadata.QueueName, solaceTriggermsgrxrate))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.msgRxRateTarget),
Target: GetMetricTarget(s.metricType, s.metadata.MsgRxRateTarget),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: solaceExtMetricType}
metricSpecList = append(metricSpecList, metricSpec)
Expand All @@ -357,7 +259,7 @@ func (s *SolaceScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec

// returns SolaceMetricValues struct populated from broker SEMP endpoint
func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (SolaceMetricValues, error) {
var scaledMetricEndpointURL = s.metadata.endpointURL
var scaledMetricEndpointURL = s.metadata.EndpointURL
var httpClient = s.httpClient
var sempResponse solaceSEMPResponse
var metricValues SolaceMetricValues
Expand All @@ -370,7 +272,7 @@ func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (Solac
}

// Add HTTP Auth and Headers
request.SetBasicAuth(s.metadata.username, s.metadata.password)
request.SetBasicAuth(s.metadata.Username, s.metadata.Password)
request.Header.Set("Content-Type", "application/json")

// Call Solace SEMP API
Expand Down Expand Up @@ -429,9 +331,9 @@ func (s *SolaceScaler) GetMetricsAndActivity(ctx context.Context, metricName str
return []external_metrics.ExternalMetricValue{}, false, err
}
return []external_metrics.ExternalMetricValue{metric},
metricValues.msgCount > s.metadata.activationMsgCountTarget ||
metricValues.msgSpoolUsage > s.metadata.activationMsgSpoolUsageTarget ||
metricValues.msgRcvRate > s.metadata.activationMsgRxRateTarget,
metricValues.msgCount > s.metadata.ActivationMsgCountTarget ||
metricValues.msgSpoolUsage > s.metadata.ActivationMsgSpoolUsageTarget ||
metricValues.msgRcvRate > s.metadata.ActivationMsgRxRateTarget,
nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/solace_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,8 @@ func TestSolaceParseSolaceMetadata(t *testing.T) {
default:
fmt.Println(" --> PASS")
}
if !testData.isError && strings.Contains(testData.metadata["queueName"], "/") && !strings.Contains(meta.endpointURL, url.QueryEscape(testData.metadata["queueName"])) {
t.Error("expected endpointURL to query escape special characters in the URL but got:", meta.endpointURL)
if !testData.isError && strings.Contains(testData.metadata["queueName"], "/") && !strings.Contains(meta.EndpointURL, url.QueryEscape(testData.metadata["queueName"])) {
t.Error("expected endpointURL to query escape special characters in the URL but got:", meta.EndpointURL)
fmt.Println(" --> FAIL")
}
}
Expand Down
Loading