[BUG-5922] Report failing ScaledJob triggers in status #5916

Merged 5 commits on Jul 30, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -91,6 +91,7 @@ Here is an overview of all new **experimental** features:
- **General**: Scalers are properly closed after being refreshed ([#5806](https://github.com/kedacore/keda/issues/5806))
- **MongoDB Scaler**: MongoDB url parses correctly `+srv` scheme ([#5760](https://github.com/kedacore/keda/issues/5760))
- **New Relic Scaler**: Fix CVE-2024-6104 in github.com/hashicorp/go-retryablehttp ([#5944](https://github.com/kedacore/keda/issues/5944))
- **ScaledJob**: Fix ScaledJob ignores failing trigger(s) error ([#5922](https://github.com/kedacore/keda/issues/5922))

### Deprecations

8 changes: 4 additions & 4 deletions pkg/mock/mock_scaling/mock_executor/mock_interface.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/scaling/executor/scale_executor.go
@@ -39,7 +39,7 @@ const (

// ScaleExecutor contains methods RequestJobScale and RequestScale
type ScaleExecutor interface {
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64)
RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, isError bool, scaleTo int64, maxScale int64)
RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool, options *ScaleExecutorOptions)
}

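Because the interface above gained an isError parameter, every implementation, including the regenerated mock in pkg/mock/mock_scaling/mock_executor, now has to accept it. A minimal hand-written fake that would satisfy the updated interface might look like the sketch below; the import paths follow KEDA's module layout and the ScaleExecutorOptions type comes from this diff, but the fake itself is illustrative, not part of the PR.

```go
package fake

import (
	"context"

	kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
	"github.com/kedacore/keda/v2/pkg/scaling/executor"
)

// fakeScaleExecutor records the flags it last received, which is enough for a
// test that only checks whether isError was propagated to the executor.
type fakeScaleExecutor struct {
	lastIsActive bool
	lastIsError  bool
}

// Compile-time check that the fake still matches the updated interface.
var _ executor.ScaleExecutor = (*fakeScaleExecutor)(nil)

func (f *fakeScaleExecutor) RequestJobScale(_ context.Context, _ *kedav1alpha1.ScaledJob, isActive, isError bool, _, _ int64) {
	f.lastIsActive = isActive
	f.lastIsError = isError
}

func (f *fakeScaleExecutor) RequestScale(_ context.Context, _ *kedav1alpha1.ScaledObject, _, _ bool, _ *executor.ScaleExecutorOptions) {
}
```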
15 changes: 14 additions & 1 deletion pkg/scaling/executor/scale_jobs.go
@@ -38,7 +38,7 @@ const (
defaultFailedJobsHistoryLimit = int32(100)
)

func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive, isError bool, scaleTo int64, maxScale int64) {
logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

runningJobCount := e.getRunningJobCount(ctx, scaledJob)
@@ -65,6 +65,19 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al
logger.V(1).Info("No change in activity")
}

if isError {
// some triggers responded with error
// Set ScaledJob.Status.ReadyCondition to Unknown
readyCondition := scaledJob.Status.Conditions.GetReadyCondition()
msg := "Some triggers defined in ScaledJob are not working correctly"
logger.V(1).Info(msg)
if !readyCondition.IsUnknown() {
if err := e.setReadyCondition(ctx, logger, scaledJob, metav1.ConditionUnknown, "PartialTriggerError", msg); err != nil {
logger.Error(err, "error setting ready condition")
}
}
}

condition := scaledJob.Status.Conditions.GetActiveCondition()
if condition.IsUnknown() || condition.IsTrue() != isActive {
if isActive {
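The net effect of the new isError branch is a Ready condition of Unknown with reason PartialTriggerError on the ScaledJob status. A rough sketch of the resulting status entry, using the upstream metav1.Condition type as a stand-in for KEDA's own Condition type (an assumption made purely for illustration):

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Approximation of the condition written by setReadyCondition above;
	// KEDA stores it in ScaledJob.Status.Conditions via its own types.
	cond := metav1.Condition{
		Type:    "Ready",
		Status:  metav1.ConditionUnknown,
		Reason:  "PartialTriggerError",
		Message: "Some triggers defined in ScaledJob are not working correctly",
	}
	fmt.Printf("%s=%s reason=%s message=%q\n", cond.Type, cond.Status, cond.Reason, cond.Message)
}
```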
20 changes: 11 additions & 9 deletions pkg/scaling/scale_handler.go
@@ -259,8 +259,8 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac
return
}

isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, obj)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, scaleTo, maxScale)
isActive, isError, scaleTo, maxScale := h.isScaledJobActive(ctx, obj)
h.scaleExecutor.RequestJobScale(ctx, obj, isActive, isError, scaleTo, maxScale)
}
}

@@ -813,15 +813,16 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler,

// getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace.
// It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler.
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics {
func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) ([]scaledjob.ScalerMetrics, bool) {
logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)

cache, err := h.GetScalersCache(ctx, scaledJob)
metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err)
if err != nil {
log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name)
return nil
return nil, true
}
var isError bool
var scalersMetrics []scaledjob.ScalerMetrics
scalers, scalerConfigs := cache.GetScalers()
for scalerIndex, scaler := range scalers {
@@ -849,8 +850,9 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, latency)
}
if err != nil {
scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err)
scalerLogger.Error(err, "Error getting scaler metrics and activity, but continue")
cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error())
isError = true
continue
}
if isTriggerActive {
@@ -883,21 +885,21 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav
metricscollector.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive)
}
}
return scalersMetrics
return scalersMetrics, isError
}

// isScaledJobActive returns whether the input ScaledJob:
// is active as the first return value and whether any trigger errored as the second,
// the third and the fourth return values indicate queueLength and maxValue for scale
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) {
func (h *scaleHandler) isScaledJobActive(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) (bool, bool, int64, int64) {
logger := logf.Log.WithName("scalemetrics")

scalersMetrics := h.getScaledJobMetrics(ctx, scaledJob)
scalersMetrics, isError := h.getScaledJobMetrics(ctx, scaledJob)
isActive, queueLength, maxValue, maxFloatValue :=
scaledjob.IsScaledJobActive(scalersMetrics, scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation, scaledJob.MinReplicaCount(), scaledJob.MaxReplicaCount())

logger.V(1).WithValues("scaledJob.Name", scaledJob.Name).Info("Checking if ScaleJob Scalers are active", "isActive", isActive, "maxValue", maxFloatValue, "MultipleScalersCalculation", scaledJob.Spec.ScalingStrategy.MultipleScalersCalculation)
return isActive, queueLength, maxValue
return isActive, isError, queueLength, maxValue
}

// getTrueMetricArray is a help function made for composite scaler to determine
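Taken together, the scale_handler.go changes thread a single boolean from each scaler's error path up to the executor: getScaledJobMetrics keeps iterating when a scaler fails but flags the failure, isScaledJobActive forwards the flag, and RequestJobScale turns it into the status condition. A stripped-down, self-contained sketch of that flow, using hypothetical names rather than KEDA's types:

```go
package main

import (
	"errors"
	"fmt"
)

// scalerResult stands in for scaledjob.ScalerMetrics in this sketch.
type scalerResult struct {
	queueLength int64
	isActive    bool
}

// getMetrics mirrors the new getScaledJobMetrics contract: it keeps collecting
// from the remaining scalers after a failure, but reports via the second
// return value that at least one trigger errored.
func getMetrics(scalers []func() (scalerResult, error)) ([]scalerResult, bool) {
	var results []scalerResult
	isError := false
	for _, s := range scalers {
		r, err := s()
		if err != nil {
			isError = true
			continue // the real handler also records an event and continues
		}
		results = append(results, r)
	}
	return results, isError
}

func main() {
	scalers := []func() (scalerResult, error){
		func() (scalerResult, error) { return scalerResult{queueLength: 20, isActive: true}, nil },
		func() (scalerResult, error) { return scalerResult{}, errors.New("broker unreachable") },
	}

	metrics, isError := getMetrics(scalers)
	isActive := false
	for _, m := range metrics {
		isActive = isActive || m.isActive
	}

	// In KEDA, this (isActive, isError) pair is what isScaledJobActive now
	// returns and what RequestJobScale consumes to set the Ready condition.
	fmt.Printf("isActive=%v isError=%v collectedMetrics=%d\n", isActive, isError, len(metrics))
}
```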
25 changes: 17 additions & 8 deletions pkg/scaling/scale_handler_test.go
@@ -661,19 +661,21 @@ func TestIsScaledJobActive(t *testing.T) {
scalerCachesLock: &sync.RWMutex{},
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}
isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
// nosemgrep: context-todo
isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
assert.Equal(t, true, isActive)
assert.Equal(t, false, isError)
assert.Equal(t, int64(20), queueLength)
assert.Equal(t, int64(10), maxValue)
scalerCache.Close(context.Background())

// Test the validation
scalerTestDatam := []scalerTestData{
newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 20, 20),
newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 5, 2),
newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 12, 9),
newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 27),
newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, 35, 25),
newScalerTestData("s0-queueLength", 100, "max", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 20, 20),
newScalerTestData("queueLength", 100, "min", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 5, 2),
newScalerTestData("messageCount", 100, "avg", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 12, 9),
newScalerTestData("s3-messageCount", 100, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 27),
newScalerTestData("s10-messageCount", 25, "sum", 20, 1, true, 10, 2, true, 5, 3, true, 7, 4, false, true, false, 35, 25),
}

for index, scalerTestData := range scalerTestDatam {
@@ -717,9 +719,11 @@ func TestIsScaledJobActive(t *testing.T) {
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}
fmt.Printf("index: %d", index)
isActive, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob)
// nosemgrep: context-todo
isActive, isError, queueLength, maxValue = sh.isScaledJobActive(context.TODO(), scaledJob)
// assert.Equal(t, 5, index)
assert.Equal(t, scalerTestData.ResultIsActive, isActive)
assert.Equal(t, scalerTestData.ResultIsError, isError)
assert.Equal(t, scalerTestData.ResultQueueLength, queueLength)
assert.Equal(t, scalerTestData.ResultMaxValue, maxValue)
scalerCache.Close(context.Background())
@@ -757,8 +761,10 @@ func TestIsScaledJobActiveIfQueueEmptyButMinReplicaCountGreaterZero(t *testing.T
scaledObjectsMetricCache: metricscache.NewMetricsCache(),
}

isActive, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
// nosemgrep: context-todo
isActive, isError, queueLength, maxValue := sh.isScaledJobActive(context.TODO(), scaledJobSingle)
assert.Equal(t, true, isActive)
assert.Equal(t, false, isError)
assert.Equal(t, int64(0), queueLength)
assert.Equal(t, int64(0), maxValue)
scalerCache.Close(context.Background())
@@ -781,6 +787,7 @@ func newScalerTestData(
scaler4AverageValue int, //nolint:golint,unparam
scaler4IsActive bool, //nolint:golint,unparam
resultIsActive bool, //nolint:golint,unparam
resultIsError bool, //nolint:golint,unparam
resultQueueLength,
resultMaxLength int) scalerTestData {
return scalerTestData{
@@ -800,6 +807,7 @@ return scalerTestData{
Scaler4AverageValue: int64(scaler4AverageValue),
Scaler4IsActive: scaler4IsActive,
ResultIsActive: resultIsActive,
ResultIsError: resultIsError,
ResultQueueLength: int64(resultQueueLength),
ResultMaxValue: int64(resultMaxLength),
}
@@ -822,6 +830,7 @@ type scalerTestData struct {
Scaler4AverageValue int64
Scaler4IsActive bool
ResultIsActive bool
ResultIsError bool
ResultQueueLength int64
ResultMaxValue int64
MinReplicaCount int32
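Finally, the ResultIsError column added to scalerTestData mirrors the new return value in the table-driven test. A minimal standalone analogue of that assertion style, with a hypothetical evaluate helper rather than KEDA's real handler:

```go
package example

import (
	"errors"
	"testing"
)

// evaluate is a hypothetical stand-in for isScaledJobActive: it aggregates
// trigger activity and reports whether any trigger returned an error.
func evaluate(triggers []func() (bool, error)) (isActive, isError bool) {
	for _, trigger := range triggers {
		active, err := trigger()
		if err != nil {
			isError = true
			continue
		}
		isActive = isActive || active
	}
	return isActive, isError
}

func TestFailingTriggerIsReported(t *testing.T) {
	triggers := []func() (bool, error){
		func() (bool, error) { return true, nil },                        // healthy, active trigger
		func() (bool, error) { return false, errors.New("scaler down") }, // failing trigger
	}

	isActive, isError := evaluate(triggers)
	if !isActive {
		t.Error("expected the healthy trigger to keep the job active")
	}
	if !isError {
		t.Error("expected the failing trigger to be reported, as ResultIsError now asserts")
	}
}
```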