[WIP] Provision contract fields for authz (contract update afterwards) #4065

Refactor KafkaChannel reconciliation to update contract at the end of reconciliation
creydr committed Aug 19, 2024
commit 2e49802d795ca778f44f12a61298a24b415824ef
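
In outline, this commit moves the contract read/update and the receiver-pod annotation bump from the middle of ReconcileKind to its end, after the subscribers, the channel service, and the address status have been reconciled, so that the contract written to the ConfigMap (and the probing that follows it) already reflects the final status fields. Below is a minimal, hypothetical sketch of that ordering; the step names are illustrative stand-ins, not the real reconciler API.

// Hypothetical sketch, not the actual reconciler code: the step functions are
// stand-ins for the real methods, shown only to illustrate the new ordering in
// which the contract ConfigMap is written after the status fields are final.
package sketch

import "context"

type step func(ctx context.Context) error

// reconcileInOrder runs the status-affecting steps first, then the contract
// update, the receiver-pod annotation bump, and the address probing last, so
// that both the contract and the probe see the finished status.
func reconcileInOrder(ctx context.Context, reconcileSubscribers, reconcileChannelService, setAddressStatus, updateContractConfigMap, bumpReceiverGeneration, probeAddresses step) error {
	for _, s := range []step{
		reconcileSubscribers,    // mark KafkaChannelConditionSubscribersReady
		reconcileChannelService, // backwards-compatibility channel service
		setAddressStatus,        // fill status.address / status.addresses
		updateContractConfigMap, // contract now includes the updated status fields
		bumpReceiverGeneration,  // receivers reload the new contract generation
		probeAddresses,          // probe only once the data plane knows the channel
	} {
		if err := s(ctx); err != nil {
			return err
		}
	}
	return nil
}
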
125 changes: 63 additions & 62 deletions control-plane/pkg/reconciler/channel/channel.go
@@ -210,25 +210,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
}
logger.Debug("Got contract config map")

// Get data plane config data.
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil || ct == nil {
return statusConditionManager.FailedToGetDataFromConfigMap(err)
}
logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct))

_, err = r.setTrustBundles(ct)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}

// Get resource configuration
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
coreconfig.SetDeadLetterSinkURIFromEgressConfig(&channel.Status.DeliveryStatus, channelResource.EgressConfig)

allReady, subscribersError := r.reconcileSubscribers(ctx, channel, topicName, topicConfig.BootstrapServers, secret)
if subscribersError != nil {
channel.GetConditionSet().Manage(&channel.Status).MarkFalse(KafkaChannelConditionSubscribersReady, "failed to reconcile all subscribers", subscribersError.Error())
@@ -240,38 +221,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
channel.GetConditionSet().Manage(&channel.Status).MarkTrue(KafkaChannelConditionSubscribersReady)
}

// Update contract data with the new contract configuration (add/update channel resource)
channelIndex := coreconfig.FindResource(ct, channel.UID)
coreconfig.AddOrUpdateResourceConfig(ct, channelResource, channelIndex, logger)
// Update the configuration map with the new contract data.
if err := r.UpdateDataPlaneConfigMap(ctx, ct, contractConfigMap); err != nil {
logger.Error("failed to update data plane config map", zap.Error(
statusConditionManager.FailedToUpdateConfigMap(err),
))
return err
}
logger.Debug("Contract config map updated")
statusConditionManager.ConfigMapUpdated()

// We update the receiver pods' annotation regardless of whether our contract changed, because a previous
// reconciliation might have failed to update the annotation on one of our data plane pods; we still want
// to bring the remaining annotations up to the contract generation that was saved in the ConfigMap.

// We reject events to a non-existing Channel, which means we cannot consider a Channel Ready until all
// receivers have seen the Channel, so a failure to update the receiver pods is a hard failure.
// Dispatcher pods, on the other hand, care about Subscriptions, and the Channel object is only a configuration
// prototype for all associated Subscriptions, so it is fine for the dispatcher side to receive the update
// eventually, even if "eventually" means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
return err
}
logger.Debug("Updated receiver pod annotation")

channelService, err := r.reconcileChannelService(ctx, channel)
if err != nil {
logger.Error("Error reconciling the backwards compatibility channel service.", zap.Error(err))
@@ -288,7 +237,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
audience = nil
}

var addressableStatus duckv1.AddressStatus
channelHttpsHost := network.GetServiceHostname(r.Env.IngressName, r.SystemNamespace)
channelHttpHost := network.GetServiceHostname(channelService.Name, channel.Namespace)
if featureFlags.IsPermissiveTransportEncryption() {
@@ -304,8 +252,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
// - status.addresses:
// - https address with path-based routing
// - http address with path-based routing
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress, httpAddress}
addressableStatus.Address = &httpAddress
channel.Status.Addresses = []duckv1.Addressable{httpsAddress, httpAddress}
channel.Status.Address = &httpAddress
} else if featureFlags.IsStrictTransportEncryption() {
// Strict mode: (only https addresses)
// - status.address https address with path-based routing
@@ -317,16 +265,72 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
}

httpsAddress := receiver.HTTPSAddress(channelHttpsHost, audience, channel, caCerts)
addressableStatus.Addresses = []duckv1.Addressable{httpsAddress}
addressableStatus.Address = &httpsAddress
channel.Status.Addresses = []duckv1.Addressable{httpsAddress}
channel.Status.Address = &httpsAddress
} else {
httpAddress := receiver.ChannelHTTPAddress(channelHttpHost, audience)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
channel.Status.Address = &httpAddress
channel.Status.Addresses = []duckv1.Addressable{httpAddress}
}

channel.GetConditionSet().Manage(channel.GetStatus()).MarkTrue(base.ConditionAddressable)

// Update the contract at the end of reconciliation so that it also contains the updated status fields.

// Get data plane config data.
ct, err := r.GetDataPlaneConfigMapData(logger, contractConfigMap)
if err != nil || ct == nil {
return statusConditionManager.FailedToGetDataFromConfigMap(err)
}
logger.Debug("Got contract data from config map", zap.Any(base.ContractLogKey, ct))

_, err = r.setTrustBundles(ct)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}

// Get resource configuration
channelResource, err := r.getChannelContractResource(ctx, topic, channel, authContext, topicConfig)
if err != nil {
return statusConditionManager.FailedToResolveConfig(err)
}
coreconfig.SetDeadLetterSinkURIFromEgressConfig(&channel.Status.DeliveryStatus, channelResource.EgressConfig)

// Update contract data with the new contract configuration (add/update channel resource)
channelIndex := coreconfig.FindResource(ct, channel.UID)
coreconfig.AddOrUpdateResourceConfig(ct, channelResource, channelIndex, logger)
// Update the configuration map with the new contract data.
if err := r.UpdateDataPlaneConfigMap(ctx, ct, contractConfigMap); err != nil {
logger.Error("failed to update data plane config map", zap.Error(
statusConditionManager.FailedToUpdateConfigMap(err),
))
return err
}
logger.Debug("Contract config map updated")
statusConditionManager.ConfigMapUpdated()

// We update the receiver pods' annotation regardless of whether our contract changed, because a previous
// reconciliation might have failed to update the annotation on one of our data plane pods; we still want
// to bring the remaining annotations up to the contract generation that was saved in the ConfigMap.

// We reject events to a non-existing Channel, which means we cannot consider a Channel Ready until all
// receivers have seen the Channel, so a failure to update the receiver pods is a hard failure.
// Dispatcher pods, on the other hand, care about Subscriptions, and the Channel object is only a configuration
// prototype for all associated Subscriptions, so it is fine for the dispatcher side to receive the update
// eventually, even if "eventually" means seconds or minutes after the actual update.

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsContractGenerationAnnotation(ctx, logger, ct.Generation); err != nil {
logger.Error("Failed to update receiver pod annotation", zap.Error(
statusConditionManager.FailedToUpdateReceiverPodsAnnotation(err),
))
return err
}
logger.Debug("Updated receiver pod annotation")

// Do probing only after the contract has been updated and the data plane is aware of the changes.
proberAddressable := prober.ProberAddressable{
AddressStatus: &addressableStatus,
AddressStatus: &channel.Status.AddressStatus,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
@@ -341,9 +345,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
}

statusConditionManager.ProbesStatusReady()
channel.Status.Address = addressableStatus.Address
channel.Status.Addresses = addressableStatus.Addresses
channel.GetConditionSet().Manage(channel.GetStatus()).MarkTrue(base.ConditionAddressable)

return nil
}
10 changes: 8 additions & 2 deletions control-plane/pkg/reconciler/channel/channel_test.go
@@ -364,6 +364,7 @@ func TestReconcileKind(t *testing.T) {
StatusTopicReadyWithName(ChannelTopic()),
StatusProbeFailed(prober.StatusNotReady),
StatusChannelSubscribers(),
ChannelAddressable(&env),
),
},
},
@@ -418,6 +419,7 @@ func TestReconcileKind(t *testing.T) {
StatusTopicReadyWithName(ChannelTopic()),
StatusProbeFailed(prober.StatusUnknown),
StatusChannelSubscribers(),
ChannelAddressable(&env),
),
},
},
@@ -1641,6 +1643,7 @@ func TestReconcileKind(t *testing.T) {
kafka.BootstrapServersConfigMapKey: ChannelBootstrapServers,
}),
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, []byte("corrupt")),
NewPerChannelService(&env),
},
Key: testKey,
WantErr: true,
@@ -1652,6 +1655,8 @@ func TestReconcileKind(t *testing.T) {
StatusConfigParsed,
WithChannelTopicStatusAnnotation(ChannelTopic()),
StatusTopicReadyWithName(ChannelTopic()),
ChannelAddressable(&env),
StatusChannelSubscribers(),
StatusConfigMapNotUpdatedReady(
"Failed to get contract data from ConfigMap: knative-eventing/kafka-channel-channels-subscriptions",
"failed to unmarshal contract: 'corrupt'",
@@ -1964,8 +1969,9 @@ func TestReconcileKind(t *testing.T) {
BootstrapServers: ChannelBootstrapServers,
Reference: ChannelReference(),
Ingress: &contract.Ingress{
Host: receiver.Host(ChannelNamespace, ChannelName),
Path: receiver.Path(ChannelNamespace, ChannelName),
Host: receiver.Host(ChannelNamespace, ChannelName),
Path: receiver.Path(ChannelNamespace, ChannelName),
Audience: ChannelAudience,
},
},
},