Skip to content

Commit

Permalink
add mongoDB scaler
Browse files Browse the repository at this point in the history
Signed-off-by: 高强 <wb-gq555900@alibaba-inc.com>
  • Loading branch information
高强 committed Jan 9, 2021
1 parent fbfa738 commit dc989e6
Show file tree
Hide file tree
Showing 7 changed files with 620 additions and 0 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
### New
- Can use Pod Identity with Azure Event Hub scaler ([#994](https://github.com/kedacore/keda/issues/994))
- Introducing InfluxDB scaler ([#1239](https://github.com/kedacore/keda/issues/1239))
- Add Redis cluster support for Redis list and Redis streams scalers ([#1437](https://github.com/kedacore/keda/pull/1437))
- Global authentication credentials can be managed using ClusterTriggerAuthentication objects ([#1452](https://github.com/kedacore/keda/pull/1452))
- Introducing OpenStack Swift scaler ([#1342](https://github.com/kedacore/keda/issues/1342))
- Introducing MongoDB scaler ([#1465](https://github.com/kedacore/keda/issues/1465))

### Improvements
- Support add ScaledJob's label to its job ([#1311](https://github.com/kedacore/keda/issues/1311))
Expand All @@ -27,6 +31,13 @@
- Mask password in postgres scaler auto generated metricName. ([PR #1381](https://github.com/kedacore/keda/pull/1381))
- Bug fix for pending jobs in ScaledJob's accurateScalingStrategy . ([#1323](https://github.com/kedacore/keda/issues/1323))
- Fix memory leak because of unclosed scalers. ([#1413](https://github.com/kedacore/keda/issues/1413))
- Override the vhost on a RabbitMQ scaler via `vhostName` in the metadata. ([#1451](https://github.com/kedacore/keda/pull/1451))
- Optimize Kafka scaler's `getLagForPartition` function. ([#1464](https://github.com/kedacore/keda/pull/1464))
- Reduce unnecessary /scale requests from ScaledObject controller ([#1453](https://github.com/kedacore/keda/pull/1453))
- Add support for the WATCH_NAMESPACE environment variable to the operator ([#1474](https://github.com/kedacore/keda/pull/1474))
- Automatically determine the RabbitMQ protocol when possible, and support setting the protocl via TriggerAuthentication ([#1459](https://github.com/kedacore/keda/pulls/1459),[#1483](https://github.com/kedacore/keda/pull/1483))
- Improve performance when fetching pod information ([#1457](https://github.com/kedacore/keda/pull/1457))
- Improve performance when fetching current scaling information on Deployments ([#1458](https://github.com/kedacore/keda/pull/1458))

### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/tidwall/gjson v1.6.4
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.mongodb.org/mongo-driver v1.1.2
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
google.golang.org/api v0.36.0
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
Expand Down Expand Up @@ -891,6 +892,7 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kedacore/keda v1.5.0 h1:c8xA1Vo3H7rPwFiWUX3CBXnjBSrbYDmUs9iEfDlf4bQ=
github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
Expand Down Expand Up @@ -1395,6 +1397,7 @@ go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.mongodb.org/mongo-driver v1.0.3/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.1/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.mongodb.org/mongo-driver v1.1.2 h1:jxcFYjlkl8xaERsgLo+RNquI0epW6zuy/ZRQs6jnrFA=
go.mongodb.org/mongo-driver v1.1.2/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM=
go.opencensus.io v0.15.0/go.mod h1:UffZAU+4sDEINUGP/B7UfBBkq4fqLu9zXAX7ke6CHW0=
go.opencensus.io v0.17.0/go.mod h1:mp1VrMQxhlqqDpKvH4UcQUa4YwlzNmymAjPrDdfxNpI=
Expand Down
291 changes: 291 additions & 0 deletions pkg/scalers/mongo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
package scalers

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
"k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// mongoDBScaler is support for mongoDB in keda.
type mongoDBScaler struct {
metadata *mongoDBMetadata
client *mongo.Client
}

// mongoDBMetadata specify mongoDB scaler params.
type mongoDBMetadata struct {
// The string is used by connected with mongoDB.
// +optional
connectionString string
// Specify the host to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
host string
// Specify the port to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
port string
// Specify the username to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
username string
// Specify the password to connect to the mongoDB server,if the connectionString be provided, don't need specify this param.
// +optional
password string

// The name of the database to be queried.
// +required
dbName string
// The name of the collection to be queried.
// +required
collection string
// A mongoDB filter doc,used by specify DB.
// +required
query string
// A threshold that is used as targetAverageValue in HPA
// +required
queryValue int

metricName string
}

// Default variables and settings
const (
mongoDBDefaultTimeOut = 10 * time.Second
defaultCollection = "default"
defaultDB = "test"
defaultQueryValue = 1
)

type mongoDBFields struct {
ID primitive.ObjectID `bson:"_id, omitempty"`
}

var mongoDBLog = logf.Log.WithName("mongoDB_scaler")

// NewMongoDBScaler creates a new mongoDB scaler
func NewMongoDBScaler(config *ScalerConfig) (Scaler, error) {
ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut)
defer cancel()

meta, connStr, err := parseMongoDBMetadata(config)
if err != nil {
return nil, fmt.Errorf("failed to parsing mongoDB metadata,because of %v", err)
}

opt := options.Client().ApplyURI(connStr)
client, err := mongo.Connect(ctx, opt)
if err != nil {
return nil, fmt.Errorf("failed to establish connection with mongoDB,because of %v", err)
}

if err = client.Ping(ctx, readpref.Primary()); err != nil {
return nil, fmt.Errorf("failed to ping mongoDB,because of %v", err)
}

return &mongoDBScaler{
metadata: meta,
client: client,
}, nil
}

func parseMongoDBMetadata(config *ScalerConfig) (*mongoDBMetadata, string, error) {
var connStr string
// setting default metadata
meta := mongoDBMetadata{
collection: defaultCollection,
query: "",
queryValue: defaultQueryValue,
dbName: defaultDB,
}

// parse metaData from ScaledJob config
if val, ok := config.TriggerMetadata["collection"]; ok {
meta.collection = val
} else {
return nil, "", fmt.Errorf("no collection given")
}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, "", fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["queryValue"]; ok {
queryValue, err := strconv.Atoi(val)
if err != nil {
return nil, "", fmt.Errorf("failed to convert %v to int,because of %v", queryValue, err.Error())
}
meta.queryValue = queryValue
} else {
return nil, "", fmt.Errorf("no queryValue given")
}

if val, ok := config.TriggerMetadata["dbName"]; ok {
meta.dbName = val
} else {
return nil, "", fmt.Errorf("no dbName given")
}

// Resolve connectionString
if c, ok := config.AuthParams["connectionString"]; ok {
meta.connectionString = c
} else if v, ok := config.TriggerMetadata["connectionStringFromEnv"]; ok {
meta.connectionString = config.ResolvedEnv[v]
} else {
meta.connectionString = ""
if val, ok := config.TriggerMetadata["host"]; ok {
meta.host = val
} else {
return nil, "", fmt.Errorf("no host given")
}
if val, ok := config.TriggerMetadata["port"]; ok {
meta.port = val
} else {
return nil, "", fmt.Errorf("no port given")
}

if val, ok := config.TriggerMetadata["username"]; ok {
meta.username = val
} else {
return nil, "", fmt.Errorf("no username given")
}
// get password from env or authParams
if v, ok := config.AuthParams["password"]; ok {
meta.password = v
} else if v, ok := config.TriggerMetadata["passwordFromEnv"]; ok {
meta.password = config.ResolvedEnv[v]
}

if len(meta.password) == 0 {
return nil, "", fmt.Errorf("no password given")
}
}

if meta.connectionString != "" {
connStr = meta.connectionString
} else {
// Build connection str
addr := fmt.Sprintf("%s:%s", meta.host, meta.port)
auth := fmt.Sprintf("%s:%s", meta.username, meta.password)
connStr = "mongodb://" + auth + "@" + addr
}

if val, ok := config.TriggerMetadata["metricName"]; ok {
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s", val))
} else {
maskedURL, err := kedautil.MaskPartOfURL(connStr, kedautil.Hostname)
if err != nil {
return nil, "", fmt.Errorf("failure masking part of url")
}
meta.metricName = kedautil.NormalizeString(fmt.Sprintf("mongodb-%s-%s", maskedURL, meta.collection))
}
return &meta, connStr, nil
}

func (s *mongoDBScaler) IsActive(ctx context.Context) (bool, error) {
result, err := s.getQueryResult()
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to get query result by mongoDB,because of %v", err))
return false, err
}
return result > 0, nil
}

// Close disposes of mongoDB connections
func (s *mongoDBScaler) Close() error {
if s.client != nil {
err := s.client.Disconnect(context.TODO())
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to close mongoDB connection,because of %v", err))
return err
}
}

return nil
}

// getQueryResult query mongoDB by meta.query
func (s *mongoDBScaler) getQueryResult() (int, error) {
ctx, cancel := context.WithTimeout(context.Background(), mongoDBDefaultTimeOut)
defer cancel()

filter, err := json2BsonDoc(s.metadata.query)
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to convert query param to bson.Doc,because of %v", err))
return 0, err
}

docs_num, err := s.client.Database(s.metadata.dbName).Collection(s.metadata.collection).CountDocuments(ctx, filter)
if err != nil {
mongoDBLog.Error(err, fmt.Sprintf("failed to query %v in %v,because of %v", s.metadata.dbName, s.metadata.collection, err))
return 0, err
}

return int(docs_num), nil
}

// GetMetrics query from mongoDB,and return to external metrics
func (s *mongoDBScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
num, err := s.getQueryResult()
if err != nil {
return []external_metrics.ExternalMetricValue{}, fmt.Errorf("failed to inspect momgo,because of %v", err)
}

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(int64(num), resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

// GetMetricSpecForScaling get the query value for scaling
func (s *mongoDBScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI)

externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: s.metadata.metricName,
},
Target: v2beta2.MetricTarget{
Type: v2beta2.AverageValueMetricType,
AverageValue: targetQueryValue,
},
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: externalMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

// json2BsonDoc convert Json to Bson.Doc
func json2BsonDoc(js string) (doc bsonx.Doc, err error) {
doc = bsonx.Doc{}
err = bson.UnmarshalExtJSON([]byte(js), true, &doc)
if err != nil {
return nil, err
}

if len(doc) == 0 {
return nil, errors.New("empty bson document")
}

return doc, nil
}
Loading

0 comments on commit dc989e6

Please sign in to comment.