-
Notifications
You must be signed in to change notification settings - Fork 629
feat: add complete request reply data plane #8699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2e9d4b3
582ec1d
d6d0a5f
7a6d7e2
b9de746
1f69bc7
e04ebc4
b6159ed
fa42e46
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /* | ||
| Copyright 2025 The Knative Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "log" | ||
|
|
||
| "github.com/kelseyhightower/envconfig" | ||
| "go.uber.org/zap" | ||
| apierrs "k8s.io/apimachinery/pkg/api/errors" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
|
|
||
| "knative.dev/eventing/pkg/eventingtls" | ||
| "knative.dev/eventing/pkg/kncloudevents" | ||
| "knative.dev/eventing/pkg/requestreply" | ||
| kubeclient "knative.dev/pkg/client/injection/kube/client" | ||
| configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" | ||
| filteredfactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" | ||
| configmap "knative.dev/pkg/configmap/informer" | ||
| "knative.dev/pkg/controller" | ||
| "knative.dev/pkg/injection" | ||
| "knative.dev/pkg/logging" | ||
| "knative.dev/pkg/signals" | ||
| "knative.dev/pkg/system" | ||
|
|
||
| requestreplyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/requestreply" | ||
| ) | ||
|
|
||
| type envConfig struct { | ||
| HttpPort int `envconfig:"HTTP_PORT" default:"8080"` | ||
| HttpsPort int `envconfig:"HTTPS_PORT" default:"8443"` | ||
| PodIdx int `envconfig:"POD_INDEX" required:"true"` | ||
| SecretsPath string `envconfig:"SECRETS_PATH" required:"true"` | ||
| } | ||
|
|
||
| func main() { | ||
| ctx := signals.NewContext() | ||
|
|
||
| cfg := injection.ParseAndGetRESTConfigOrDie() | ||
| ctx = injection.WithConfig(ctx, cfg) | ||
|
|
||
| var env envConfig | ||
| if err := envconfig.Process("", &env); err != nil { | ||
| log.Fatal("Failed to process env var:", err) | ||
| } | ||
|
|
||
| ctx = filteredfactory.WithSelectors(ctx, | ||
| eventingtls.TrustBundleLabelSelector, | ||
| ) | ||
|
|
||
| log.Printf("Registering %d clients", len(injection.Default.GetClients())) | ||
| log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories())) | ||
| log.Printf("Registering %d informers", len(injection.Default.GetInformers())) | ||
|
|
||
| ctx, informers := injection.Default.SetupInformers(ctx, cfg) | ||
|
|
||
| loggingConfig, err := getLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName()) | ||
| if err != nil { | ||
| log.Fatal("Error loading/parsing logging configuration:", err) | ||
| } | ||
|
|
||
| sl, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, "request-reply") | ||
| defer flush(sl) | ||
|
|
||
| ctx = logging.WithLogger(ctx, sl) | ||
|
|
||
| logger := sl.Desugar() | ||
|
|
||
| logger.Info("Starting the RequestReply Data Plane") | ||
|
|
||
| kubeClient := kubeclient.Get(ctx) | ||
|
|
||
| configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) | ||
|
|
||
| configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, "request-reply")) | ||
|
|
||
| trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) | ||
|
|
||
| keyStore := &requestreply.AESKeyStore{ | ||
| Logger: sl.Named("key-store"), | ||
| } | ||
|
|
||
| err = keyStore.WatchPath(env.SecretsPath) | ||
| if err != nil { | ||
| logger.Fatal("failed to watch secrets file path", zap.Error(err)) | ||
| } | ||
| defer keyStore.StopWatch() | ||
|
|
||
| handler := requestreply.NewHandler( | ||
| logger, | ||
| requestreplyinformer.Get(ctx), | ||
| trustBundleConfigMapLister, | ||
| keyStore, | ||
| env.PodIdx, | ||
| ) | ||
|
|
||
| sm, err := eventingtls.NewServerManager(ctx, | ||
| kncloudevents.NewHTTPEventReceiver(env.HttpPort), | ||
| kncloudevents.NewHTTPEventReceiver(env.HttpsPort), // TODO: add tls config when we have it | ||
| handler, | ||
| configMapWatcher, | ||
| ) | ||
| if err != nil { | ||
| logger.Fatal("failed to start eventingtls server", zap.Error(err)) | ||
| } | ||
|
|
||
| logger.Info("Starting informers") | ||
| if err := controller.StartInformers(ctx.Done(), informers...); err != nil { | ||
| logger.Fatal("Failed to start informers", zap.Error(err)) | ||
| } | ||
|
|
||
| logger.Info("Starting server") | ||
| if err := sm.StartServers(ctx); err != nil { | ||
| logger.Fatal("StartServers() returned an error", zap.Error(err)) | ||
| } | ||
| logger.Info("Exiting...") | ||
| } | ||
|
|
||
| func flush(sl *zap.SugaredLogger) { | ||
| _ = sl.Sync() | ||
| } | ||
|
|
||
| func getLoggingConfig(ctx context.Context, namespace, loggingConfigMapName string) (*logging.Config, error) { | ||
| loggingConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(namespace).Get(ctx, loggingConfigMapName, metav1.GetOptions{}) | ||
| if apierrs.IsNotFound(err) { | ||
| return logging.NewConfigFromConfigMap(nil) | ||
| } else if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return logging.NewConfigFromConfigMap(loggingConfigMap) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| # Copyright 2020 The Knative Authors | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| apiVersion: v1 | ||
| kind: ServiceAccount | ||
| metadata: | ||
| name: request-reply | ||
| namespace: knative-eventing | ||
| labels: | ||
| app.kubernetes.io/version: devel | ||
| app.kubernetes.io/name: knative-eventing | ||
|
|
||
| --- | ||
| apiVersion: rbac.authorization.k8s.io/v1 | ||
| kind: ClusterRoleBinding | ||
| metadata: | ||
| name: knative-eventing-request-reply | ||
| labels: | ||
| app.kubernetes.io/version: devel | ||
| app.kubernetes.io/name: knative-eventing | ||
| subjects: | ||
| - kind: ServiceAccount | ||
| name: request-reply | ||
| namespace: knative-eventing | ||
| roleRef: | ||
| kind: ClusterRole | ||
| name: knative-eventing-request-reply | ||
| apiGroup: rbac.authorization.k8s.io |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| # Copyright 2025 The Knative Authors | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| apiVersion: apps/v1 | ||
creydr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| kind: StatefulSet | ||
| metadata: | ||
| name: request-reply | ||
| namespace: knative-eventing | ||
| labels: | ||
| app.kubernetes.io/component: request-reply | ||
| app.kubernetes.io/version: devel | ||
| app.kubernetes.io/name: knative-eventing | ||
| spec: | ||
| replicas: 1 | ||
| selector: | ||
| matchLabels: | ||
| eventing.knative.dev/part-of: request-reply | ||
| template: | ||
| metadata: | ||
| labels: | ||
| eventing.knative.dev/part-of: request-reply | ||
| app.kubernetes.io/component: request-reply | ||
| app.kubernetes.io/version: devel | ||
| app.kubernetes.io/name: knative-eventing | ||
| spec: | ||
| containers: | ||
| - name: request-reply | ||
| image: ko://knative.dev/eventing/cmd/requestreply | ||
| volumeMounts: | ||
| - name: aes-keys | ||
| mountPath: /etc/secrets | ||
| env: | ||
| - name: SYSTEM_NAMESPACE | ||
| valueFrom: | ||
| fieldRef: | ||
| fieldPath: metadata.namespace | ||
| - name: POD_INDEX | ||
| valueFrom: | ||
| fieldRef: | ||
| fieldPath: metadata.labels['apps.kubernetes.io/pod-index'] | ||
| - name: HTTP_PORT | ||
| value: "8080" | ||
| - name: HTTPS_PORT | ||
| value: "8443" | ||
| - name: SECRETS_PATH | ||
| value: "/etc/secrets" | ||
| - name: CONFIG_LOGGING_NAME | ||
| value: config-logging | ||
| - name: CONFIG_OBSERVABILITY_NAME | ||
| value: config-observability | ||
|
|
||
| ports: | ||
| - containerPort: 8080 | ||
| name: http | ||
| protocol: TCP | ||
| - containerPort: 8443 | ||
| name: https | ||
| protocol: TCP | ||
|
|
||
| serviceAccountName: request-reply | ||
| volumes: | ||
| - name: aes-keys | ||
| secret: | ||
| secretName: request-reply-keys | ||
| restartPolicy: Always | ||
|
|
||
| --- | ||
| apiVersion: v1 | ||
| kind: Service | ||
| metadata: | ||
| labels: | ||
| eventing.knative.dev/part-of: request-reply | ||
| app.kubernetes.io/component: request-reply | ||
| app.kubernetes.io/version: devel | ||
| app.kubernetes.io/name: knative-eventing | ||
| name: request-reply | ||
| namespace: knative-eventing | ||
| spec: | ||
| ports: | ||
| - name: http | ||
| port: 80 | ||
| protocol: TCP | ||
| targetPort: 8080 | ||
| - name: https | ||
| port: 443 | ||
| protocol: TCP | ||
| targetPort: 8443 | ||
| selector: | ||
| eventing.knative.dev/part-of: request-reply | ||
| --- | ||
| apiVersion: v1 | ||
| kind: Secret | ||
| metadata: | ||
| labels: | ||
| eventing.knative.dev/part-of: request-reply | ||
| app.kubernetes.io/component: request-reply | ||
| app.kubernetes.io/version: devel | ||
| app.kubernetes.io/name: knative-eventing | ||
| name: request-reply-keys | ||
| namespace: knative-eventing | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| # Copyright 2025 The Knative Authors | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| apiVersion: rbac.authorization.k8s.io/v1 | ||
| kind: ClusterRole | ||
| metadata: | ||
| name: knative-eventing-request-reply | ||
| labels: | ||
| app.kubernetes.io/version: devel | ||
| app.kubernetes.io/name: knative-eventing | ||
| rules: | ||
| - apiGroups: | ||
| - "" | ||
| resources: | ||
| - "configmaps" | ||
| - "secrets" | ||
| verbs: | ||
| - "get" | ||
| - "list" | ||
| - "watch" | ||
| - apiGroups: | ||
| - eventing.knative.dev | ||
| resources: | ||
| - requestreplies | ||
| - requestreplies/status | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we add a separate one for finalizers and patch/update - like other roles?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so? We only need this for the data plane to be able to see which resources exist/we should be handling - we don't touch the finalizers in this code afaik |
||
| verbs: | ||
| - get | ||
| - list | ||
| - watch | ||
| - patch | ||
| - apiGroups: | ||
| - "" | ||
| resources: | ||
| - events | ||
| verbs: | ||
| - "create" | ||
| - "patch" | ||
| - apiGroups: | ||
| - coordination.k8s.io | ||
| resources: | ||
| - leases | ||
| verbs: | ||
| - get | ||
| - list | ||
| - watch | ||
| - create | ||
| - update | ||
| - patch | ||
| - apiGroups: | ||
| - eventing.knative.dev | ||
| resources: | ||
| - eventpolicies | ||
| verbs: | ||
| - get | ||
| - list | ||
| - watch | ||
Uh oh!
There was an error while loading. Please reload this page.