Skip to content

Commit

Permalink
fix: JetSteam scaler detects leader changes correctly (#6043)
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
  • Loading branch information
JorTurFer authored Aug 7, 2024
1 parent e137285 commit b57db3a
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Here is an overview of all new **experimental** features:
### Fixes

- **General**: Hashicorp Vault PKI doesn't fail with due to KeyPair mismatch ([#6028](https://github.com/kedacore/keda/issues/6028))
- **JetStream**: Consumer leader change is correctly detected ([#6042](https://github.com/kedacore/keda/issues/6042))

### Deprecations

Expand Down
14 changes: 0 additions & 14 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,6 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
}

if s.metadata.clusterSize > 1 {
// we know who the consumer leader and its monitoring url is, query it directly
if s.metadata.consumerLeader != "" && s.metadata.monitoringLeaderURL != "" {
natsJetStreamMonitoringLeaderURL := s.metadata.monitoringLeaderURL

jetStreamAccountResp, err = s.getNATSJetstreamMonitoringRequest(ctx, natsJetStreamMonitoringLeaderURL)
if err != nil {
return err
}

s.setNATSJetStreamMonitoringData(jetStreamAccountResp, natsJetStreamMonitoringLeaderURL)
return nil
}

// we haven't found the consumer yet, grab the list of hosts and try each one
natsJetStreamMonitoringServerURL, err := s.getNATSJetStreamMonitoringServerURL("")
if err != nil {
return err
Expand Down
16 changes: 0 additions & 16 deletions pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,22 +440,6 @@ func TestNATSJetStreamGetNATSJetstreamServerURL(t *testing.T) {
}
}

func TestInvalidateNATSJetStreamCachedMonitoringData(t *testing.T) {
meta, err := parseNATSJetStreamMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0})
if err != nil {
t.Fatal("Could not parse metadata:", err)
}

mockJetStreamScaler := natsJetStreamScaler{
stream: nil,
metadata: meta,
httpClient: http.DefaultClient,
logger: InitializeLogger(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0}, "nats_jetstream_scaler"),
}

mockJetStreamScaler.invalidateNATSJetStreamCachedMonitoringData()
}

func TestNATSJetStreamClose(t *testing.T) {
mockJetStreamScaler, err := NewNATSJetStreamScaler(&scalersconfig.ScalerConfig{TriggerMetadata: testNATSJetStreamGoodMetadata, TriggerIndex: 0})
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions tests/scalers/nats_jetstream/helper/nats_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,28 @@ spec:
]
restartPolicy: OnFailure
backoffLimit: 4
`

StepDownConsumer = `
apiVersion: batch/v1
kind: Job
metadata:
name: step-down
namespace: {{.TestNamespace}}
spec:
ttlSecondsAfterFinished: 15
template:
spec:
containers:
- name: stepdown
image: "natsio/nats-box:0.13.2"
imagePullPolicy: Always
command: [
'sh', '-c', 'nats context save local --server {{.NatsAddress}} --select &&
nats consumer cluster step-down {{.NatsStream}} {{.NatsConsumer}}'
]
restartPolicy: OnFailure
backoffLimit: 4
`

DeploymentTemplate = `
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ func testActivation(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeployme

func testScaleOut(t *testing.T, kc *k8s.Clientset, data nats.JetStreamDeploymentTemplateData) {
t.Log("--- testing scale out ---")
// We force the change of consumer leader to ensure that KEDA detects the change and
// handles it properly
KubectlApplyWithTemplate(t, data, "stepDownTemplate", nats.StepDownConsumer)

KubectlApplyWithTemplate(t, data, "publishJobTemplate", nats.PublishJobTemplate)

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 3),
Expand Down

0 comments on commit b57db3a

Please sign in to comment.