Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS JetStream scaler #3468

Merged
merged 3 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ issues:
- linters:
- staticcheck
text: "SA1019: package github.com/Azure/azure-sdk-for-go/services/preview/monitor/mgmt/2018-03-01/insights"
# Exclude for stan_scaler and nats_jetstream_scaler, reason:
# pkg/scalers/nats_jetstream_scaler.go:109: 109-153 lines are duplicate of `pkg/scalers/stan_scaler.go:83-127` (dupl)
- path: nats_jetstream_scaler.go
linters:
- dupl
# Exclude for stan_scaler and nats_jetstream_scaler, reason:
# pkg/scalers/stan_scaler.go:83: 83-127 lines are duplicate of `pkg/scalers/nats_jetstream_scaler.go:109-153` (dupl)
- path: stan_scaler.go
linters:
- dupl

linters-settings:
funlen:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ To learn more about our roadmap, we recommend reading [this document](ROADMAP.md

### New

- **General:** Add new NATS JetStream scaler ([#2391](https://github.com/kedacore/keda/issues/2391))
- **General:** Add support for `minReplicaCount` in ScaledJob ([#3426](https://github.com/kedacore/keda/issues/3426))
- **General:** Add support to customize HPA name ([#3057](https://github.com/kedacore/keda/issues/3057))
- **General:** Basic setup for migrating e2e tests to Go. ([#2737](https://github.com/kedacore/keda/issues/2737))
Expand Down
262 changes: 262 additions & 0 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package scalers

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"

v2beta2 "k8s.io/api/autoscaling/v2beta2"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
logf "sigs.k8s.io/controller-runtime/pkg/log"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
jetStreamMetricType = "External"
defaultJetStreamLagThreshold = 10
)

var jsLog = logf.Log.WithName("nats_jetstream_scaler")

type natsJetStreamScaler struct {
metricType v2beta2.MetricTargetType
stream *streamDetail
metadata natsJetStreamMetadata
httpClient *http.Client
}

type natsJetStreamMetadata struct {
monitoringEndpoint string
account string
stream string
consumer string
lagThreshold int64
activationLagThreshold int64
scalerIndex int
}

type jetStreamEndpointResponse struct {
Accounts []accountDetail `json:"account_details"`
}

type accountDetail struct {
Name string `json:"name"`
Streams []*streamDetail `json:"stream_detail"`
}

type streamDetail struct {
Name string `json:"name"`
Config streamConfig `json:"config"`
State streamState `json:"state"`
Consumers []consumerDetail `json:"consumer_detail"`
}

type streamConfig struct {
Subjects []string `json:"subjects"`
}

type streamState struct {
MsgCount int64 `json:"messages"`
LastSequence int64 `json:"last_seq"`
}

type consumerDetail struct {
StreamName string `json:"stream_name"`
Name string `json:"name"`
NumAckPending int `json:"num_ack_pending"`
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumPending int `json:"num_pending"`
Config consumerConfig `json:"config"`
DeliveryStatus consumerDeliveryStatus `json:"delivery"`
}

type consumerConfig struct {
DurableName string `json:"durable_name"`
}

type consumerDeliveryStatus struct {
ConsumerSequence int64 `json:"customer_seq"`
StreamSequence int64 `json:"stream_seq"`
}

func NewNATSJetStreamScaler(config *ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %s", err)
}

jsMetadata, err := parseNATSJetStreamMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing NATS JetStream metadata: %s", err)
}

return &natsJetStreamScaler{
metricType: metricType,
stream: &streamDetail{},
metadata: jsMetadata,
httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, false),
}, nil
}

func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, error) {
meta := natsJetStreamMetadata{}
var err error
meta.monitoringEndpoint, err = GetFromAuthOrMeta(config, "natsServerMonitoringEndpoint")
if err != nil {
return meta, err
}

if config.TriggerMetadata["account"] == "" {
return meta, errors.New("no account name given")
}
meta.account = config.TriggerMetadata["account"]

if config.TriggerMetadata["stream"] == "" {
return meta, errors.New("no stream name given")
}
meta.stream = config.TriggerMetadata["stream"]

if config.TriggerMetadata["consumer"] == "" {
return meta, errors.New("no consumer name given")
}
meta.consumer = config.TriggerMetadata["consumer"]

meta.lagThreshold = defaultJetStreamLagThreshold

if val, ok := config.TriggerMetadata[lagThresholdMetricName]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %s", lagThresholdMetricName, err)
}
meta.lagThreshold = t
}

meta.activationLagThreshold = 0
if val, ok := config.TriggerMetadata["activationLagThreshold"]; ok {
activationTargetQueryValue, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("activationLagThreshold parsing error %s", err.Error())
}
meta.activationLagThreshold = activationTargetQueryValue
}

meta.scalerIndex = config.ScalerIndex
return meta, nil
}

func (s *natsJetStreamScaler) getNATSJetStreamEndpoint() string {
return fmt.Sprintf("http://%s/jsz?acc=%s&consumers=true&config=true", s.metadata.monitoringEndpoint, s.metadata.account)
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
monitoringEndpoint := s.getNATSJetStreamEndpoint()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, monitoringEndpoint, nil)
if err != nil {
return false, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
jsLog.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return false, err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
jsLog.Error(err, "unable to decode JetStream account response")
return false, err
}

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
}
}
return s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
}

