Skip to content

Commit

Permalink
influxdb scaler: add support for integer query results (#1977)
Browse files Browse the repository at this point in the history
Signed-off-by: Adam Gardner <adam.gardner@magicmemories.com>
Co-authored-by: Ahmed ElSayed <ahmels@microsoft.com>
Co-authored-by: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 2, 2021
1 parent 337bc0f commit 7e7d42b
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- IBM MQ scaler password handling fix ([#1939](https://github.com/kedacore/keda/pull/1939))
- Metrics APIServer: Add ratelimiting parameters to override client ([#1944](https://github.com/kedacore/keda/pull/1944))
- Optimize KafkaScaler by fetching all topic offsets using a single HTTP request ([#1956](https://github.com/kedacore/keda/pull/1956))
- Adjusts InfluxDB scaler to support queries that return integers in addition to those that return floats ([#1977](https://github.com/kedacore/keda/pull/1977))

### Breaking Changes

Expand Down
12 changes: 7 additions & 5 deletions pkg/scalers/influxdb_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ func queryInfluxDB(queryAPI api.QueryAPI, query string) (float64, error) {
return 0, fmt.Errorf("no results found from query")
}

val, ok := result.Record().Value().(float64)
if !ok {
return 0, fmt.Errorf("value could not be parsed into a float")
switch valRaw := result.Record().Value().(type) {
case float64:
return valRaw, nil
case int64:
return float64(valRaw), nil
default:
return 0, fmt.Errorf("value of type %T could not be converted into a float", valRaw)
}

return val, nil
}

// GetMetrics connects to influxdb via the client and returns a value based on the query
Expand Down
60 changes: 57 additions & 3 deletions tests/scalers/influxdb.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ test.before((t) => {
t.is(0, exitCode, 'Influxdb is not in a ready state')
})

test.serial('Should start off deployment with 0 replicas and scale to 2 replicas when scaled object is applied', (t) => {
test.serial('Should start off deployment with 0 replicas and scale to 2 replicas when scaled object with float query is applied', (t) => {
const { authToken, orgName } = runWriteJob(t)
const basicDeploymentTmpFile = tmp.fileSync()
fs.writeFileSync(basicDeploymentTmpFile.name, basicDeploymentYaml)
Expand All @@ -72,7 +72,7 @@ test.serial('Should start off deployment with 0 replicas and scale to 2 replicas
t.is(numReplicasBefore, '0', 'Number of replicas should be 0 to start with')

const scaledObjectTmpFile = tmp.fileSync()
fs.writeFileSync(scaledObjectTmpFile.name, scaledObjectYaml.replace('{{INFLUXDB_AUTH_TOKEN}}', authToken).replace('{{INFLUXDB_ORG_NAME}}', orgName))
fs.writeFileSync(scaledObjectTmpFile.name, scaledObjectYamlFloat.replace('{{INFLUXDB_AUTH_TOKEN}}', authToken).replace('{{INFLUXDB_ORG_NAME}}', orgName))

t.is(0, sh.exec(`kubectl apply --namespace ${influxdbNamespaceName} -f ${scaledObjectTmpFile.name}`).code)

Expand All @@ -90,6 +90,35 @@ test.serial('Should start off deployment with 0 replicas and scale to 2 replicas
t.is(numReplicasAfter, '2', 'Number of replicas should have scaled to 2')
})

test.serial('Should start off deployment with 0 replicas and scale to 2 replicas when scaled object with int query is applied', (t) => {
const { authToken, orgName } = runWriteJob(t)
const basicDeploymentTmpFile = tmp.fileSync()
fs.writeFileSync(basicDeploymentTmpFile.name, basicDeploymentYaml)

t.is(0, sh.exec(`kubectl apply --namespace ${influxdbNamespaceName} -f ${basicDeploymentTmpFile.name}`).code)

const numReplicasBefore = sh.exec(`kubectl get deployment --namespace ${influxdbNamespaceName} ${nginxDeploymentName} -o jsonpath='{.spec.replicas}'`).stdout
t.is(numReplicasBefore, '0', 'Number of replicas should be 0 to start with')

const scaledObjectTmpFile = tmp.fileSync()
fs.writeFileSync(scaledObjectTmpFile.name, scaledObjectYamlInt.replace('{{INFLUXDB_AUTH_TOKEN}}', authToken).replace('{{INFLUXDB_ORG_NAME}}', orgName))

t.is(0, sh.exec(`kubectl apply --namespace ${influxdbNamespaceName} -f ${scaledObjectTmpFile.name}`).code)

// polling/waiting for deployment to scale to desired amount of replicas
let numReplicasAfter = '1'
for (let i = 0; i < 15; i++){
numReplicasAfter = sh.exec(`kubectl get deployment --namespace ${influxdbNamespaceName} ${nginxDeploymentName} -o jsonpath='{.spec.replicas}'`).stdout
if (numReplicasAfter !== '2') {
sh.exec('sleep 2s')
} else {
break
}
}

t.is(numReplicasAfter, '2', 'Number of replicas should have scaled to 2')
})

test.after.always((t) => {
  // Tear down everything the suite created by deleting the whole namespace.
  const deleteResult = sh.exec(`kubectl delete namespace ${influxdbNamespaceName}`)
  t.is(0, deleteResult.code, 'Should delete influxdb namespace')
})
Expand Down Expand Up @@ -154,7 +183,31 @@ spec:
type: ClusterIP
`

const scaledObjectYaml = `
// ScaledObject manifest whose Flux query explicitly casts the measurement
// value to float (`float(v: r._value)`), exercising the scaler's float64
// result path. `{{INFLUXDB_AUTH_TOKEN}}` and `{{INFLUXDB_ORG_NAME}}` are
// placeholders substituted at test time.
const scaledObjectYamlFloat = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: influxdb-scaler
namespace: influxdb
spec:
scaleTargetRef:
name: nginx-deployment
maxReplicaCount: 2
triggers:
- type: influxdb
metadata:
authToken: {{INFLUXDB_AUTH_TOKEN}}
organizationName: {{INFLUXDB_ORG_NAME}}
serverURL: http://influxdb.influxdb.svc:8086
thresholdValue: "3"
query: |
from(bucket:"bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "stat")
|> map(fn: (r) => ({r with _value: float(v: r._value)}))
`

const scaledObjectYamlInt = `
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
Expand All @@ -175,6 +228,7 @@ spec:
from(bucket:"bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "stat")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
`

const influxdbWriteJobYaml = `
Expand Down

0 comments on commit 7e7d42b

Please sign in to comment.