Skip to content

Commit

Permalink
Set consumer and consumergroups finalizers when creating them (#3355)
Browse files Browse the repository at this point in the history
* Set consumer and consumergroups finalizers when creating them (#823)

It is possible that a delete consumer or consumergroup might
be reconciled and never finalized when it is deleted before
the finalizer is set.
This happens because the Knative generated reconciler uses
patch (as opposed to using update) for setting the finalizer
and patch doesn't have any optimistic concurrency controls.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Co-authored-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Fix unit tests

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Sep 27, 2023
1 parent e4a9113 commit d4caf75
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,13 @@ func (c *ConsumerGroup) GetStatus() *duckv1.Status {
func (cg *ConsumerGroup) ConsumerFromTemplate(options ...ConsumerOption) *Consumer {
// TODO figure out naming strategy, is generateName enough?
c := &Consumer{
ObjectMeta: cg.Spec.Template.ObjectMeta,
Spec: cg.Spec.Template.Spec,
ObjectMeta: *cg.Spec.Template.ObjectMeta.DeepCopy(),
Spec: *cg.Spec.Template.Spec.DeepCopy(),
}

ownerRef := metav1.NewControllerRef(cg, ConsumerGroupGroupVersionKind)
c.OwnerReferences = append(c.OwnerReferences, *ownerRef)
c.Finalizers = []string{"consumers.internal.kafka.eventing.knative.dev"}

for _, opt := range options {
opt(c)
Expand Down
14 changes: 14 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand All @@ -121,6 +122,7 @@ func TestReconcileKind(t *testing.T) {
)),
),
NewConsumer(2,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -196,6 +198,7 @@ func TestReconcileKind(t *testing.T) {
NewConfigMapWithBinaryData(systemNamespace, "p1", nil, DispatcherPodAsOwnerReference("p1")),
NewConfigMapWithBinaryData(systemNamespace, "p2", nil, DispatcherPodAsOwnerReference("p2")),
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand All @@ -207,6 +210,7 @@ func TestReconcileKind(t *testing.T) {
)),
),
NewConsumer(2,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -404,6 +408,7 @@ func TestReconcileKind(t *testing.T) {
WantErr: false,
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -527,6 +532,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -873,6 +879,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -1114,6 +1121,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -1199,6 +1207,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -1292,6 +1301,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -1413,6 +1423,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -1531,6 +1542,7 @@ func TestReconcileKind(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand All @@ -1542,6 +1554,7 @@ func TestReconcileKind(t *testing.T) {
)),
),
NewConsumer(2,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down Expand Up @@ -1739,6 +1752,7 @@ func TestReconcileKindNoAutoscaler(t *testing.T) {
},
WantCreates: []runtime.Object{
NewConsumer(1,
ConsumerFinalizer(),
ConsumerSpec(NewConsumerSpec(
ConsumerTopics("t1", "t2"),
ConsumerConfigs(
Expand Down
3 changes: 3 additions & 0 deletions control-plane/pkg/reconciler/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func (r Reconciler) reconcileConsumerGroup(ctx context.Context, ks *sources.Kafk
Labels: map[string]string{
internalscg.UserFacingResourceLabelSelector: strings.ToLower(ks.GetGroupVersionKind().Kind),
},
Finalizers: []string{
"consumergroups.internal.kafka.eventing.knative.dev",
},
},
Spec: internalscg.ConsumerGroupSpec{
Replicas: ks.Spec.Consumers,
Expand Down
11 changes: 11 additions & 0 deletions control-plane/pkg/reconciler/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -163,6 +164,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -216,6 +218,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -272,6 +275,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -327,6 +331,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -430,6 +435,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -557,6 +563,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -638,6 +645,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -696,6 +704,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -1389,6 +1398,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down Expand Up @@ -1442,6 +1452,7 @@ func TestReconcileKind(t *testing.T) {
Key: testKey,
WantCreates: []runtime.Object{
NewConsumerGroup(
WithConsumerGroupFinalizer(),
WithConsumerGroupName(SourceUUID),
WithConsumerGroupNamespace(SourceNamespace),
WithConsumerGroupOwnerRef(kmeta.NewControllerRef(NewSource())),
Expand Down
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,9 @@ func ConsumerDeletedTimeStamp() ConsumerOption {
WithDeletedTimeStamp(c)
}
}

func ConsumerFinalizer() ConsumerOption {
return func(c *kafkainternals.Consumer) {
c.Finalizers = []string{"consumers.internal.kafka.eventing.knative.dev"}
}
}
6 changes: 6 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ func WithConsumerGroupLabels(labels map[string]string) ConsumerGroupOption {
}
}

func WithConsumerGroupFinalizer() ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Finalizers = []string{"consumergroups.internal.kafka.eventing.knative.dev"}
}
}

func ConsumerGroupReplicas(replicas int32) ConsumerGroupOption {
return func(cg *kafkainternals.ConsumerGroup) {
cg.Spec.Replicas = pointer.Int32(replicas)
Expand Down

0 comments on commit d4caf75

Please sign in to comment.