Skip to content

Commit

Permalink
chore: build endpoint once in the parser
Browse files Browse the repository at this point in the history
  • Loading branch information
mknet3 committed Nov 6, 2022
1 parent 4a45996 commit fd307f9
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 42 deletions.
28 changes: 14 additions & 14 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type natsJetStreamScaler struct {

type natsJetStreamMetadata struct {
monitoringEndpoint string
useHTTPS bool
account string
stream string
consumer string
Expand Down Expand Up @@ -111,11 +110,6 @@ func NewNATSJetStreamScaler(config *ScalerConfig) (Scaler, error) {

func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, error) {
meta := natsJetStreamMetadata{}
var err error
meta.monitoringEndpoint, err = GetFromAuthOrMeta(config, "natsServerMonitoringEndpoint")
if err != nil {
return meta, err
}

if config.TriggerMetadata["account"] == "" {
return meta, errors.New("no account name given")
Expand Down Expand Up @@ -153,27 +147,33 @@ func parseNATSJetStreamMetadata(config *ScalerConfig) (natsJetStreamMetadata, er

meta.scalerIndex = config.ScalerIndex

meta.useHTTPS = false
natsServerEndpoint, err := GetFromAuthOrMeta(config, "natsServerMonitoringEndpoint")
if err != nil {
return meta, err
}
useHTTPS := false
if val, ok := config.TriggerMetadata["useHttps"]; ok {
useHTTPS, err := strconv.ParseBool(val)
useHTTPS, err = strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
}
meta.useHTTPS = useHTTPS
}
meta.monitoringEndpoint = getNATSJetStreamEndpoint(useHTTPS, natsServerEndpoint, meta.account)

return meta, nil
}

func (s *natsJetStreamScaler) getNATSJetStreamEndpoint() string {
func getNATSJetStreamEndpoint(useHTTPS bool, natsServerEndpoint string, account string) string {
protocol := natsHTTPProtocol
if s.metadata.useHTTPS {
if useHTTPS {
protocol = natsHTTPSProtocol
}
return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, s.metadata.monitoringEndpoint, s.metadata.account)

return fmt.Sprintf("%s://%s/jsz?acc=%s&consumers=true&config=true", protocol, natsServerEndpoint, account)
}

func (s *natsJetStreamScaler) IsActive(ctx context.Context) (bool, error) {
monitoringEndpoint := s.getNATSJetStreamEndpoint()
monitoringEndpoint := s.metadata.monitoringEndpoint

req, err := http.NewRequestWithContext(ctx, http.MethodGet, monitoringEndpoint, nil)
if err != nil {
Expand Down Expand Up @@ -232,7 +232,7 @@ func (s *natsJetStreamScaler) GetMetricSpecForScaling(context.Context) []v2.Metr
}

func (s *natsJetStreamScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.getNATSJetStreamEndpoint(), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.metadata.monitoringEndpoint, nil)
if err != nil {
return nil, err
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package scalers
import (
"context"
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

type parseNATSJetStreamMetadataTestData struct {
Expand Down Expand Up @@ -52,7 +55,7 @@ func TestNATSJetStreamParseMetadata(t *testing.T) {
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
} else if testData.isError && err == nil {
t.Error("Expected error but got success")
t.Error("Expected error but got success" + testData.authParams["natsServerMonitoringEndpoint"] + "foo")
}
}
}
Expand All @@ -77,3 +80,15 @@ func TestNATSJetStreamGetMetricSpecForScaling(t *testing.T) {
}
}
}

func TestGetNATSJetStreamEndpointHTTPS(t *testing.T) {
endpoint := getNATSJetStreamEndpoint(true, "nats.nats:8222", "$G")

assert.True(t, strings.HasPrefix(endpoint, "https:"))
}

func TestGetNATSJetStreamEndpointHTTP(t *testing.T) {
endpoint := getNATSJetStreamEndpoint(false, "nats.nats:8222", "$G")

assert.True(t, strings.HasPrefix(endpoint, "http:"))
}
55 changes: 28 additions & 27 deletions pkg/scalers/stan_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ type stanScaler struct {
}

type stanMetadata struct {
natsServerMonitoringEndpoint string
useHTTPS bool
queueGroup string
durableName string
subject string
lagThreshold int64
activationLagThreshold int64
scalerIndex int
monitoringEndpoint string
stanChannelsEndpoint string
queueGroup string
durableName string
subject string
lagThreshold int64
activationLagThreshold int64
scalerIndex int
}

const (
Expand Down Expand Up @@ -85,11 +85,6 @@ func NewStanScaler(config *ScalerConfig) (Scaler, error) {

func parseStanMetadata(config *ScalerConfig) (stanMetadata, error) {
meta := stanMetadata{}
var err error
meta.natsServerMonitoringEndpoint, err = GetFromAuthOrMeta(config, "natsServerMonitoringEndpoint")
if err != nil {
return meta, err
}

if config.TriggerMetadata["queueGroup"] == "" {
return meta, errors.New("no queue group given")
Expand Down Expand Up @@ -127,34 +122,40 @@ func parseStanMetadata(config *ScalerConfig) (stanMetadata, error) {

meta.scalerIndex = config.ScalerIndex

meta.useHTTPS = false
var err error
useHTTPS := false
if val, ok := config.TriggerMetadata["useHttps"]; ok {
useHTTPS, err := strconv.ParseBool(val)
useHTTPS, err = strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("useHTTPS parsing error %s", err.Error())
}
meta.useHTTPS = useHTTPS
}
natsServerEndpoint, err := GetFromAuthOrMeta(config, "natsServerMonitoringEndpoint")
if err != nil {
return meta, err
}
meta.stanChannelsEndpoint = getSTANChannelsEndpoint(useHTTPS, natsServerEndpoint)
meta.monitoringEndpoint = getMonitoringEndpoint(meta.stanChannelsEndpoint, meta.subject)

return meta, nil
}

// IsActive determines if we need to scale from zero
func (s *stanScaler) IsActive(ctx context.Context) (bool, error) {
monitoringEndpoint := s.getMonitoringEndpoint()
monitoringEndpoint := s.metadata.monitoringEndpoint

req, err := http.NewRequestWithContext(ctx, "GET", monitoringEndpoint, nil)
if err != nil {
return false, err
}
resp, err := s.httpClient.Do(req)
if err != nil {
s.logger.Error(err, "Unable to access the nats streaming broker monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
s.logger.Error(err, "Unable to access the nats streaming broker monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.monitoringEndpoint)
return false, err
}

if resp.StatusCode == 404 {
req, err := http.NewRequestWithContext(ctx, "GET", s.getSTANChannelsEndpoint(), nil)
req, err := http.NewRequestWithContext(ctx, "GET", s.metadata.stanChannelsEndpoint, nil)
if err != nil {
return false, err
}
Expand All @@ -166,7 +167,7 @@ func (s *stanScaler) IsActive(ctx context.Context) (bool, error) {
if baseResp.StatusCode == 404 {
s.logger.Info("Streaming broker endpoint returned 404. Please ensure it has been created", "url", monitoringEndpoint, "channelName", s.metadata.subject)
} else {
s.logger.Info("Unable to connect to STAN. Please ensure you have configured the ScaledObject with the correct endpoint.", "baseResp.StatusCode", baseResp.StatusCode, "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
s.logger.Info("Unable to connect to STAN. Please ensure you have configured the ScaledObject with the correct endpoint.", "baseResp.StatusCode", baseResp.StatusCode, "monitoringEndpoint", s.metadata.monitoringEndpoint)
}

return false, err
Expand All @@ -180,16 +181,16 @@ func (s *stanScaler) IsActive(ctx context.Context) (bool, error) {
return s.hasPendingMessage() || s.getMaxMsgLag() > s.metadata.activationLagThreshold, nil
}

func (s *stanScaler) getSTANChannelsEndpoint() string {
func getSTANChannelsEndpoint(useHTTPS bool, natsServerEndpoint string) string {
protocol := natsStreamingHTTPProtocol
if s.metadata.useHTTPS {
if useHTTPS {
protocol = natsStreamingHTTPSProtocol
}
return fmt.Sprintf("%s://%s/streaming/channelsz", protocol, s.metadata.natsServerMonitoringEndpoint)
return fmt.Sprintf("%s://%s/streaming/channelsz", protocol, natsServerEndpoint)
}

func (s *stanScaler) getMonitoringEndpoint() string {
return s.getSTANChannelsEndpoint() + "?channel=" + s.metadata.subject + "&subs=1"
func getMonitoringEndpoint(stanChannelsEndpoint string, subject string) string {
return fmt.Sprintf("%s?channel=%s&subs=1", stanChannelsEndpoint, subject)
}

func (s *stanScaler) getMaxMsgLag() int64 {
Expand Down Expand Up @@ -244,14 +245,14 @@ func (s *stanScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {

// GetMetrics returns value for a supported metric and an error if there is a problem getting the metric
func (s *stanScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
req, err := http.NewRequestWithContext(ctx, "GET", s.getMonitoringEndpoint(), nil)
req, err := http.NewRequestWithContext(ctx, "GET", s.metadata.monitoringEndpoint, nil)
if err != nil {
return nil, err
}
resp, err := s.httpClient.Do(req)

if err != nil {
s.logger.Error(err, "Unable to access the nats streaming broker monitoring endpoint", "natsServerMonitoringEndpoint", s.metadata.natsServerMonitoringEndpoint)
s.logger.Error(err, "Unable to access the nats streaming broker monitoring endpoint", "monitoringEndpoint", s.metadata.monitoringEndpoint)
return []external_metrics.ExternalMetricValue{}, err
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/scalers/stan_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package scalers
import (
"context"
"net/http"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

type parseStanMetadataTestData struct {
Expand Down Expand Up @@ -75,3 +78,15 @@ func TestStanGetMetricSpecForScaling(t *testing.T) {
}
}
}

func TestGetSTANChannelsEndpointHTTPS(t *testing.T) {
endpoint := getSTANChannelsEndpoint(true, "stan-nats-ss")

assert.True(t, strings.HasPrefix(endpoint, "https:"))
}

func TestGetSTANChannelsEndpointHTTP(t *testing.T) {
endpoint := getSTANChannelsEndpoint(false, "stan-nats-ss")

assert.True(t, strings.HasPrefix(endpoint, "http:"))
}

0 comments on commit fd307f9

Please sign in to comment.