Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions config/core/resources/requestreply.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ spec:
replyAttribute:
description: The name of the cloudevents attribute which will hold the correlation id for an event which will be treated as a reply.
type: string
secrets:
description: A list of the names of one or more secrets used to sign the correlation ids and reply ids. The secrets must be in the same namespace as the requestreply resource.
type: array
items:
type: string
timeout:
description: A ISO8601 string representing how long RequestReply holds onto an incoming request before it times out without a reply.
type: string
Expand Down Expand Up @@ -141,6 +136,12 @@ spec:
type:
description: Type of condition.
type: string
desiredReplicas:
description: The current replicas (StatefulSet pod + trigger) that are desired
type: integer
readyReplicas:
description: The current replicas (StatefulSet pod + trigger) that are ready
type: integer
address:
description: RequestReply is Addressable. It exposes the endpoint as an URI to get events delivered.
type: object
Expand Down Expand Up @@ -198,6 +199,8 @@ spec:
kind: RequestReply
plural: requestreplies
singular: requestreply
shortNames:
- rr
categories:
- all
- knative
Expand Down
42 changes: 22 additions & 20 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3021,16 +3021,6 @@ DeliverySpec
<td>
</td>
</tr>
<tr>
<td>
<code>secrets</code><br/>
<em>
[]string
</em>
</td>
<td>
</td>
</tr>
</table>
</td>
</tr>
Expand Down Expand Up @@ -3748,16 +3738,6 @@ DeliverySpec
<td>
</td>
</tr>
<tr>
<td>
<code>secrets</code><br/>
<em>
[]string
</em>
</td>
<td>
</td>
</tr>
</tbody>
</table>
<h3 id="eventing.knative.dev/v1alpha1.RequestReplyStatus">RequestReplyStatus
Expand Down Expand Up @@ -3829,6 +3809,28 @@ AppliedEventPoliciesStatus
<p>AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this Broker.</p>
</td>
</tr>
<tr>
<td>
<code>desiredReplicas</code><br/>
<em>
int32
</em>
</td>
<td>
<p>DesiredReplicas is the number of replicas (StatefulSet pod + trigger) that is desired</p>
</td>
</tr>
<tr>
<td>
<code>readyReplicas</code><br/>
<em>
int32
</em>
</td>
<td>
<p>ReadyReplicas is the number of ready replicas (StatefulSet pod + trigger) for this RequestReply resource</p>
</td>
</tr>
</tbody>
</table>
<hr/>
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/eventing/v1alpha1/requestreply_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ import (
func (rr *RequestReply) SetDefaults(ctx context.Context) {
ctx = apis.WithinParent(ctx, rr.ObjectMeta)
rr.Spec.SetDefaults(ctx)

if rr.Labels == nil {
rr.Labels = make(map[string]string)
}

rr.Labels["eventing.knative.dev/broker"] = rr.Spec.BrokerRef.Name
}

func (rrs *RequestReplySpec) SetDefaults(ctx context.Context) {
Expand All @@ -41,4 +47,5 @@ func (rrs *RequestReplySpec) SetDefaults(ctx context.Context) {
if rrs.ReplyAttribute == "" {
rrs.ReplyAttribute = "replyid"
}

}
17 changes: 14 additions & 3 deletions pkg/apis/eventing/v1alpha1/requestreply_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

"github.com/google/go-cmp/cmp"
Expand All @@ -33,8 +34,13 @@ func TestRequestReplyDefaults(t *testing.T) {
"nil spec": {
initial: RequestReply{},
expected: RequestReply{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"eventing.knative.dev/broker": "",
},
},
Spec: RequestReplySpec{
Timeout: ptr.To("30s"),
Timeout: ptr.To("PT30S"),
CorrelationAttribute: "correlationid",
ReplyAttribute: "replyid",
},
Expand All @@ -43,14 +49,19 @@ func TestRequestReplyDefaults(t *testing.T) {
"does not override existing values": {
initial: RequestReply{
Spec: RequestReplySpec{
Timeout: ptr.To("40s"),
Timeout: ptr.To("PT40S"),
CorrelationAttribute: "othercorrelationid",
ReplyAttribute: "otherreplyid",
},
},
expected: RequestReply{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"eventing.knative.dev/broker": "",
},
},
Spec: RequestReplySpec{
Timeout: ptr.To("40s"),
Timeout: ptr.To("PT40S"),
CorrelationAttribute: "othercorrelationid",
ReplyAttribute: "otherreplyid",
},
Expand Down
24 changes: 14 additions & 10 deletions pkg/apis/eventing/v1alpha1/requestreply_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ import (
v1 "knative.dev/pkg/apis/duck/v1"
)

var requestReplyCondSet = apis.NewLivingConditionSet(RequestReplyConditionIngress, RequestReplyConditionTriggers, RequestReplyConditionAddressable, RequestReplyConditionEventPoliciesReady)
var requestReplyCondSet = apis.NewLivingConditionSet(RequestReplyConditionTriggers, RequestReplyConditionAddressable, RequestReplyConditionEventPoliciesReady, RequestReplyConditionBrokerReady)

const (
RequestReplyConditionReady = apis.ConditionReady
RequestReplyConditionIngress apis.ConditionType = "IngressReady"
RequestReplyConditionTriggers apis.ConditionType = "TriggersReady"
RequestReplyConditionAddressable apis.ConditionType = "Addressable"
RequestReplyConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
RequestReplyConditionBrokerReady apis.ConditionType = "BrokerReady"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -81,14 +81,6 @@ func (rr *RequestReplyStatus) MarkTriggersNotReadyWithReason(reason, messageForm
rr.GetConditionSet().Manage(rr).MarkUnknown(RequestReplyConditionTriggers, reason, messageFormat, messageA...)
}

func (rr *RequestReplyStatus) MarkIngressReady() {
rr.GetConditionSet().Manage(rr).MarkTrue(RequestReplyConditionIngress)
}

func (rr *RequestReplyStatus) MarkIngressNotReadyWithReason(reason, messageFormat string, messageA ...interface{}) {
rr.GetConditionSet().Manage(rr).MarkUnknown(RequestReplyConditionIngress, reason, messageFormat, messageA...)
}

func (rr *RequestReplyStatus) MarkEventPoliciesTrue() {
rr.GetConditionSet().Manage(rr).MarkTrue(RequestReplyConditionEventPoliciesReady)
}
Expand All @@ -104,3 +96,15 @@ func (rr *RequestReplyStatus) MarkEventPoliciesFailed(reason, messageFormat stri
func (rr *RequestReplyStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
rr.GetConditionSet().Manage(rr).MarkUnknown(RequestReplyConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (rr *RequestReplyStatus) MarkBrokerReady() {
rr.GetConditionSet().Manage(rr).MarkTrue(RequestReplyConditionBrokerReady)
}

func (rr *RequestReplyStatus) MarkBrokerNotReady(reason, messageFormat string, messageA ...interface{}) {
rr.GetConditionSet().Manage(rr).MarkFalse(RequestReplyConditionBrokerReady, reason, messageFormat, messageA...)
}

func (rr *RequestReplyStatus) MarkBrokerUnknown(reason, messageFormat string, messageA ...interface{}) {
rr.GetConditionSet().Manage(rr).MarkUnknown(RequestReplyConditionBrokerReady, reason, messageFormat, messageA...)
}
56 changes: 18 additions & 38 deletions pkg/apis/eventing/v1alpha1/requestreply_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ func TestRequestReplyInitializeConditions(t *testing.T) {
Status: corev1.ConditionUnknown,
},
{
Type: RequestReplyConditionEventPoliciesReady,
Type: RequestReplyConditionBrokerReady,
Status: corev1.ConditionUnknown,
},
{
Type: RequestReplyConditionIngress,
Type: RequestReplyConditionEventPoliciesReady,
Status: corev1.ConditionUnknown,
},
{
Expand Down Expand Up @@ -124,28 +124,28 @@ func TestRequestReplyReadyCondition(t *testing.T) {
name string
rrs *RequestReplyStatus
markAddresableSucceeded *bool
markIngressReadySucceeded *bool
markTriggersReadySucceeded *bool
markEventPoliciesReadySucceeded *bool
markBrokerReady *bool
wantReady bool
}{
{
name: "Initially everything is Unknown, Auth&SubjectsResolved marked as true, EP should become Ready",
name: "Initially everything is Unknown, Auth&SubjectsResolved marked as true, RR should become Ready",
rrs: &RequestReplyStatus{
Status: duckv1.Status{
Conditions: []apis.Condition{
{Type: RequestReplyConditionReady, Status: corev1.ConditionUnknown},
{Type: RequestReplyConditionAddressable, Status: corev1.ConditionUnknown},
{Type: RequestReplyConditionIngress, Status: corev1.ConditionUnknown},
{Type: RequestReplyConditionTriggers, Status: corev1.ConditionUnknown},
{Type: RequestReplyConditionEventPoliciesReady, Status: corev1.ConditionUnknown},
{Type: RequestReplyConditionBrokerReady, Status: corev1.ConditionUnknown},
},
},
},
markAddresableSucceeded: ptr.To(true),
markIngressReadySucceeded: ptr.To(true),
markTriggersReadySucceeded: ptr.To(true),
markEventPoliciesReadySucceeded: ptr.To(true),
markBrokerReady: ptr.To(true),
wantReady: true,
},
{
Expand All @@ -155,35 +155,16 @@ func TestRequestReplyReadyCondition(t *testing.T) {
Conditions: []apis.Condition{
{Type: RequestReplyConditionReady, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionAddressable, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionIngress, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionTriggers, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionEventPoliciesReady, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionBrokerReady, Status: corev1.ConditionTrue},
},
},
},
markAddresableSucceeded: ptr.To(false),
markIngressReadySucceeded: ptr.To(true),
markTriggersReadySucceeded: ptr.To(true),
markEventPoliciesReadySucceeded: ptr.To(true),
wantReady: false,
},
{
name: "Initially everything is Ready, Ingress set to false, RR should become False",
rrs: &RequestReplyStatus{
Status: duckv1.Status{
Conditions: []apis.Condition{
{Type: RequestReplyConditionReady, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionAddressable, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionIngress, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionTriggers, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionEventPoliciesReady, Status: corev1.ConditionTrue},
},
},
},
markAddresableSucceeded: ptr.To(true),
markIngressReadySucceeded: ptr.To(false),
markTriggersReadySucceeded: ptr.To(true),
markEventPoliciesReadySucceeded: ptr.To(true),
markBrokerReady: ptr.To(true),
wantReady: false,
},
{
Expand All @@ -193,16 +174,16 @@ func TestRequestReplyReadyCondition(t *testing.T) {
Conditions: []apis.Condition{
{Type: RequestReplyConditionReady, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionAddressable, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionIngress, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionTriggers, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionEventPoliciesReady, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionBrokerReady, Status: corev1.ConditionTrue},
},
},
},
markAddresableSucceeded: ptr.To(true),
markIngressReadySucceeded: ptr.To(true),
markTriggersReadySucceeded: ptr.To(false),
markEventPoliciesReadySucceeded: ptr.To(true),
markBrokerReady: ptr.To(true),
wantReady: false,
},
{
Expand All @@ -212,14 +193,13 @@ func TestRequestReplyReadyCondition(t *testing.T) {
Conditions: []apis.Condition{
{Type: RequestReplyConditionReady, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionAddressable, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionIngress, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionTriggers, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionEventPoliciesReady, Status: corev1.ConditionTrue},
{Type: RequestReplyConditionBrokerReady, Status: corev1.ConditionTrue},
},
},
},
markAddresableSucceeded: ptr.To(true),
markIngressReadySucceeded: ptr.To(true),
markTriggersReadySucceeded: ptr.To(true),
markEventPoliciesReadySucceeded: ptr.To(false),
wantReady: false,
Expand All @@ -236,13 +216,6 @@ func TestRequestReplyReadyCondition(t *testing.T) {

}
}
if test.markIngressReadySucceeded != nil {
if *test.markIngressReadySucceeded {
test.rrs.MarkIngressReady()
} else {
test.rrs.MarkIngressNotReadyWithReason("", "")
}
}
if test.markTriggersReadySucceeded != nil {
if *test.markTriggersReadySucceeded {
test.rrs.MarkTriggersReady()
Expand All @@ -257,6 +230,13 @@ func TestRequestReplyReadyCondition(t *testing.T) {
test.rrs.MarkEventPoliciesFailed("", "")
}
}
if test.markBrokerReady != nil {
if *test.markBrokerReady {
test.rrs.MarkBrokerReady()
} else {
test.rrs.MarkBrokerNotReady("", "")
}
}
rr := RequestReply{Status: *test.rrs}
got := rr.GetConditionSet().Manage(test.rrs).IsHappy()
if test.wantReady != got {
Expand Down
8 changes: 6 additions & 2 deletions pkg/apis/eventing/v1alpha1/requestreply_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ type RequestReplySpec struct {
Timeout *string `json:"timeout,omitempty"`

Delivery *eventingduckv1.DeliverySpec `json:"delivery,omitempty"`

Secrets []string `json:"secrets"`
}

// RequestReplyStatus represents the current state of a RequestReply.
Expand All @@ -95,6 +93,12 @@ type RequestReplyStatus struct {
// AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this Broker.
// +optional
eventingduckv1.AppliedEventPoliciesStatus `json:",inline"`

// DesiredReplicas is the number of replicas (StatefulSet pod + trigger) that is desired
DesiredReplicas *int32 `json:"desiredReplicas,omitempty"`

// ReadyReplicas is the number of ready replicas (StatefulSet pod + trigger) for this RequestReply resource
ReadyReplicas *int32 `json:"readyReplicas,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
13 changes: 6 additions & 7 deletions pkg/apis/eventing/v1alpha1/requestreply_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,13 @@ func (rrs *RequestReplySpec) Validate(ctx context.Context) *apis.FieldError {

if rrs.Timeout != nil {
timeout, err := period.Parse(*rrs.Timeout)
if err != nil || timeout.IsZero() || timeout.IsNegative() {
errs = errs.Also(apis.ErrInvalidValue(*rrs.Timeout, "timeout"))
if err != nil {
errs = errs.Also(apis.ErrInvalidValue(*rrs.Timeout, "timeout", err.Error()))
Comment thread
creydr marked this conversation as resolved.
} else if timeout.IsNegative() || timeout.IsZero() {
errs = errs.Also(apis.ErrInvalidValue(*rrs.Timeout, "timeout", "timeout must be a positive duration"))
}

}

if len(rrs.Secrets) == 0 {
errs = errs.Also(apis.ErrInvalidValue(rrs.Secrets, "secrets", "one or more secrets must be provided"))
} else {
errs = errs.Also(apis.ErrMissingField("timeout"))
}

if rrs.CorrelationAttribute == "" ||
Expand Down
Loading
Loading