Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions cmd/requestreply/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
Copyright 2025 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"log"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/requestreply"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
filteredfactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
configmap "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"

requestreplyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/requestreply"
)

type envConfig struct {
HttpPort int `envconfig:"HTTP_PORT" default:"8080"`
HttpsPort int `envconfig:"HTTPS_PORT" default:"8443"`
PodIdx int `envconfig:"POD_INDEX" required:"true"`
SecretsPath string `envconfig:"SECRETS_PATH" required:"true"`
}

func main() {
ctx := signals.NewContext()

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var:", err)
}

ctx = filteredfactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
)

log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))

ctx, informers := injection.Default.SetupInformers(ctx, cfg)

loggingConfig, err := getLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName())
if err != nil {
log.Fatal("Error loading/parsing logging configuration:", err)
}

sl, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, "request-reply")
defer flush(sl)

ctx = logging.WithLogger(ctx, sl)

logger := sl.Desugar()

logger.Info("Starting the RequestReply Data Plane")

kubeClient := kubeclient.Get(ctx)

configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace())

configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, "request-reply"))

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

keyStore := &requestreply.AESKeyStore{
Logger: sl.Named("key-store"),
}

err = keyStore.WatchPath(env.SecretsPath)
if err != nil {
logger.Fatal("failed to watch secrets file path", zap.Error(err))
}
defer keyStore.StopWatch()

handler := requestreply.NewHandler(
logger,
requestreplyinformer.Get(ctx),
trustBundleConfigMapLister,
keyStore,
env.PodIdx,
)

sm, err := eventingtls.NewServerManager(ctx,
kncloudevents.NewHTTPEventReceiver(env.HttpPort),
kncloudevents.NewHTTPEventReceiver(env.HttpsPort), // TODO: add tls config when we have it
handler,
configMapWatcher,
)
if err != nil {
logger.Fatal("failed to start eventingtls server", zap.Error(err))
}

logger.Info("Starting informers")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatal("Failed to start informers", zap.Error(err))
}

logger.Info("Starting server")
if err := sm.StartServers(ctx); err != nil {
logger.Fatal("StartServers() returned an error", zap.Error(err))
}
logger.Info("Exiting...")
}

func flush(sl *zap.SugaredLogger) {
_ = sl.Sync()
}

func getLoggingConfig(ctx context.Context, namespace, loggingConfigMapName string) (*logging.Config, error) {
loggingConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(namespace).Get(ctx, loggingConfigMapName, metav1.GetOptions{})
if apierrs.IsNotFound(err) {
return logging.NewConfigFromConfigMap(nil)
} else if err != nil {
return nil, err
}

return logging.NewConfigFromConfigMap(loggingConfigMap)
}
39 changes: 39 additions & 0 deletions config/core/200-request-reply-serviceaccount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ServiceAccount
metadata:
name: request-reply
namespace: knative-eventing
labels:
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: knative-eventing-request-reply
labels:
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
subjects:
- kind: ServiceAccount
name: request-reply
namespace: knative-eventing
roleRef:
kind: ClusterRole
name: knative-eventing-request-reply
apiGroup: rbac.authorization.k8s.io
111 changes: 111 additions & 0 deletions config/core/deployments/request-reply.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2025 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: apps/v1
kind: StatefulSet
metadata:
name: request-reply
namespace: knative-eventing
labels:
app.kubernetes.io/component: request-reply
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
spec:
replicas: 1
selector:
matchLabels:
eventing.knative.dev/part-of: request-reply
template:
metadata:
labels:
eventing.knative.dev/part-of: request-reply
app.kubernetes.io/component: request-reply
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
spec:
containers:
- name: request-reply
image: ko://knative.dev/eventing/cmd/requestreply
volumeMounts:
- name: aes-keys
mountPath: /etc/secrets
env:
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: POD_INDEX
valueFrom:
fieldRef:
fieldPath: metadata.labels['apps.kubernetes.io/pod-index']
- name: HTTP_PORT
value: "8080"
- name: HTTPS_PORT
value: "8443"
- name: SECRETS_PATH
value: "/etc/secrets"
- name: CONFIG_LOGGING_NAME
value: config-logging
- name: CONFIG_OBSERVABILITY_NAME
value: config-observability

ports:
- containerPort: 8080
name: http
protocol: TCP
- containerPort: 8443
name: https
protocol: TCP

serviceAccountName: request-reply
volumes:
- name: aes-keys
secret:
secretName: request-reply-keys
restartPolicy: Always

---
apiVersion: v1
kind: Service
metadata:
labels:
eventing.knative.dev/part-of: request-reply
app.kubernetes.io/component: request-reply
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
name: request-reply
namespace: knative-eventing
spec:
ports:
- name: http
port: 80
protocol: TCP
targetPort: 8080
- name: https
port: 443
protocol: TCP
targetPort: 8443
selector:
eventing.knative.dev/part-of: request-reply
---
apiVersion: v1
kind: Secret
metadata:
labels:
eventing.knative.dev/part-of: request-reply
app.kubernetes.io/component: request-reply
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
name: request-reply-keys
namespace: knative-eventing
67 changes: 67 additions & 0 deletions config/core/roles/request-reply-clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Copyright 2025 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: knative-eventing-request-reply
labels:
app.kubernetes.io/version: devel
app.kubernetes.io/name: knative-eventing
rules:
- apiGroups:
- ""
resources:
- "configmaps"
- "secrets"
verbs:
- "get"
- "list"
- "watch"
- apiGroups:
- eventing.knative.dev
resources:
- requestreplies
- requestreplies/status
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a separate one for finalizers and patch/update - like other roles?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so? We only need this for the data plane to be able to see which resources exist/we should be handling - we don't touch the finalizers in this code afaik

verbs:
- get
- list
- watch
- patch
- apiGroups:
- ""
resources:
- events
verbs:
- "create"
- "patch"
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- apiGroups:
- eventing.knative.dev
resources:
- eventpolicies
verbs:
- get
- list
- watch
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/cloudevents/sdk-go/v2 v2.16.1
github.com/coreos/go-oidc/v3 v3.9.0
github.com/eclipse/paho.golang v0.12.0
github.com/fsnotify/fsnotify v1.7.0
github.com/go-jose/go-jose/v3 v3.0.4
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down
Loading
Loading