Skip to content

Commit

Permalink
Address review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr committed Aug 20, 2024
1 parent c7c740a commit 51759b5
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"time"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"

"k8s.io/utils/ptr"

"knative.dev/eventing/pkg/auth"
Expand Down Expand Up @@ -195,7 +197,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
}

// Get resource configuration.
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience)
brokerResource, err := r.reconcilerBrokerResource(ctx, topic, broker, secret, topicConfig, audience, broker.Status.AppliedEventPoliciesStatus)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -621,7 +623,7 @@ func rebuildCMFromStatusAnnotations(br *eventing.Broker) *corev1.ConfigMap {
return cm
}

func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string) (*contract.Resource, error) {
func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string, broker *eventing.Broker, secret *corev1.Secret, config *kafka.TopicConfig, audience *string, appliedEventPoliciesStatus eventingduck.AppliedEventPoliciesStatus) (*contract.Resource, error) {
features := feature.FromContext(ctx)

resource := &contract.Resource{
Expand Down Expand Up @@ -662,7 +664,7 @@ func (r *Reconciler) reconcilerBrokerResource(ctx context.Context, topic string,
}
resource.EgressConfig = egressConfig

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(broker.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, broker.Namespace, features)
eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, broker.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from broker status: %w", err)

Check warning on line 669 in control-plane/pkg/reconciler/broker/broker.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/broker/broker.go#L669

Added line #L669 was not covered by tests
}
Expand Down
6 changes: 3 additions & 3 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
}

// Get resource configuration
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience)
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig, audience, channel.Status.AppliedEventPoliciesStatus)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func (r *Reconciler) reconcileConsumerGroup(ctx context.Context, channel *messag
return cg, nil
}

func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig, audience *string) (*contract.Resource, error) {
func (r *Reconciler) getChannelContractResource(ctx context.Context, topic string, channel *messagingv1beta1.KafkaChannel, auth *security.NetSpecAuthContext, config *kafka.TopicConfig, audience *string, appliedEventPoliciesStatus v1.AppliedEventPoliciesStatus) (*contract.Resource, error) {
features := feature.FromContext(ctx)

resource := &contract.Resource{
Expand Down Expand Up @@ -719,7 +719,7 @@ func (r *Reconciler) getChannelContractResource(ctx context.Context, topic strin
resource.Ingress.Audience = *audience
}

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(channel.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, channel.Namespace, features)
eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, channel.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from channel status: %w", err)

Check warning on line 724 in control-plane/pkg/reconciler/channel/channel.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/channel/channel.go#L724

Added line #L724 was not covered by tests
}
Expand Down
8 changes: 5 additions & 3 deletions control-plane/pkg/reconciler/sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"time"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"

corev1 "k8s.io/api/core/v1"
eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"

Expand Down Expand Up @@ -196,7 +198,7 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
}

// Get sink configuration.
sinkConfig, err := r.getSinkContractResource(ctx, ks, secret, audience)
sinkConfig, err := r.getSinkContractResource(ctx, ks, secret, audience, ks.Status.AppliedEventPoliciesStatus)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)

Check warning on line 203 in control-plane/pkg/reconciler/sink/kafka_sink.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/sink/kafka_sink.go#L203

Added line #L203 was not covered by tests
}
Expand Down Expand Up @@ -429,7 +431,7 @@ func (r *Reconciler) setTrustBundles(ct *contract.Contract) error {
return nil
}

func (r *Reconciler) getSinkContractResource(ctx context.Context, kafkaSink *eventingv1alpha1.KafkaSink, secret *corev1.Secret, audience *string) (*contract.Resource, error) {
func (r *Reconciler) getSinkContractResource(ctx context.Context, kafkaSink *eventingv1alpha1.KafkaSink, secret *corev1.Secret, audience *string, appliedEventPoliciesStatus eventingduck.AppliedEventPoliciesStatus) (*contract.Resource, error) {
features := feature.FromContext(ctx)
sinkConfig := &contract.Resource{
Uid: string(kafkaSink.UID),
Expand Down Expand Up @@ -463,7 +465,7 @@ func (r *Reconciler) getSinkContractResource(ctx context.Context, kafkaSink *eve
sinkConfig.Ingress.Audience = *audience
}

eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(kafkaSink.Status.AppliedEventPoliciesStatus, r.EventPolicyLister, kafkaSink.Namespace, features)
eventPolicies, err := coreconfig.EventPoliciesFromAppliedEventPoliciesStatus(appliedEventPoliciesStatus, r.EventPolicyLister, kafkaSink.Namespace, features)
if err != nil {
return nil, fmt.Errorf("could not get eventpolicies from kafkasink status: %w", err)

Check warning on line 470 in control-plane/pkg/reconciler/sink/kafka_sink.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/sink/kafka_sink.go#L470

Added line #L470 was not covered by tests
}
Expand Down

0 comments on commit 51759b5

Please sign in to comment.