func (s *natsJetStreamScaler) getMaxMsgLag() int64 {
consumerName := s.metadata.consumer

for _, consumer := range s.stream.Consumers {
if consumer.Name == consumerName {
return int64(consumer.NumPending)
}
}
return s.stream.State.LastSequence
}

func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2beta2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("nats-jetstream-%s", s.metadata.stream))
externalMetric := &v2beta2.ExternalMetricSource{
Metric: v2beta2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.scalerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold),
}
metricSpec := v2beta2.MetricSpec{
External: externalMetric, Type: jetStreamMetricType,
}
return []v2beta2.MetricSpec{metricSpec}
}

func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.getNATSJetStreamEndpoint(), nil)
if err != nil {
return nil, err
}

resp, err := s.httpClient.Do(req)
if err != nil {
jsLog.Error(err, "unable to access NATS JetStream monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

defer resp.Body.Close()
var jsAccountResp jetStreamEndpointResponse
if err = json.NewDecoder(resp.Body).Decode(&jsAccountResp); err != nil {
jsLog.Error(err, "unable to decode JetStream account details")
return []external_metrics.ExternalMetricValue{}, err
}

// Find and assign the stream that we are looking for.
for _, account := range jsAccountResp.Accounts {
if account.Name == s.metadata.account {
for _, stream := range account.Streams {
if stream.Name == s.metadata.stream {
s.stream = stream
}
}
}
}

totalLag := s.getMaxMsgLag()
jsLog.V(1).Info("NATS JetStream Scaler: Providing metrics based on totalLag, threshold", "totalLag", totalLag, "lagThreshold", s.metadata.lagThreshold)

metric := external_metrics.ExternalMetricValue{
MetricName: metricName,
Value: *resource.NewQuantity(totalLag, resource.DecimalSI),
Timestamp: metav1.Now(),
}

return append([]external_metrics.ExternalMetricValue{}, metric), nil
}

func (s *natsJetStreamScaler) Close(context.Context) error {
return nil
}
2 changes: 2 additions & 0 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string,
return scalers.NewMSSQLScaler(config)
case "mysql":
return scalers.NewMySQLScaler(config)
case "nats-jetstream":
return scalers.NewNATSJetStreamScaler(config)
case "new-relic":
return scalers.NewNewRelicScaler(config)
case "openstack-metric":
Expand Down
2 changes: 1 addition & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ The test script will run in 3 phases:

After `utils/setup_test.go` is done, we expect to have KEDA setup in the `keda` namespace.

- **Tests:** Currently there are only scaler tests in `tests/scalers/`. Each test is kept in its own package. This is to prevent conflicting variable declarations for commoly used variables (**ex -** `testNamespace`). Individual scaler tests are run
- **Tests:** Currently there are only scaler tests in `tests/scalers/`. Each test is kept in its own package. This is to prevent conflicting variable declarations for commonly used variables (**ex -** `testNamespace`). Individual scaler tests are run
in parallel, but tests within a file can be run in parallel or in series. More about tests below.

- **Global cleanup:** This is done in [`utils/cleanup_test.go`](utils/cleanup_test.go). It cleans up all the resources created in `utils/setup_test.go`.
Expand Down
Loading