diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a3d1d1b8a..e772cff3e58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ ### New - Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994)) - Introducing InfluxDB scaler ([#1239](https://github.com/kedacore/keda/issues/1239)) +- Introducing MongoDB scaler ([#1465](https://github.com/kedacore/keda/issues/1465)) ### Improvements - Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311)) diff --git a/go.mod b/go.mod index 2598537b930..3a3895ec81a 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/tidwall/gjson v1.6.4 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c + go.mongodb.org/mongo-driver v1.1.2 golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect google.golang.org/api v0.36.0 google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d diff --git a/go.sum b/go.sum index ebf3d01b3b1..f64781c987a 100644 --- a/go.sum +++ b/go.sum @@ -550,6 +550,7 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= @@ -891,6 +892,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ= github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= @@ -1395,6 +1397,7 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= +go.mongodb.org/mongo-driver v1.1.2 h1:jxcFYjlkl8xaERsgLo+RNquI0epW6zuy/ZRQs6jnrFA= go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0= go.opencensus.io v0.17.0/go.mod h1:mp1VrMQxhlqqDpKvH4UcQUa4YwlzNmymAjPrDdfxNpI= diff --git a/pkg/scalers/mongo.go b/pkg/scalers/mongo.go new file mode 100644 index 00000000000..a8d59fa3d2a --- /dev/null +++ b/pkg/scalers/mongo.go @@ -0,0 +1,291 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/readpref" + "go.mongodb.org/mongo-driver/x/bsonx" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +// mongoScaler is support for mongo in keda. +type mongoScaler struct { + metadata *mongoMetadata + client *mongo.Client +} + +// mongoMetadata specify mongo scaler params. +type mongoMetadata struct { + // The string is used by connected with mongo. + // +optional + connectionString string + // Specify the host to connect to the mongo server,if the connectionString be provided, don't need specify this param. + // +optional + host string + // Specify the port to connect to the mongo server,if the connectionString be provided, don't need specify this param. + // +optional + port string + // Specify the username to connect to the mongo server,if the connectionString be provided, don't need specify this param. + // +optional + username string + // Specify the password to connect to the mongo server,if the connectionString be provided, don't need specify this param. + // +optional + password string + + // The name of the database to be queried. + // +required + dbName string + // The name of the collection to be queried. + // +required + collection string + // A mongo filter doc,used by specify DB. + // +required + query string + // A threshold that is used as targetAverageValue in HPA + // +required + queryValue int +} + +// Default variables and settings +const ( + mongoDefaultTimeOut = 10 * time.Second + defaultCollection = "default" + defaultDB = "local" + defaultQueryValue = 1 +) + +type mongoFields struct { + ID primitive.ObjectID `bson:"_id, omitempty"` +} + +var mongoLog = logf.Log.WithName("mongo_scaler") + +// NewMongoScaler creates a new Mongo scaler +func NewMongoScaler(config *ScalerConfig) (Scaler, error) { + ctx, cancel := context.WithTimeout(context.Background(), mongoDefaultTimeOut) + defer cancel() + + meta, connStr, err := parseMongoMetadata(config) + if err != nil { + return nil, fmt.Errorf("failed to parsing mongo metadata,because of %v", err) + } + + opt := options.Client().ApplyURI(connStr) + client, err := mongo.Connect(ctx, opt) + if err != nil { + return nil, fmt.Errorf("failed to establish connection with mongo,because of %v", err) + } + + if err = client.Ping(ctx, readpref.Primary()); err != nil { + return nil, fmt.Errorf("failed to ping mongo,because of %v", err) + } + + return &mongoScaler{ + metadata: meta, + client: client, + }, nil +} + +func parseMongoMetadata(config *ScalerConfig) (*mongoMetadata, string, error) { + var connStr string + // setting default metadata + meta := mongoMetadata{ + collection: defaultCollection, + query: "", + queryValue: defaultQueryValue, + dbName: defaultDB, + } + + // parse metaData from ScaledJob config + if val, ok := config.TriggerMetadata["collection"]; ok { + meta.collection = val + } else { + return nil, "", fmt.Errorf("no collection given") + } + + if val, ok := config.TriggerMetadata["query"]; ok { + meta.query = val + } else { + return nil, "", fmt.Errorf("no query given") + } + + if val, ok := config.TriggerMetadata["queryValue"]; ok { + queryValue, err := strconv.Atoi(val) + if err != nil { + return nil, "", fmt.Errorf("failed to convert %v to int,because of %v", queryValue, err.Error()) + } + meta.queryValue = queryValue + } else { + return nil, "", fmt.Errorf("no queryValue given") + } + + if val, ok := config.TriggerMetadata["dbName"]; ok { + meta.dbName = val + } else { + return nil, "", fmt.Errorf("no dbName given") + } + + // Resolve connectionString + if c, ok := config.AuthParams["connectionString"]; ok { + meta.connectionString = c + } else if v, ok := config.TriggerMetadata["connectionStringFromEnv"]; ok { + meta.connectionString = config.ResolvedEnv[v] + } else { + meta.connectionString = "" + if val, ok := config.TriggerMetadata["host"]; ok { + meta.host = val + } else { + return nil, "", fmt.Errorf("no host given") + } + if val, ok := config.TriggerMetadata["port"]; ok { + meta.port = val + } else { + return nil, "", fmt.Errorf("no port given") + } + + if val, ok := config.TriggerMetadata["username"]; ok { + meta.username = val + } else { + return nil, "", fmt.Errorf("no username given") + } + // get password from env or authParams + if v, ok := config.AuthParams["password"]; ok { + meta.password = v + } else if v, ok := config.TriggerMetadata["passwordFromEnv"]; ok { + meta.password = config.ResolvedEnv[v] + } + + if len(meta.password) == 0 { + return nil, "", fmt.Errorf("no password given") + } + } + + if meta.connectionString != "" { + connStr = meta.connectionString + } else { + // Build connection str + addr := fmt.Sprintf("%s:%s", meta.host, meta.port) + auth := fmt.Sprintf("%s:%s", meta.username, meta.password) + connStr = "mongodb://" + auth + "@" + addr + } + + return &meta, connStr, nil +} + +func (s *mongoScaler) IsActive(ctx context.Context) (bool, error) { + result, err := s.getQueryResult() + if err != nil { + mongoLog.Error(err, fmt.Sprintf("failed to get query result by mongo,because of %v", err)) + return false, err + } + return result > 0, nil +} + +// Close disposes of mongo connections +func (s *mongoScaler) Close() error { + if s.client != nil { + err := s.client.Disconnect(context.TODO()) + if err != nil { + mongoLog.Error(err, fmt.Sprintf("failed to close mongo connection,because of %v", err)) + return err + } + } + + return nil +} + +// getQueryResult query mongo by meta.query +func (s *mongoScaler) getQueryResult() (int, error) { + var ( + results []mongoFields + ) + + ctx, cancel := context.WithTimeout(context.Background(), mongoDefaultTimeOut) + defer cancel() + + filter, err := json2BsonDoc(s.metadata.query) + if err != nil { + mongoLog.Error(err, fmt.Sprintf("failed to convert query param to bson.Doc,because of %v", err)) + return 0, err + } + + cursor, err := s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).Find(ctx, filter) + if err != nil { + mongoLog.Error(err, fmt.Sprintf("failed to query %v in %v,because of %v", s.metadata.dbName, s.metadata.collection, err)) + return 0, err + } + + err = cursor.All(ctx, &results) + if err != nil { + mongoLog.Error(err, fmt.Sprintf("failed to traversal the query results,because of %v", err)) + return 0, err + } + + return len(results), nil +} + +// GetMetrics query from mongo,and return to external metrics +func (s *mongoScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + num, err := s.getQueryResult() + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to inspect momgo,because of %v", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(num), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +// GetMetricSpecForScaling get the query value for scaling +func (s *mongoScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI) + metricName := kedautil.NormalizeString(fmt.Sprintf("%s-%s", "mongo", s.metadata.connectionString)) + + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetQueryValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} +} + +// json2BsonDoc convert Json to Bson.Doc +func json2BsonDoc(js string) (doc bsonx.Doc, err error) { + doc = bsonx.Doc{} + err = bson.UnmarshalExtJSON([]byte(js), true, &doc) + if err != nil { + return nil, err + } + + if len(doc) == 0 { + return nil, errors.New("empty bson document") + } + + return doc, nil +} diff --git a/pkg/scalers/mongo_test.go b/pkg/scalers/mongo_test.go new file mode 100644 index 00000000000..fcd3866f9cb --- /dev/null +++ b/pkg/scalers/mongo_test.go @@ -0,0 +1,81 @@ +package scalers + +import ( + "testing" + + "go.mongodb.org/mongo-driver/mongo" +) + +var testMongoResolvedEnv = map[string]string{ + "Mongo_CONN_STR": "test_conn_str", + "Mongo_PASSWORD": "test", +} + +type parseMongoMetadataTestData struct { + metadata map[string]string + resolvedEnv map[string]string + raisesError bool +} + +type mongoMetricIdentifier struct { + metadataTestData *parseMongoMetadataTestData + name string +} + +var testMONGOMetadata = []parseMongoMetadataTestData{ + // No metadata + { + metadata: map[string]string{}, + resolvedEnv: testMongoResolvedEnv, + raisesError: true, + }, + // connectionStringFromEnv + { + metadata: map[string]string{"query": `{"name":"John"}`, "collection": "demo", "queryValue": "12", "connectionStringFromEnv": "Mongo_CONN_STR", "dbName": "test"}, + resolvedEnv: testMongoResolvedEnv, + raisesError: false, + }, +} + +var mongoMetricIdentifiers = []mongoMetricIdentifier{ + {metadataTestData: &testMONGOMetadata[1], name: "mongo-test_conn_str"}, +} + +func TestParseMongoMetadata(t *testing.T) { + for _, testData := range testMONGOMetadata { + _, _, err := parseMongoMetadata(&ScalerConfig{TriggerMetadata: testData.metadata}) + if err != nil && !testData.raisesError { + t.Error("Expected success but got error:", err) + } + if err == nil && testData.raisesError { + t.Error("Expected error but got success") + } + } +} + +func TestMongoGetMetricSpecForScaling(t *testing.T) { + for _, testData := range mongoMetricIdentifiers { + meta, _, err := parseMongoMetadata(&ScalerConfig{ResolvedEnv: testData.metadataTestData.resolvedEnv, TriggerMetadata: testData.metadataTestData.metadata}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + mockMongoScaler := mongoScaler{meta, &mongo.Client{}} + + metricSpec := mockMongoScaler.GetMetricSpecForScaling() + metricName := metricSpec[0].External.Metric.Name + if metricName != testData.name { + t.Error("Wrong External metric source name:", metricName) + } + } +} + +func TestJson2BsonDoc(t *testing.T) { + var testJSON = `{"name":"carson"}` + doc, err := json2BsonDoc(testJSON) + if err != nil { + t.Error("convert testJson to Bson.Doc err:", err) + } + if doc == nil { + t.Error("the doc is nil") + } +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index bba860fbf39..03a1958eaeb 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -450,6 +450,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal return scalers.NewCPUMemoryScaler(corev1.ResourceMemory, config) case "metrics-api": return scalers.NewMetricsAPIScaler(config) + case "mongo": + return scalers.NewMongoScaler(config) case "mysql": return scalers.NewMySQLScaler(config) case "postgresql":