Skip to content

Address PR comments for Support updates #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 197 additions & 114 deletions pkg/resource/cluster/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,118 +160,175 @@ func (rm *resourceManager) customUpdate(

switch {
case delta.DifferentAt("Spec.ClientAuthentication"):
input := &svcsdk.UpdateSecurityInput{}
if desired.ko.Status.CurrentVersion != nil {
input.CurrentVersion = desired.ko.Status.CurrentVersion
}
if desired.ko.Status.ACKResourceMetadata.ARN != nil {
input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
}
if desired.ko.Spec.ClientAuthentication != nil {
f0 := &svcsdktypes.ClientAuthentication{}
if desired.ko.Spec.ClientAuthentication.SASL != nil {
f0f0 := &svcsdktypes.Sasl{}
if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
f0f0f0 := &svcsdktypes.Iam{
Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
}
f0f0.Iam = f0f0f0
}
if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
f0f0f1 := &svcsdktypes.Scram{
Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
}
f0f0.Scram = f0f0f1
}
f0.Sasl = f0f0
}
if desired.ko.Spec.ClientAuthentication.TLS != nil {
f0f1 := &svcsdktypes.Tls{}
if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
}
if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
}
f0.Tls = f0f1
}
if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
f0.Unauthenticated = &svcsdktypes.Unauthenticated{
Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
}
}
input.ClientAuthentication = f0
}

_, err = rm.sdkapi.UpdateSecurity(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
if err != nil {
return nil, err
}
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
err = requeueAfterAsyncUpdate()

return rm.updateClientAuthentication(ctx, updatedRes, latest)
case delta.DifferentAt("Spec.AssociatedSCRAMSecrets"):
err = rm.syncAssociatedScramSecrets(ctx, updatedRes, latest)
if err != nil {
return nil, err
return latest, err
}
// Set synced condition to True after successful update
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
return updatedRes, requeueAfterAsyncUpdate()

case delta.DifferentAt("Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize"):
_, err := rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
{
KafkaBrokerNodeId: aws.String("ALL"),
VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
},
},
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker storage")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
err = requeueAfterAsyncUpdate()
return rm.updateBrokerStorage(ctx, updatedRes, latest)

case delta.DifferentAt("Spec.BrokerNodeGroupInfo.InstanceType"):
_, err := rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker instanceType")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
err = requeueAfterAsyncUpdate()
return rm.updateBrokerType(ctx, desired, latest)

case delta.DifferentAt("Spec.NumberOfBrokerNodes"):
_, err := rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker instanceType")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
err = requeueAfterAsyncUpdate()
return rm.updateNumberOfBrokerNodes(ctx, desired, latest)
}

return updatedRes, nil
}

// updateNumberOfBrokerNodes updates the number of broker
// nodes for the kafka cluster
func (rm *resourceManager) updateNumberOfBrokerNodes(
ctx context.Context,
desired *resource,
latest *resource,
) (updatedRes *resource, err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.updateNumberOfBrokerNodes")
defer func() { exit(err) }()

_, err = rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
if err != nil {
return latest, err
}
message := fmt.Sprintf("kafka is updating broker number of broker nodes")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)

return desired, requeueAfterAsyncUpdate()
}


// updateBrokerType updates the broker type of the
// kafka cluster
func (rm *resourceManager) updateBrokerType(
ctx context.Context,
desired *resource,
latest *resource,
) (updatedRes *resource, err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.updateBrokerType")
defer func() { exit(err) }()
_, err = rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker instanceType")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)

return desired, requeueAfterAsyncUpdate()
}

// updateBrokerStorate updates the volumeSize of the
// kafka cluster broker storage
func (rm *resourceManager) updateBrokerStorage(
ctx context.Context,
desired *resource,
latest *resource,
) (updatedRes *resource, err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.updateBrokerStorage")
defer func() { exit(err) }()

_, err = rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
CurrentVersion: latest.ko.Status.CurrentVersion,
TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
{
KafkaBrokerNodeId: aws.String("ALL"),
VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
},
},
})
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
if err != nil {
return nil, err
}
message := fmt.Sprintf("kafka is updating broker storage")
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
return desired, requeueAfterAsyncUpdate()
}

