-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
eventhub-scaler works on new hubs where initially no storage checkpoint exists #798
Conversation
Thank you @christle - it may have been this same issue but I'd even seen issues where checkpoints exist (in that Azure Storage has partition records), but the offset was |
Hi @jeffhollan, as fas as i know, the scaler ignores the offset value, so this pr dont fix this issue. But i have made a similar observation for the offset and have an idea, what's the cause for this and in which case it's happening. It took's me a while to find this out.
I think this should ends in an overflow Exception.The result is, the scaler returns an very high metric value (everytime the same value, no matter how much messages). I dont know why messages, which are routed from the IOT Hub to an EventHub, don't increase the |
pkg/scalers/azure_eventhub_scaler.go
Outdated
@@ -116,15 +117,22 @@ func parseAzureEventHubMetadata(metadata, resolvedEnv map[string]string) (*Event | |||
} | |||
|
|||
//GetUnprocessedEventCountInPartition gets number of unprocessed events in a given partition | |||
func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionID string) (newEventCount int64, err error) { | |||
func (scaler *AzureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Context, partitionID string) (newEventCount int64, checkpoint Checkpoint, err error) { | |||
partitionInfo, err := scaler.client.GetPartitionInformation(ctx, partitionID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the redundant call to GetPartitionInformation for Partition id in GetUnprocessedEventCountInPartition, we can pass the reference of partitionInfo eventhub.HubPartitionRuntimeInformation from IsActive and GetMetrics methods where this information has already been retrieved. Other than that this change will solve the issue of "Until the an event hub processor is scaled to at least 1 manually, the keda-operator will never be able to get the checkpoint data"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok thank you. I can change that. But now there are some merge conflicts. I will try to fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in isActive is no partitionInfo, only runtimeInfo but i can add it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok i'm done. I eliminate one call to GetPartitionInformation and fix the merge conflicts.
… exists Signed-off-by: Christian Leinweber <christian.leinweber@maibornwolff.de>
Thanks @christle for your contribution :) |
eventhub-scaler can scale even when no storage checkpoint exists.
To scale up without checkpoint, the first calculation based only on eventhub partition infos. The tricky part was to differentiate between a partition without messages and exactly 1 unprocessed message.
Fixes #797