From dc989e6eafeb36830fe736b2f7aba865bfd17ecc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E5=BC=BA?= Date: Thu, 31 Dec 2020 16:26:24 +0800 Subject: [PATCH] add mongoDB scaler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 高强 --- CHANGELOG.md | 11 ++ go.mod | 1 + go.sum | 3 + pkg/scalers/mongo.go | 291 ++++++++++++++++++++++++++++++++++ pkg/scalers/mongo_test.go | 87 ++++++++++ pkg/scaling/scale_handler.go | 2 + tests/scalers/mongodb.test.ts | 225 ++++++++++++++++++++++++++ 7 files changed, 620 insertions(+) create mode 100644 pkg/scalers/mongo.go create mode 100644 pkg/scalers/mongo_test.go create mode 100644 tests/scalers/mongodb.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a3d1d1b8a..e816bc4d045 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ ### New - Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994)) - Introducing InfluxDB scaler ([#1239](https://github.com/kedacore/keda/issues/1239)) +- Add Redis cluster support for Redis list and Redis streams scalers ([#1437](https://github.com/kedacore/keda/pull/1437)) +- Global authentication credentials can be managed using ClusterTriggerAuthentication objects ([#1452](https://github.com/kedacore/keda/pull/1452)) +- Introducing OpenStack Swift scaler ([#1342](https://github.com/kedacore/keda/issues/1342)) +- Introducing MongoDB scaler ([#1465](https://github.com/kedacore/keda/issues/1465)) ### Improvements - Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311)) @@ -27,6 +31,13 @@ - Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381)) - Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323)) - Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413)) +- Override the vhost on a RabbitMQ scaler via `vhostName` in the metadata. ([#1451](https://github.com/kedacore/keda/pull/1451)) +- Optimize Kafka scaler's `getLagForPartition` function. ([#1464](https://github.com/kedacore/keda/pull/1464)) +- Reduce unnecessary /scale requests from ScaledObject controller ([#1453](https://github.com/kedacore/keda/pull/1453)) +- Add support for the WATCH_NAMESPACE environment variable to the operator ([#1474](https://github.com/kedacore/keda/pull/1474)) +- Automatically determine the RabbitMQ protocol when possible, and support setting the protocl via TriggerAuthentication ([#1459](https://github.com/kedacore/keda/pulls/1459),[#1483](https://github.com/kedacore/keda/pull/1483)) +- Improve performance when fetching pod information ([#1457](https://github.com/kedacore/keda/pull/1457)) +- Improve performance when fetching current scaling information on Deployments ([#1458](https://github.com/kedacore/keda/pull/1458)) ### Breaking Changes diff --git a/go.mod b/go.mod index 2598537b930..3a3895ec81a 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/tidwall/gjson v1.6.4 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c + go.mongodb.org/mongo-driver v1.1.2 golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect google.golang.org/api v0.36.0 google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d diff --git a/go.sum b/go.sum index ebf3d01b3b1..f64781c987a 100644 --- a/go.sum +++ b/go.sum @@ -550,6 +550,7 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= @@ -891,6 +892,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ= github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= @@ -1395,6 +1397,7 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= +go.mongodb.org/mongo-driver v1.1.2 h1:jxcFYjlkl8xaERsgLo+RNquI0epW6zuy/ZRQs6jnrFA= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0= go.opencensus.io v0.17.0/go.mod h1:mp1VrMQxhlqqDpKvH4UcQUa4YwlzNmymAjPrDdfxNpI= diff --git a/pkg/scalers/mongo.go b/pkg/scalers/mongo.go new file mode 100644 index 00000000000..305d694aac0 --- /dev/null +++ b/pkg/scalers/mongo.go @@ -0,0 +1,291 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.mongodb.org/mongo-driver/x/bsonx" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +// mongoDBScaler is support for mongoDB in keda. +type mongoDBScaler struct { + metadata *mongoDBMetadata + client *mongo.Client +} + +// mongoDBMetadata specify mongoDB scaler params. +type mongoDBMetadata struct { + // The string is used by connected with mongoDB. + // +optional + connectionString string + // Specify the host to connect to the mongoDB server,if the connectionString be provided, don't need specify this param. + // +optional + host string + // Specify the port to connect to the mongoDB server,if the connectionString be provided, don't need specify this param. + // +optional + port string + // Specify the username to connect to the mongoDB server,if the connectionString be provided, don't need specify this param. + // +optional + username string + // Specify the password to connect to the mongoDB server,if the connectionString be provided, don't need specify this param. + // +optional + password string + + // The name of the database to be queried. + // +required + dbName string + // The name of the collection to be queried. + // +required + collection string + // A mongoDB filter doc,used by specify DB. + // +required + query string + // A threshold that is used as targetAverageValue in HPA + // +required + queryValue int + + metricName string +} + +// Default variables and settings +const ( + mongoDBDefaultTimeOut = 10 * time.Second + defaultCollection = "default" + defaultDB = "test" + defaultQueryValue = 1 +) + +type mongoDBFields struct { + ID primitive.ObjectID `bson:"_id, omitempty"` +} + +var mongoDBLog = logf.Log.WithName("mongoDB_scaler") + +// NewMongoDBScaler creates a new mongoDB scaler +func NewMongoDBScaler(config *ScalerConfig) (Scaler, error) { + ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut) + defer cancel() + + meta, connStr, err := parseMongoDBMetadata(config) + if err != nil { + return nil, fmt.Errorf("failed to parsing mongoDB metadata,because of %v", err) + } + + opt := options.Client().ApplyURI(connStr) + client, err := mongo.Connect(ctx, opt) + if err != nil { + return nil, fmt.Errorf("failed to establish connection with mongoDB,because of %v", err) + } + + if err = client.Ping(ctx, readpref.Primary()); err != nil { + return nil, fmt.Errorf("failed to ping mongoDB,because of %v", err) + } + + return &mongoDBScaler{ + metadata: meta, + client: client, + }, nil +} + +func parseMongoDBMetadata(config *ScalerConfig) (*mongoDBMetadata, string, error) { + var connStr string + // setting default metadata + meta := mongoDBMetadata{ + collection: defaultCollection, + query: "", + queryValue: defaultQueryValue, + dbName: defaultDB, + } + + // parse metaData from ScaledJob config + if val, ok := config.TriggerMetadata["collection"]; ok { + meta.collection = val + } else { + return nil, "", fmt.Errorf("no collection given") + } + + if val, ok := config.TriggerMetadata["query"]; ok { + meta.query = val + } else { + return nil, "", fmt.Errorf("no query given") + } + + if val, ok := config.TriggerMetadata["queryValue"]; ok { + queryValue, err := strconv.Atoi(val) + if err != nil { + return nil, "", fmt.Errorf("failed to convert %v to int,because of %v", queryValue, err.Error()) + } + meta.queryValue = queryValue + } else { + return nil, "", fmt.Errorf("no queryValue given") + } + + if val, ok := config.TriggerMetadata["dbName"]; ok { + meta.dbName = val + } else { + return nil, "", fmt.Errorf("no dbName given") + } + + // Resolve connectionString + if c, ok := config.AuthParams["connectionString"]; ok { + meta.connectionString = c + } else if v, ok := config.TriggerMetadata["connectionStringFromEnv"]; ok { + meta.connectionString = config.ResolvedEnv[v] + } else { + meta.connectionString = "" + if val, ok := config.TriggerMetadata["host"]; ok { + meta.host = val + } else { + return nil, "", fmt.Errorf("no host given") + } + if val, ok := config.TriggerMetadata["port"]; ok { + meta.port = val + } else { + return nil, "", fmt.Errorf("no port given") + } + + if val, ok := config.TriggerMetadata["username"]; ok { + meta.username = val + } else { + return nil, "", fmt.Errorf("no username given") + } + // get password from env or authParams + if v, ok := config.AuthParams["password"]; ok { + meta.password = v + } else if v, ok := config.TriggerMetadata["passwordFromEnv"]; ok { + meta.password = config.ResolvedEnv[v] + } + + if len(meta.password) == 0 { + return nil, "", fmt.Errorf("no password given") + } + } + + if meta.connectionString != "" { + connStr = meta.connectionString + } else { + // Build connection str + addr := fmt.Sprintf("%s:%s", meta.host, meta.port) + auth := fmt.Sprintf("%s:%s", meta.username, meta.password) + connStr = "mongodb://" + auth + "@" + addr + } + + if val, ok := config.TriggerMetadata["metricName"]; ok { + meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s", val)) + } else { + maskedURL, err := kedautil.MaskPartOfURL(connStr, kedautil.Hostname) + if err != nil { + return nil, "", fmt.Errorf("failure masking part of url") + } + meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s-%s", maskedURL, meta.collection)) + } + return &meta, connStr, nil +} + +func (s *mongoDBScaler) IsActive(ctx context.Context) (bool, error) { + result, err := s.getQueryResult() + if err != nil { + mongoDBLog.Error(err, fmt.Sprintf("failed to get query result by mongoDB,because of %v", err)) + return false, err + } + return result > 0, nil +} + +// Close disposes of mongoDB connections +func (s *mongoDBScaler) Close() error { + if s.client != nil { + err := s.client.Disconnect(context.TODO()) + if err != nil { + mongoDBLog.Error(err, fmt.Sprintf("failed to close mongoDB connection,because of %v", err)) + return err + } + } + + return nil +} + +// getQueryResult query mongoDB by meta.query +func (s *mongoDBScaler) getQueryResult() (int, error) { + ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut) + defer cancel() + + filter, err := json2BsonDoc(s.metadata.query) + if err != nil { + mongoDBLog.Error(err, fmt.Sprintf("failed to convert query param to bson.Doc,because of %v", err)) + return 0, err + } + + docs_num, err := s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).CountDocuments(ctx, filter) + if err != nil { + mongoDBLog.Error(err, fmt.Sprintf("failed to query %v in %v,because of %v", s.metadata.dbName, s.metadata.collection, err)) + return 0, err + } + + return int(docs_num), nil +} + +// GetMetrics query from mongoDB,and return to external metrics +func (s *mongoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + num, err := s.getQueryResult() + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to inspect momgo,because of %v", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(num), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// GetMetricSpecForScaling get the query value for scaling +func (s *mongoDBScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI) + + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetQueryValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +// json2BsonDoc convert Json to Bson.Doc +func json2BsonDoc(js string) (doc bsonx.Doc, err error) { + doc = bsonx.Doc{} + err = bson.UnmarshalExtJSON([]byte(js), true, &doc) + if err != nil { + return nil, err + } + + if len(doc) == 0 { + return nil, errors.New("empty bson document") + } + + return doc, nil +} diff --git a/pkg/scalers/mongo_test.go b/pkg/scalers/mongo_test.go new file mode 100644 index 00000000000..8b8a63e2e79 --- /dev/null +++ b/pkg/scalers/mongo_test.go @@ -0,0 +1,87 @@ +package scalers + +import ( + "testing" + + "go.mongodb.org/mongo-driver/mongo" +) + +var testMongoDBResolvedEnv = map[string]string{ + "MongoDB_CONN_STR": "test_conn_str", + "MongoDB_PASSWORD": "test", +} + +type parseMongoDBMetadataTestData struct { + metadata map[string]string + resolvedEnv map[string]string + raisesError bool +} + +type mongoDBMetricIdentifier struct { + metadataTestData *parseMongoDBMetadataTestData + name string +} + +var testMONGODBMetadata = []parseMongoDBMetadataTestData{ + // No metadata + { + metadata: map[string]string{}, + resolvedEnv: testMongoDBResolvedEnv, + raisesError: true, + }, + // connectionStringFromEnv + { + metadata: map[string]string{"query": `{"name":"John"}`, "collection": "demo", "queryValue": "12", "connectionStringFromEnv": "Mongo_CONN_STR", "dbName": "test"}, + resolvedEnv: testMongoDBResolvedEnv, + raisesError: false, + }, + // with metric name + { + metadata: map[string]string{"query": `{"name":"John"}`, "metricName": "hpa", "collection": "demo", "queryValue": "12", "connectionStringFromEnv": "Mongo_CONN_STR", "dbName": "test"}, + resolvedEnv: testMongoDBResolvedEnv, + raisesError: false, + }, +} + +var mongoDBMetricIdentifiers = []mongoDBMetricIdentifier{ + {metadataTestData: &testMONGODBMetadata[2], name: "mongodb-hpa"}, +} + +func TestParseMongoDBMetadata(t *testing.T) { + for _, testData := range testMONGODBMetadata { + _, _, err := parseMongoDBMetadata(&ScalerConfig{TriggerMetadata: testData.metadata}) + if err != nil && !testData.raisesError { + t.Error("Expected success but got error:", err) + } + if err == nil && testData.raisesError { + t.Error("Expected error but got success") + } + } +} + +func TestMongoDBGetMetricSpecForScaling(t *testing.T) { + for _, testData := range mongoDBMetricIdentifiers { + meta, _, err := parseMongoDBMetadata(&ScalerConfig{ResolvedEnv: testData.metadataTestData.resolvedEnv, TriggerMetadata: testData.metadataTestData.metadata}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockMongoDBScaler := mongoDBScaler{meta, &mongo.Client{}} + + metricSpec := mockMongoDBScaler.GetMetricSpecForScaling() + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} + +func TestJson2BsonDoc(t *testing.T) { + var testJSON = `{"name":"carson"}` + doc, err := json2BsonDoc(testJSON) + if err != nil { + t.Error("convert testJson to Bson.Doc err:", err) + } + if doc == nil { + t.Error("the doc is nil") + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index bba860fbf39..a0cbd1e7f97 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -450,6 +450,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config) case "metrics-api": return scalers.NewMetricsAPIScaler(config) + case "mongoDB": + return scalers.NewMongoDBScaler(config) case "mysql": return scalers.NewMySQLScaler(config) case "postgresql": diff --git a/tests/scalers/mongodb.test.ts b/tests/scalers/mongodb.test.ts new file mode 100644 index 00000000000..59840fb6177 --- /dev/null +++ b/tests/scalers/mongodb.test.ts @@ -0,0 +1,225 @@ +import * as async from 'async' +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' + +const mongoDBNamespace = 'mongodb' +const testNamespace = 'mongodb-test' +const mongoDBUsername = 'test_user' +const mongoDBPassword = 'test_password' +const mongoDBDatabase = 'test' +const mongodbCollection = "test_collection" +const mongoJobName = "mongodb-job" + +test.before(t => { + // install mongoDB + sh.exec(`kubectl create namespace ${mongoDBNamespace}`) + const mongoDBTmpFile = tmp.fileSync() + fs.writeFileSync(mongoDBTmpFile.name, mongoDBdeployYaml) + + t.is(0, sh.exec(`kubectl apply --namespace ${mongoDBNamespace} -f ${mongoDBTmpFile.name}`).code, 'creating a MongoDB deployment should work.') + // wait for mongoDB to load + let mongoDBReadyReplicaCount = '0' + for (let i = 0; i < 30; i++) { + mongoDBReadyReplicaCount = sh.exec(`kubectl get deploy/mongodb -n ${mongoDBNamespace} -o jsonpath='{.status.readyReplicas}'`).stdout + if (mongoDBReadyReplicaCount != '1') { + sh.exec('sleep 2s') + } + } + t.is('1', mongoDBReadyReplicaCount, 'MongoDB is not in a ready state') + + const createUserJS = `db.createUser({ user:"${mongoDBUsername}",pwd:"${mongoDBPassword}",roles:[{ role:"readWrite", db: "${mongoDBDatabase}"}]})` + const LoginJS = `db.auth("${mongoDBUsername}","${mongoDBPassword}")` + + const mongoDBPod = sh.exec(`kubectl get po -n ${mongoDBNamespace} -o jsonpath='{.items[0].metadata.name}'`).stdout + t.not(mongoDBPod, '') + sh.exec(`kubectl exec -n ${mongoDBNamespace} ${mongoDBPod} -- mongo --eval \'${createUserJS}\'`) + sh.exec(`kubectl exec -n ${mongoDBNamespace} ${mongoDBPod} -- mongo --eval \'${LoginJS}\'`) + + sh.config.silent = true + // create test namespace + sh.exec(`kubectl create namespace ${testNamespace}`) + + // deploy streams consumer app, scaled job etc. + const tmpFile = tmp.fileSync() + const ConnectionString = Buffer.from(`mongodb://${mongoDBUsername}:${mongoDBPassword}@mongodb-svc.${mongoDBNamespace}.svc.cluster.local:27017/${mongoDBDatabase}`).toString() + const base64ConnectionString = Buffer.from(`mongodb://${mongoDBUsername}:${mongoDBPassword}@mongodb-svc.${mongoDBNamespace}.svc.cluster.local:27017/${mongoDBDatabase}`).toString('base64') + + fs.writeFileSync(tmpFile.name, deployYaml. + replace(/{{MONGODB_CONNECTION_STRING_BASE64}}/g, base64ConnectionString). + replace(/{{MONGODB_JOB_NAME}}/g, mongoJobName). + replace(/{{MONGODB_DATABASE}}/g, mongoDBDatabase). + replace(/{{MONGODB_COLLECTION}}/g, mongodbCollection). + replace(/{{MONGODB_CONNECTION_STRING}}/g,ConnectionString)) + + t.is( + 0, + sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${testNamespace}`).code, + 'creating a deployment should work..' + ) + +}) + +test.serial('Job should have 0 job on start', t => { + const jobCount = sh.exec( + `kubectl get job --namespace ${testNamespace}` + ).stdout + t.is(jobCount, '', 'job count should start out as 0') +}) + +test.serial(`Job should scale to 5 then back to 0`, t => { + // insert data to mongodb + const InsertJS = `db.${mongodbCollection}.insert([ + {"region":"eu-1","state":"running","plan":"planA","goods":"apple"}, + {"region":"eu-1","state":"running","plan":"planA","goods":"orange"}, + {"region":"eu-1","state":"running","plan":"planA","goods":"strawberry"}, + {"region":"eu-1","state":"running","plan":"planA","goods":"cherry"}, + {"region":"eu-1","state":"running","plan":"planA","goods":"pineapple"} + ])` + const mongoDBPod = sh.exec(`kubectl get po -n ${mongoDBNamespace} -o jsonpath='{.items[0].metadata.name}'`).stdout + t.not(mongoDBPod, '') + + t.is( + 0, + sh.exec(`kubectl exec -n ${mongoDBNamespace} ${mongoDBPod} -- mongo --eval \'${InsertJS}\'`).code, + 'insert 5 mongo record' + ) + + let jobCount = '0' + // maxJobCount = real Job + first line of output + const maxJobCount = '6' + + for (let i = 0; i < 30 && jobCount !== maxJobCount; i++) { + jobCount = sh.exec( + `kubectl get job --namespace ${testNamespace} | wc -l` + ).stdout.replace(/[\r\n]/g,"") + + if (jobCount !== maxJobCount) { + sh.exec('sleep 2s') + } + } + + t.is(maxJobCount, jobCount, `Job count should be ${maxJobCount} after 60 seconds`) + + for (let i = 0; i < 36 && jobCount !== '0'; i++) { + jobCount = sh.exec( + `kubectl get job --namespace ${testNamespace} | wc -l` + ).stdout.replace(/[\r\n]/g,"") + if (jobCount !== '0') { + sh.exec('sleep 5s') + } + } + + t.is('0', jobCount, 'Job count should be 0 after 3 minutes') +}) + +test.after.always.cb('clean up mongodb deployment', t => { + const resources = [ + `scaledJob.keda.sh/${mongoJobName}`, + 'triggerauthentication.keda.sh/mongodb-trigger', + `deployment.apps/mongodb`, + 'secret/mongodb-secret', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + + // uninstall mongodb + sh.exec(`kubectl delete --namespace ${mongoDBNamespace} deploy/mongodb`) + sh.exec(`kubectl delete namespace ${mongoDBNamespace}`) + + t.end() +}) + +const mongoDBdeployYaml = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: mongodb +spec: + replicas: 1 + selector: + matchLabels: + name: mongodb + template: + metadata: + labels: + name: mongodb + spec: + containers: + - name: mongodb + image: mongo:4.2.1 + imagePullPolicy: IfNotPresent + ports: + - containerPort: 27017 + name: mongodb + protocol: TCP +--- +kind: Service +apiVersion: v1 +metadata: + name: mongodb-svc +spec: + type: ClusterIP + ports: + - name: mongodb + port: 27017 + targetPort: 27017 + protocol: TCP + selector: + name: mongodb +` + +const deployYaml = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{MONGODB_JOB_NAME}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: mongodb-update + image: 1314520999/mongodb-update:latest + args: + - --connectStr={{MONGODB_CONNECTION_STRING}} + - --dataBase={{MONGODB_DATABASE}} + - --collection={{MONGODB_COLLECTION}} + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 20 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 10 + triggers: + - type: mongoDB + metadata: + dbName: {{MONGODB_DATABASE}} + collection: {{MONGODB_COLLECTION}} + query: '{"region":"eu-1","state":"running","plan":"planA"}' + queryValue: "1" + authenticationRef: + name: mongodb-trigger +--- +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: mongodb-trigger +spec: + secretTargetRef: + - parameter: connectionString + name: mongodb-secret + key: connect +--- +apiVersion: v1 +kind: Secret +metadata: + name: mongodb-secret +type: Opaque +data: + connect: {{MONGODB_CONNECTION_STRING_BASE64}} +` \ No newline at end of file