Skip to content
This repository was archived by the owner on Dec 11, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
add resources common
  • Loading branch information
odacremolbap committed Sep 23, 2022
commit 914c46a93c9ed28b69e2691ba534b6e85c876f0b
16 changes: 16 additions & 0 deletions config/200-clusterroles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ rules:
- delete
- patch

# Manage broker services
- apiGroups:
- ''
resources:
- services
verbs:
- get
- list
- watch
- create
- update
- delete
- patch



# Read reconciled TriggerMesh core resources and update their statuses
# +rbac-check
- apiGroups:
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/triggermesh/triggermesh-core
go 1.19

require (
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.21.0
k8s.io/api v0.24.4
k8s.io/apimachinery v0.24.4
Expand Down Expand Up @@ -54,6 +55,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@ import (
)

const (
BrokerConditionReady = apis.ConditionReady
BrokerConditionIngress apis.ConditionType = "IngressReady"
BrokerConditionAddressable apis.ConditionType = "Addressable"
RedisBrokerConditionReady = apis.ConditionReady
RedisBrokerRedisDeployment apis.ConditionType = "RedisDeploymentReady"
RedisBrokerRedisService apis.ConditionType = "RedisServiceReady"
RedisBrokerBrokerDeployment apis.ConditionType = "BrokerDeploymentReady"
RedisBrokerBrokerService apis.ConditionType = "BrokerServiceReady"
RedisBrokerConditionAddressable apis.ConditionType = "Addressable"
)

var brokerCondSet = apis.NewLivingConditionSet(
BrokerConditionIngress,
BrokerConditionAddressable,
var redisBrokerCondSet = apis.NewLivingConditionSet(
RedisBrokerRedisDeployment,
RedisBrokerRedisService,
RedisBrokerBrokerDeployment,
RedisBrokerBrokerService,
RedisBrokerConditionAddressable,
)
var brokerCondSetLock = sync.RWMutex{}
var redisBrokerCondSetLock = sync.RWMutex{}

// GetGroupVersionKind returns GroupVersionKind for Brokers
func (t *RedisBroker) GetGroupVersionKind() schema.GroupVersionKind {
Expand All @@ -36,26 +42,26 @@ func (t *RedisBroker) GetStatus() *duckv1.Status {

// RegisterAlternateBrokerConditionSet register a apis.ConditionSet for the given broker class.
func RegisterAlternateBrokerConditionSet(conditionSet apis.ConditionSet) {
brokerCondSetLock.Lock()
defer brokerCondSetLock.Unlock()
redisBrokerCondSetLock.Lock()
defer redisBrokerCondSetLock.Unlock()

brokerCondSet = conditionSet
redisBrokerCondSet = conditionSet
}

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
func (b *RedisBroker) GetConditionSet() apis.ConditionSet {
brokerCondSetLock.RLock()
defer brokerCondSetLock.RUnlock()
redisBrokerCondSetLock.RLock()
defer redisBrokerCondSetLock.RUnlock()

return brokerCondSet
return redisBrokerCondSet
}

// GetConditionSet retrieves the condition set for this resource.
func (bs *RedisBrokerStatus) GetConditionSet() apis.ConditionSet {
brokerCondSetLock.RLock()
defer brokerCondSetLock.RUnlock()
redisBrokerCondSetLock.RLock()
defer redisBrokerCondSetLock.RUnlock()

return brokerCondSet
return redisBrokerCondSet
}

// GetTopLevelCondition returns the top level Condition.
Expand All @@ -68,9 +74,9 @@ func (bs *RedisBrokerStatus) GetTopLevelCondition() *apis.Condition {
func (bs *RedisBrokerStatus) SetAddress(url *apis.URL) {
bs.Address.URL = url
if url != nil {
bs.GetConditionSet().Manage(bs).MarkTrue(BrokerConditionAddressable)
bs.GetConditionSet().Manage(bs).MarkTrue(RedisBrokerConditionAddressable)
} else {
bs.GetConditionSet().Manage(bs).MarkFalse(BrokerConditionAddressable, "nil URL", "URL is nil")
bs.GetConditionSet().Manage(bs).MarkFalse(RedisBrokerConditionAddressable, "nil URL", "URL is nil")
}
}

Expand All @@ -90,3 +96,7 @@ func (b *RedisBroker) IsReady() bool {
func (bs *RedisBrokerStatus) InitializeConditions() {
bs.GetConditionSet().Manage(bs).InitializeConditions()
}

func (bs *RedisBrokerStatus) MarkRedisBrokerFailed(reason, messageFormat string, messageA ...interface{}) {
redisBrokerCondSet.Manage(bs).MarkFalse(RedisBrokerRedisDeployment, reason, messageFormat, messageA...)
}
37 changes: 37 additions & 0 deletions pkg/reconciler/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2022 TriggerMesh Inc.
// SPDX-License-Identifier: Apache-2.0

package reconciler

// Reasons for API Events
const (
// // ReasonRBACCreate indicates that an RBAC object was successfully created.
// ReasonRBACCreate = "CreateRBAC"
// // ReasonRBACUpdate indicates that an RBAC object was successfully updated.
// ReasonRBACUpdate = "UpdateRBAC"
// // ReasonFailedRBACCreate indicates that the creation of an RBAC object failed.
// ReasonFailedRBACCreate = "FailedRBACCreate"
// // ReasonFailedRBACUpdate indicates that the update of an RBAC object failed.
// ReasonFailedRBACUpdate = "FailedRBACUpdate"

ReasonDeploymentCreate = "CreateDeployment"
ReasonDeploymentGet = "GetDeployment"
ReasonDeploymentUpdate = "UpdateDeployment"
ReasonFailedDeploymentCreate = "FailedDeploymentCreate"
ReasonFailedDeploymentUpdate = "FailedDeploymentUpdate"

// // ReasonServiceCreate indicates that an Service object was successfully created.
// ReasonServiceCreate = "CreateService"
// // ReasonServiceUpdate indicates that an Service object was successfully updated.
// ReasonServiceUpdate = "UpdateService"
// // ReasonFailedServiceCreate indicates that the creation of an Service object failed.
// ReasonFailedServiceCreate = "FailedServiceCreate"
// // ReasonFailedServiceUpdate indicates that the update of an Service object failed.
// ReasonFailedServiceUpdate = "FailedServiceUpdate"

// // ReasonBadSinkURI indicates that the URI of a sink can't be determined.
// ReasonBadSinkURI = "BadSinkURI"

// // ReasonInvalidSpec indicates that spec of a reconciled object is invalid.
// ReasonInvalidSpec = "InvalidSpec"
)
36 changes: 34 additions & 2 deletions pkg/reconciler/redisbroker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,16 @@ package redisbroker
import (
"context"

"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
"knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"

eventingv1alpha1 "github.com/triggermesh/triggermesh-core/pkg/apis/eventing/v1alpha1"
rbinformer "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/informers/eventing/v1alpha1/redisbroker"
rbreconciler "github.com/triggermesh/triggermesh-core/pkg/client/generated/injection/reconciler/eventing/v1alpha1/redisbroker"
)
Expand All @@ -19,13 +26,38 @@ func NewController(
ctx context.Context,
cmw configmap.Watcher,
) *controller.Impl {
rbInformer := rbinformer.Get(ctx)

r := &Reconciler{}
rbInformer := rbinformer.Get(ctx)
deploymentInformer := deployment.Get(ctx)
serviceInformer := service.Get(ctx)
serviceAccountInformer := serviceaccount.Get(ctx)
roleBindingInformer := rolebinding.Get(ctx)

r := &Reconciler{
kubeClientSet: kubeclient.Get(ctx),
redisReconciler: newRedisReconciler(ctx, deploymentInformer.Lister()),
serviceLister: serviceInformer.Lister(),
serviceAccountLister: serviceAccountInformer.Lister(),
roleBindingLister: roleBindingInformer.Lister(),
}

impl := rbreconciler.NewImpl(ctx, r)

rbInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventingv1alpha1.RedisBroker{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

serviceInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventingv1alpha1.RedisBroker{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
serviceAccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&eventingv1alpha1.RedisBroker{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

return impl
}
74 changes: 74 additions & 0 deletions pkg/reconciler/redisbroker/reconcile_redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package redisbroker

import (
"context"

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
appsv1listers "k8s.io/client-go/listers/apps/v1"
k8sclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"

eventingv1alpha1 "github.com/triggermesh/triggermesh-core/pkg/apis/eventing/v1alpha1"
"github.com/triggermesh/triggermesh-core/pkg/reconciler"
"github.com/triggermesh/triggermesh-core/pkg/reconciler/resources"
)

type redisReconciler struct {
client kubernetes.Interface
deploymentLister appsv1listers.DeploymentLister
}

func newRedisReconciler(ctx context.Context, deploymentLister appsv1listers.DeploymentLister) redisReconciler {
return redisReconciler{
client: k8sclient.Get(ctx),
deploymentLister: deploymentLister,
}
}

func (r *redisReconciler) Reconcile(ctx context.Context, rb *eventingv1alpha1.RedisBroker) (*appsv1.Deployment, error) {

desired := buildRedisDeployment(rb)
current, err := r.deploymentLister.Deployments(desired.Namespace).Get(desired.Name)
switch {
case err == nil:
// TODO compare
// TODO if equal return
// continue
case apierrs.IsNotFound(err):
// continue
default:
fullname := types.NamespacedName{Namespace: desired.Namespace, Name: desired.Name}
logging.FromContext(ctx).Error("Unable to get the deployment", zap.String("deployment", fullname.String()), zap.Error(err))
rb.Status.MarkRedisBrokerFailed("DeploymentGetFailed", "Failed to get Redis deployment")

return nil, pkgreconciler.NewEvent(corev1.EventTypeWarning, reconciler.ReasonDeploymentGet,
"Failed to get Redis deployment %s: %w", fullname, err)
}

// Create

// TODO Update statuses

return current, nil
}

func buildRedisDeployment(rb *eventingv1alpha1.RedisBroker) *appsv1.Deployment {
return resources.NewDeployment(rb.Namespace, rb.Name+"-redis-server",
resources.DeploymentWithMetaOptions(
resources.MetaAddLabel("app", "redis-server"),
resources.MetaAddLabel("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
resources.MetaAddOwner(rb, rb.GetGroupVersionKind())),
resources.DeploymentAddSelectorForTemplate("eventing.triggermesh.io/redis-name", rb.Name+"-redis-server"),
resources.DeploymentSetReplicas(1),
resources.DeploymentWithTemplateOption(
resources.PodSpecAddContainer(
resources.NewContainer("redis", "redis/redis-stack-server:latest",
resources.ContainerAddEnvFromValue("REDIS_ARGS", "--appendonly yes"),
resources.ContainerAddPort("redis", 6379)))))
}
29 changes: 29 additions & 0 deletions pkg/reconciler/redisbroker/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,43 @@ package redisbroker
import (
"context"

"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
corev1listers "k8s.io/client-go/listers/core/v1"
rbacv1listers "k8s.io/client-go/listers/rbac/v1"

"knative.dev/pkg/logging"
"knative.dev/pkg/reconciler"

eventingv1alpha1 "github.com/triggermesh/triggermesh-core/pkg/apis/eventing/v1alpha1"
)

type Reconciler struct {
kubeClientSet kubernetes.Interface
redisReconciler redisReconciler
serviceLister corev1listers.ServiceLister
serviceAccountLister corev1listers.ServiceAccountLister
roleBindingLister rbacv1listers.RoleBindingLister
}

func (r *Reconciler) ReconcileKind(ctx context.Context, rb *eventingv1alpha1.RedisBroker) reconciler.Event {
logging.FromContext(ctx).Infow("Reconciling", zap.Any("Broker", *rb))

// Clean any dangling resources

// Iterate triggers and create secret

// Make sure the Redis deployment exists and propagate the status to the Channel
_, err := r.redisReconciler.Reconcile(ctx, rb)
if err != nil {
return err
}

// create service for redis

// create deployment for broker

// create service for broker

return nil
}
29 changes: 29 additions & 0 deletions pkg/reconciler/resources/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2022 TriggerMesh Inc.
// SPDX-License-Identifier: Apache-2.0
package resources

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
tName = "test-name"
tNamespace = "test-namespace"
tImage = "triggermesh/test:v1"
)

var (
tTrue = true

tPod = corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: corev1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: tName,
Namespace: tNamespace,
},
}
)
Loading