Skip to content

Commit

Permalink
feat: Introduce activationThreshold/minMetricValue for Cassandra Scal…
Browse files Browse the repository at this point in the history
…er (#3358)

Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
  • Loading branch information
Jorge Turrado Ferrero authored Jul 14, 2022
1 parent 0d57b1e commit af80d84
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
34 changes: 22 additions & 12 deletions pkg/scalers/cassandra_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ type cassandraScaler struct {

// CassandraMetadata defines metadata used by KEDA to query a Cassandra table.
type CassandraMetadata struct {
username string
password string
clusterIPAddress string
port int
consistency gocql.Consistency
protocolVersion int
keyspace string
query string
targetQueryValue int64
metricName string
scalerIndex int
username string
password string
clusterIPAddress string
port int
consistency gocql.Consistency
protocolVersion int
keyspace string
query string
targetQueryValue int64
activationTargetQueryValue int64
metricName string
scalerIndex int
}

var cassandraLog = logf.Log.WithName("cassandra_scaler")
Expand Down Expand Up @@ -83,6 +84,15 @@ func ParseCassandraMetadata(config *ScalerConfig) (*CassandraMetadata, error) {
return nil, fmt.Errorf("no targetQueryValue given")
}

meta.activationTargetQueryValue = 0
if val, ok := config.TriggerMetadata["activationTargetQueryValue"]; ok {
activationTargetQueryValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetQueryValue parsing error %s", err.Error())
}
meta.activationTargetQueryValue = activationTargetQueryValue
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
} else {
Expand Down Expand Up @@ -175,7 +185,7 @@ func (s *cassandraScaler) IsActive(ctx context.Context) (bool, error) {
return false, fmt.Errorf("error inspecting cassandra: %s", err)
}

return messages > 0, nil
return messages > s.metadata.activationTargetQueryValue, nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler.
Expand Down
21 changes: 18 additions & 3 deletions tests/scalers_go/cassandra/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,18 @@ spec:
keyspace: "{{.CassandraKeyspace}}"
query: "SELECT COUNT(*) FROM {{.CassandraKeyspace}}.{{.CassandraTableName}};"
targetQueryValue: "1"
activationTargetQueryValue: "4"
metricName: "{{.CassandraKeyspace}}"
authenticationRef:
name: keda-trigger-auth-cassandra-secret
`
insertDataTemplate = `BEGIN BATCH
insertDataTemplateA = `BEGIN BATCH
INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Mary', 'Paul', 30);
INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('James', 'Miller', 25);
INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Lisa', 'Wilson', 29);
APPLY BATCH;`

insertDataTemplateB = `BEGIN BATCH
INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Bob', 'Taylor', 33);
INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Carol', 'Moore', 31);
INSERT INTO {{.CassandraKeyspace}}.{{.CassandraTableName}} (name, surname, age) VALUES ('Richard', 'Brown', 23);
Expand All @@ -235,6 +239,7 @@ func TestCassandraScaler(t *testing.T) {
"replica count should be %s after 3 minute", minReplicaCount)

// test scaling
testActivation(t, kc)
testScaleUp(t, kc)
testScaleDown(t, kc)

Expand Down Expand Up @@ -275,9 +280,19 @@ func checkIfCassandraStatusIsReady(t *testing.T, name string) error {
return errors.New("cassandra is not ready")
}

func testActivation(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing activation ---")
result, err := getCassandraInsertCmd(insertDataTemplateA)
assert.NoErrorf(t, err, "cannot parse log - %s", err)
out, errOut, _ := ExecCommandOnSpecificPod(t, "cassandra-client-0", testNamespace, fmt.Sprintf("bash cqlsh -u %s -p %s %s.%s --execute=\"%s\"", cassandraUsername, cassandraPassword, deploymentName, testNamespace, result))
t.Logf("Output: %s, Error: %s", out, errOut)

AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, minReplicaCount, 60)
}

func testScaleUp(t *testing.T, kc *kubernetes.Clientset) {
t.Log("--- testing scale up ---")
result, err := getCassandraInsertCmd()
result, err := getCassandraInsertCmd(insertDataTemplateB)
assert.NoErrorf(t, err, "cannot parse log - %s", err)
out, errOut, _ := ExecCommandOnSpecificPod(t, "cassandra-client-0", testNamespace, fmt.Sprintf("bash cqlsh -u %s -p %s %s.%s --execute=\"%s\"", cassandraUsername, cassandraPassword, deploymentName, testNamespace, result))
t.Logf("Output: %s, Error: %s", out, errOut)
Expand All @@ -296,7 +311,7 @@ func testScaleDown(t *testing.T, kc *kubernetes.Clientset) {
"replica count should be %s after 3 minutes", minReplicaCount)
}

func getCassandraInsertCmd() (string, error) {
func getCassandraInsertCmd(insertDataTemplate string) (string, error) {
tmpl, err := template.New("cassandra insert").Parse(insertDataTemplate)
var tpl bytes.Buffer
if err := tmpl.Execute(&tpl, templateData{CassandraKeyspace: cassandraKeyspace, CassandraTableName: cassandraTableName}); err != nil {
Expand Down

0 comments on commit af80d84

Please sign in to comment.