Skip to content

Commit 1721e84

Browse files
authored
Add update operations for VolumeSize, InstanceType, and BrokerCount (#55)
Issue [#2249](aws-controllers-k8s/community#2249) Description of changes: Also added reconcile before deletion attempt if cluster is not active. Before, if we tried deleting a cluster that was not active, the controller would set it to terminal, and does not reconcile. Another change being added is a field in the Cluster status called ClusterVersion. This field is returned by the sdkFind operation, and is crutial to make updates to VolumeSize, InstanceType, and BrokerCount. By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
1 parent acdc0c7 commit 1721e84

File tree

13 files changed

+279
-23
lines changed

13 files changed

+279
-23
lines changed
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
ack_generate_info:
2-
build_date: "2025-02-20T18:10:11Z"
2+
build_date: "2025-03-04T23:55:37Z"
33
build_hash: a326346bd3a6973254d247c9ab2dc76790c36241
44
go_version: go1.24.0
55
version: v0.43.2
6-
api_directory_checksum: eda989f20dde9f1b4331ffa67dc3b9a5ef0d64e4
6+
api_directory_checksum: 36fbfad1e0bff98a14b120ba292a7f6b4e546fb4
77
api_version: v1alpha1
88
aws_sdk_go_version: v1.32.6
99
generator_config_info:
10-
file_checksum: 5ea49df43c7aef08a9ac8b7171e9f50c3ed82e13
10+
file_checksum: c641b5dd9aa81f1f42655f2afe9fcfb9dc7de696
1111
original_file_name: generator.yaml
1212
last_modification:
1313
reason: API generation

apis/v1alpha1/cluster.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/v1alpha1/generator.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ resources:
2323
CreateCluster:
2424
input_fields:
2525
ClusterName: Name
26+
DescribeCluster:
27+
input_fields:
28+
ClusterName: Name
2629
hooks:
2730
sdk_read_one_post_set_output:
2831
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
@@ -95,6 +98,11 @@ resources:
9598
BootstrapBrokerStringVpcConnectivityTls:
9699
type: string
97100
is_read_only: true
101+
CurrentVersion:
102+
from:
103+
operation: DescribeCluster
104+
path: ClusterInfo.CurrentVersion
105+
is_read_only: true
98106
tags:
99107
# TODO(jaypipes): Ignore tags for now... we will add support later
100108
ignore: true

apis/v1alpha1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/kafka.services.k8s.aws_clusters.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ spec:
356356
- type
357357
type: object
358358
type: array
359+
currentVersion:
360+
description: The current version of the MSK cluster.
361+
type: string
359362
state:
360363
description: |-
361364
The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,

generator.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ resources:
2323
CreateCluster:
2424
input_fields:
2525
ClusterName: Name
26+
DescribeCluster:
27+
input_fields:
28+
ClusterName: Name
2629
hooks:
2730
sdk_read_one_post_set_output:
2831
template_path: hooks/cluster/sdk_read_one_post_set_output.go.tpl
@@ -95,6 +98,11 @@ resources:
9598
BootstrapBrokerStringVpcConnectivityTls:
9699
type: string
97100
is_read_only: true
101+
CurrentVersion:
102+
from:
103+
operation: DescribeCluster
104+
path: ClusterInfo.CurrentVersion
105+
is_read_only: true
98106
tags:
99107
# TODO(jaypipes): Ignore tags for now... we will add support later
100108
ignore: true

helm/crds/kafka.services.k8s.aws_clusters.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ spec:
356356
- type
357357
type: object
358358
type: array
359+
currentVersion:
360+
description: The current version of the MSK cluster.
361+
type: string
359362
state:
360363
description: |-
361364
The state of the cluster. The possible states are ACTIVE, CREATING, DELETING,

pkg/resource/cluster/hooks.go

Lines changed: 153 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ import (
1818
"errors"
1919
"fmt"
2020
"strings"
21+
"time"
2122

2223
svcapitypes "github.com/aws-controllers-k8s/kafka-controller/apis/v1alpha1"
2324
ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
2425
ackcondition "github.com/aws-controllers-k8s/runtime/pkg/condition"
26+
ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
2527
ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
2628
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
2729
ackutil "github.com/aws-controllers-k8s/runtime/pkg/util"
@@ -38,6 +40,7 @@ var (
3840
string(svcsdktypes.ClusterStateDeleting),
3941
string(svcsdktypes.ClusterStateFailed),
4042
}
43+
RequeueAfterUpdateDuration = 15 * time.Second
4144
)
4245

4346
var (
@@ -113,6 +116,18 @@ func clusterDeleting(r *resource) bool {
113116
return cs == strings.ToLower(string(svcsdktypes.ClusterStateDeleting))
114117
}
115118

119+
// requeueAfterAsyncUpdate returns a `ackrequeue.RequeueNeededAfter` struct
120+
// explaining the cluster cannot be modified until after the asynchronous update
121+
// has (first, started and then) completed and the cluster reaches an active
122+
// status.
123+
func requeueAfterAsyncUpdate() *ackrequeue.RequeueNeededAfter {
124+
return ackrequeue.NeededAfter(
125+
fmt.Errorf("cluster has started asynchronously updating, cannot be modified until '%s'",
126+
"Active"),
127+
RequeueAfterUpdateDuration,
128+
)
129+
}
130+
116131
func (rm *resourceManager) customUpdate(
117132
ctx context.Context,
118133
desired *resource,
@@ -133,12 +148,6 @@ func (rm *resourceManager) customUpdate(
133148
// Copy status from latest since it has the current cluster state
134149
updatedRes.ko.Status = latest.ko.Status
135150

136-
if clusterDeleting(latest) {
137-
msg := "Cluster is currently being deleted"
138-
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
139-
return updatedRes, requeueWaitWhileDeleting
140-
}
141-
142151
if !clusterActive(latest) {
143152
msg := "Cluster is in '" + *latest.ko.Status.State + "' state"
144153
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &msg, nil)
@@ -149,16 +158,120 @@ func (rm *resourceManager) customUpdate(
149158
return updatedRes, requeueWaitUntilCanModify(latest)
150159
}
151160

152-
if delta.DifferentAt("Spec.AssociatedSCRAMSecrets") {
161+
switch {
162+
case delta.DifferentAt("Spec.ClientAuthentication"):
163+
input := &svcsdk.UpdateSecurityInput{}
164+
if desired.ko.Status.CurrentVersion != nil {
165+
input.CurrentVersion = desired.ko.Status.CurrentVersion
166+
}
167+
if desired.ko.Status.ACKResourceMetadata.ARN != nil {
168+
input.ClusterArn = (*string)(desired.ko.Status.ACKResourceMetadata.ARN)
169+
}
170+
if desired.ko.Spec.ClientAuthentication != nil {
171+
f0 := &svcsdktypes.ClientAuthentication{}
172+
if desired.ko.Spec.ClientAuthentication.SASL != nil {
173+
f0f0 := &svcsdktypes.Sasl{}
174+
if desired.ko.Spec.ClientAuthentication.SASL.IAM != nil &&
175+
desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled != nil {
176+
f0f0f0 := &svcsdktypes.Iam{
177+
Enabled: desired.ko.Spec.ClientAuthentication.SASL.IAM.Enabled,
178+
}
179+
f0f0.Iam = f0f0f0
180+
}
181+
if desired.ko.Spec.ClientAuthentication.SASL.SCRAM != nil &&
182+
desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled != nil {
183+
f0f0f1 := &svcsdktypes.Scram{
184+
Enabled: desired.ko.Spec.ClientAuthentication.SASL.SCRAM.Enabled,
185+
}
186+
f0f0.Scram = f0f0f1
187+
}
188+
f0.Sasl = f0f0
189+
}
190+
if desired.ko.Spec.ClientAuthentication.TLS != nil {
191+
f0f1 := &svcsdktypes.Tls{}
192+
if desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList != nil {
193+
f0f1.CertificateAuthorityArnList = aws.ToStringSlice(desired.ko.Spec.ClientAuthentication.TLS.CertificateAuthorityARNList)
194+
}
195+
if desired.ko.Spec.ClientAuthentication.TLS.Enabled != nil {
196+
f0f1.Enabled = desired.ko.Spec.ClientAuthentication.TLS.Enabled
197+
}
198+
f0.Tls = f0f1
199+
}
200+
if desired.ko.Spec.ClientAuthentication.Unauthenticated != nil &&
201+
desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled != nil {
202+
f0.Unauthenticated = &svcsdktypes.Unauthenticated{
203+
Enabled: desired.ko.Spec.ClientAuthentication.Unauthenticated.Enabled,
204+
}
205+
}
206+
input.ClientAuthentication = f0
207+
}
208+
209+
_, err = rm.sdkapi.UpdateSecurity(ctx, input)
210+
rm.metrics.RecordAPICall("UPDATE", "UpdateSecurity", err)
211+
if err != nil {
212+
return nil, err
213+
}
214+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
215+
err = requeueAfterAsyncUpdate()
216+
217+
case delta.DifferentAt("Spec.AssociatedSCRAMSecrets"):
153218
err = rm.syncAssociatedScramSecrets(ctx, updatedRes, latest)
154219
if err != nil {
155220
return nil, err
156221
}
222+
// Set synced condition to True after successful update
223+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, nil, nil)
224+
225+
case delta.DifferentAt("Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize"):
226+
_, err := rm.sdkapi.UpdateBrokerStorage(ctx, &svcsdk.UpdateBrokerStorageInput{
227+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
228+
CurrentVersion: latest.ko.Status.CurrentVersion,
229+
TargetBrokerEBSVolumeInfo: []svcsdktypes.BrokerEBSVolumeInfo{
230+
{
231+
KafkaBrokerNodeId: aws.String("ALL"),
232+
VolumeSizeGB: aws.Int32(int32(*desired.ko.Spec.BrokerNodeGroupInfo.StorageInfo.EBSStorageInfo.VolumeSize)),
233+
},
234+
},
235+
})
236+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerStorage", err)
237+
if err != nil {
238+
return nil, err
239+
}
240+
message := fmt.Sprintf("kafka is updating broker storage")
241+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
242+
err = requeueAfterAsyncUpdate()
243+
244+
case delta.DifferentAt("Spec.BrokerNodeGroupInfo.InstanceType"):
245+
_, err := rm.sdkapi.UpdateBrokerType(ctx, &svcsdk.UpdateBrokerTypeInput{
246+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
247+
CurrentVersion: latest.ko.Status.CurrentVersion,
248+
TargetInstanceType: desired.ko.Spec.BrokerNodeGroupInfo.InstanceType,
249+
})
250+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerType", err)
251+
if err != nil {
252+
return nil, err
253+
}
254+
message := fmt.Sprintf("kafka is updating broker instanceType")
255+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
256+
err = requeueAfterAsyncUpdate()
257+
258+
case delta.DifferentAt("Spec.NumberOfBrokerNodes"):
259+
_, err := rm.sdkapi.UpdateBrokerCount(ctx, &svcsdk.UpdateBrokerCountInput{
260+
ClusterArn: (*string)(latest.ko.Status.ACKResourceMetadata.ARN),
261+
CurrentVersion: latest.ko.Status.CurrentVersion,
262+
TargetNumberOfBrokerNodes: aws.Int32(int32(*desired.ko.Spec.NumberOfBrokerNodes)),
263+
})
264+
rm.metrics.RecordAPICall("UPDATE", "UpdateBrokerCount", err)
265+
if err != nil {
266+
return nil, err
267+
}
268+
message := fmt.Sprintf("kafka is updating broker instanceType")
269+
ackcondition.SetSynced(updatedRes, corev1.ConditionFalse, &message, nil)
270+
err = requeueAfterAsyncUpdate()
271+
157272
}
158273

159-
// Set synced condition to True after successful update
160-
ackcondition.SetSynced(updatedRes, corev1.ConditionTrue, nil, nil)
161-
return updatedRes, nil
274+
return updatedRes, err
162275
}
163276

164277
// syncAssociatedScramSecrets examines the Secret ARNs in the supplied Cluster
@@ -242,6 +355,16 @@ func (rm *resourceManager) getAssociatedScramSecrets(
242355
return res, err
243356
}
244357

358+
type unprocessedSecret struct {
359+
errorCode string
360+
errorMessage string
361+
secretArn string
362+
}
363+
364+
func (us unprocessedSecret) String() string {
365+
return fmt.Sprintf("ErrorCode: %s, ErrorMessage %s, SecretArn: %s", us.errorCode, us.errorMessage, us.secretArn)
366+
}
367+
245368
// batchAssociateScramSecret associates the supplied scram secrets to the supplied Cluster
246369
// resource
247370
func (rm *resourceManager) batchAssociateScramSecret(
@@ -255,14 +378,27 @@ func (rm *resourceManager) batchAssociateScramSecret(
255378

256379
input := &svcsdk.BatchAssociateScramSecretInput{}
257380
input.ClusterArn = (*string)(r.ko.Status.ACKResourceMetadata.ARN)
258-
// Convert []*string to []string
259-
unrefSecrets := make([]string, len(secretARNs))
260-
for i, s := range secretARNs {
261-
unrefSecrets[i] = *s
262-
}
263-
input.SecretArnList = unrefSecrets
264-
_, err = rm.sdkapi.BatchAssociateScramSecret(ctx, input)
381+
input.SecretArnList = aws.ToStringSlice(secretARNs)
382+
resp, err := rm.sdkapi.BatchAssociateScramSecret(ctx, input)
265383
rm.metrics.RecordAPICall("UPDATE", "BatchAssociateScramSecret", err)
384+
if err != nil {
385+
return err
386+
}
387+
388+
if len(resp.UnprocessedScramSecrets) > 0 {
389+
unprocessedSecrets := []unprocessedSecret{}
390+
for _, uss := range resp.UnprocessedScramSecrets {
391+
us := unprocessedSecret{
392+
errorCode: aws.ToString(uss.ErrorCode),
393+
errorMessage: aws.ToString(uss.ErrorMessage),
394+
secretArn: aws.ToString(uss.SecretArn),
395+
}
396+
unprocessedSecrets = append(unprocessedSecrets, us)
397+
}
398+
399+
return ackerr.NewTerminalError(fmt.Errorf("Cant attach secret arns: %v", unprocessedSecrets))
400+
}
401+
266402
return err
267403
}
268404

pkg/resource/cluster/sdk.go

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

templates/hooks/cluster/sdk_delete_pre_build_request.go.tpl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,10 @@
99
if err := rm.syncAssociatedScramSecrets(ctx, &resource{ko: groupCpy}, r); err != nil {
1010
return nil, err
1111
}
12+
if !clusterActive(r) {
13+
// doing this to avoid BadRequestException
14+
return r, ackrequeue.NeededAfter(
15+
fmt.Errorf("waiting for cluster to be active before deletion"),
16+
ackrequeue.DefaultRequeueAfterDuration,
17+
)
18+
}

0 commit comments

Comments
 (0)