// updateClientAuthentication updates the kafka cluster
// authentication settings
func (rm *resourceManager) updateClientAuthentication(
ctx context.Context,
desired *resource,
latest *resource,
) (updatedRes *resource, err error) {
rlog := ackrtlog.FromContext(ctx)
exit := rlog.Trace("rm.updateClientAuthentication")
defer func() { exit(err) }()

return updatedRes, err
input := &svcsdk.UpdateSecurityInput{}
if latest.ko.Status.CurrentVersion != nil {
input.CurrentVersion = desired.ko.Status.CurrentVersion
}
if latest.ko.Status.ACKResourceMetadata.ARN != nil {
input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
}
if desired.ko.Spec.ClientAuthentication != nil {
f0 := &svcsdktypes.ClientAuthentication{}
if desired.ko.Spec.ClientAuthentication.SASL != nil {
f0f0 := &svcsdktypes.Sasl{}
if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
f0f0f0 := &svcsdktypes.Iam{
Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
}
f0f0.Iam = f0f0f0
}
if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
f0f0f1 := &svcsdktypes.Scram{
Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
}
f0f0.Scram = f0f0f1
}
f0.Sasl = f0f0
}
if desired.ko.Spec.ClientAuthentication.TLS != nil {
f0f1 := &svcsdktypes.Tls{}
if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
}
if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
}
f0.Tls = f0f1
}
if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
f0.Unauthenticated = &svcsdktypes.Unauthenticated{
Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
}
}
input.ClientAuthentication = f0
}

_, err = rm.sdkapi.UpdateSecurity(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
if err != nil {
return nil, err
}
message := "kafka is updating the client authentication"
ackcondition.SetSynced(desired, corev1.ConditionFalse, &message, nil)

return desired, err
}

// syncAssociatedScramSecrets examines the Secret ARNs in the supplied Cluster
Expand Down Expand Up @@ -318,6 +375,9 @@ func (rm *resourceManager) syncAssociatedScramSecrets(
}
}

// Set synced condition to True after successful update
ackcondition.SetSynced(desired, corev1.ConditionFalse, nil, nil)

return nil
}

Expand Down Expand Up @@ -355,14 +415,28 @@ func (rm *resourceManager) getAssociatedScramSecrets(
return res, err
}

type unprocessedSecret struct {
errorCode string
errorMessage string
secretArn string
// unprocessedSecrets is an error returned by the
// BatchAssociateScramSecret or Disassociate. It represents the
// secretArns that could not be associated and the reason
type unprocessedSecrets struct {
error
errorCodes []string
errorMessages []string
secretArns []string
}

func (us unprocessedSecret) String() string {
return fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s", us.errorCode, us.errorMessage, us.secretArn)
// Error implementation of unprocessedSecrets loops over the errorCodes
// errorMessages, and failedSecretArns
func (us unprocessedSecrets) Error() string {
// I don't see a case where the lengths will differ
// getting the minimum just in case, so we can avoid
// an index out of bounds
lenErrs := min(len(us.errorCodes), len(us.errorMessages), len(us.secretArns))
errorMessage := ""
for i := range lenErrs {
errorMessage += fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s\n", us.errorCodes[i], us.errorMessages[i], us.secretArns[i])
}
return errorMessage
}

// batchAssociateScramSecret associates the supplied scram secrets to the supplied Cluster
Expand All @@ -386,17 +460,14 @@ func (rm *resourceManager) batchAssociateScramSecret(
}

if len(resp.UnprocessedScramSecrets) > 0 {
unprocessedSecrets := []unprocessedSecret{}
unprocessedSecrets := unprocessedSecrets{}
for _, uss := range resp.UnprocessedScramSecrets {
us := unprocessedSecret{
errorCode: aws.ToString(uss.ErrorCode),
errorMessage: aws.ToString(uss.ErrorMessage),
secretArn: aws.ToString(uss.SecretArn),
}
unprocessedSecrets = append(unprocessedSecrets, us)
unprocessedSecrets.errorCodes = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorCode))
unprocessedSecrets.errorMessages = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorMessage))
unprocessedSecrets.secretArns = append(unprocessedSecrets.errorCodes, aws.ToString(uss.SecretArn))
}

return ackerr.NewTerminalError(fmt.Errorf("Cant attach secret arns: %v", unprocessedSecrets))
return ackerr.NewTerminalError(unprocessedSecrets)
}

return err
Expand All @@ -421,8 +492,20 @@ func (rm *resourceManager) batchDisassociateScramSecret(
unrefSecrets[i] = *s
}
input.SecretArnList = unrefSecrets
_, err = rm.sdkapi.BatchDisassociateScramSecret(ctx, input)
resp, err := rm.sdkapi.BatchDisassociateScramSecret(ctx, input)
rm.metrics.RecordAPICall("UPDATE", "BatchDisassociateScramSecret", err)

if len(resp.UnprocessedScramSecrets) > 0 {
unprocessedSecrets := unprocessedSecrets{}
for _, uss := range resp.UnprocessedScramSecrets {
unprocessedSecrets.errorCodes = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorCode))
unprocessedSecrets.errorMessages = append(unprocessedSecrets.errorCodes, aws.ToString(uss.ErrorMessage))
unprocessedSecrets.secretArns = append(unprocessedSecrets.errorCodes, aws.ToString(uss.SecretArn))
}

return ackerr.NewTerminalError(unprocessedSecrets)
}

return err
}

Expand Down