Skip to content

Add streaming capabilities to both the listen task and to the asyncapi (subscribe operation) call #1070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 99 additions & 6 deletions dsl-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@
+ [Container Lifetime](#container-lifetime)
+ [Process Result](#process-result)
+ [AsyncAPI Server](#asyncapi-server)
+ [AsyncAPI Message](#asyncapi-message)
+ [AsyncAPI Outbound Message](#asyncapi-outbound-message)
+ [AsyncAPI Subscription](#asyncapi-subscription)
+ [Subscription Iterator](#subscription-iterator)

## Abstract

Expand Down Expand Up @@ -311,7 +312,7 @@ The [AsyncAPI Call](#asyncapi-call) enables workflows to interact with external
| operation | `string` | `yes` | A reference to the AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) to call.<br>*Used only in case the referenced document uses AsyncAPI `v3.0.0`.* |
| server | [`asyncApiServer`](#asyncapi-server) | `no` | An object used to configure to the [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) to call the specified AsyncAPI [operation](https://www.asyncapi.com/docs/reference/specification/v3.0.0#operationObject) on.<br>If not set, default to the first [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject) matching the operation's channel. |
| protocol | `string` | `no` | The [protocol](https://www.asyncapi.com/docs/reference/specification/v3.0.0#definitionsProtocol) to use to select the target [server](https://www.asyncapi.com/docs/reference/specification/v3.0.0#serverObject). <br>Ignored if `server` has been set.<br>*Supported values are: `amqp`, `amqp1`, `anypointmq`, `googlepubsub`, `http`, `ibmmq`, `jms`, `kafka`, `mercure`, `mqtt`, `mqtt5`, `nats`, `pulsar`, `redis`, `sns`, `solace`, `sqs`, `stomp` and `ws`* |
| message | [`asyncApiMessage`](#asyncapi-message) | `no` | An object used to configure the message to publish using the target operation.<br>*Required if `subscription` has not been set.* |
| message | [`asyncApiMessage`](#asyncapi-outbound-message) | `no` | An object used to configure the message to publish using the target operation.<br>*Required if `subscription` has not been set.* |
| subscription | [`asyncApiSubscription`](#asyncapi-subscription) | `no` | An object used to configure the subscription to messages consumed using the target operation.<br>*Required if `message` has not been set.* |
| authentication | `string`<br>[`authentication`](#authentication) | `no` | The authentication policy, or the name of the authentication policy, to use when calling the AsyncAPI operation. |

Expand Down Expand Up @@ -650,7 +651,19 @@ Provides a mechanism for workflows to await and react to external events, enabli

| Name | Type | Required | Description|
|:--|:---:|:---:|:---|
| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the event(s) the workflow must listen to. |
| listen.to | [`eventConsumptionStrategy`](#event-consumption-strategy) | `yes` | Configures the [event(s)](https://cloudevents.io/) the workflow must listen to. |
| listen.read | `string` | `no` | Specifies how [events](https://cloudevents.io/) are read during the listen operation.<br>*Supported values are:*<br>*- `data`: Reads the [event's](https://cloudevents.io/) data.*<br>*- `envelope`: Reads the [event's](https://cloudevents.io/) envelope, including its [context attributes](https://github.com/cloudevents/spec/blob/main/cloudevents/spec.md#context-attributes).*<br>*- `raw`: Reads the [event's](https://cloudevents.io/) raw data.*<br>*Defaults to `data`.*|
| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [event](https://cloudevents.io/). |

> [!NOTE]
> A `listen` task produces a sequentially ordered array of all the [events](https://cloudevents.io/) it has consumed, and potentially transformed using `foreach.output.as`.

> [!NOTE]
> When `foreach` is set, the configured operations for a [events](https://cloudevents.io/) must complete before moving on to the next one. As a result, consumed [events](https://cloudevents.io/) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.

> [!WARNING]
> [Events](https://cloudevents.io/) consumed by an `until` clause should not be included in the task's output. These [events](https://cloudevents.io/) are used solely to determine when the until condition has been met, and they do not contribute to the result or data produced by the task itself


##### Examples

Expand Down Expand Up @@ -2038,7 +2051,7 @@ do:
bar: baz
```

### AsyncAPI Message
### AsyncAPI Outbound Message

Configures an AsyncAPI message to publish.

Expand Down Expand Up @@ -2073,6 +2086,29 @@ do:
bar: baz
```

### AsyncAPI Inbound Message

Configures an AsyncAPI message consumed by a subscription.

#### Properties

| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
| payload | `object` | `no` | The message's payload, if any. |
| headers | `object` | `no` | The message's headers, if any. |
| correlationId | `string` | `no` | The message's correlation id, if any. |

#### Examples

```yaml
payload:
greetings: Hello, World!
headers:
foo: bar
bar: baz
correlationid: '123456'
```

### AsyncAPI Subscription

Configures a subscription to an AsyncAPI operation.
Expand All @@ -2081,8 +2117,15 @@ Configures a subscription to an AsyncAPI operation.

| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed messages. |
| filter | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to filter consumed [messages](#asyncapi-inbound-message). |
| consume | [`subscriptionLifetime`](#asyncapi-subscription-lifetime) | `yes` | An object used to configure the subscription's lifetime. |
| foreach | [`subscriptionIterator`](#subscription-iterator) | `no` | Configures the iterator, if any, for processing each consumed [message](#asyncapi-inbound-message). |

> [!NOTE]
> An AsyncAPI subscribe operation call produces a sequentially ordered array of all the [messages](#asyncapi-inbound-message) it has consumed, and potentially transformed using `foreach.output.as`.

> [!NOTE]
> When `foreach` is set, the configured operations for a [message](#asyncapi-inbound-message) must complete before moving on to the next one. As a result, consumed [messages](#asyncapi-inbound-message) should be stored in a First-In-First-Out (FIFO) queue while awaiting iteration.

#### Examples

Expand Down Expand Up @@ -2115,7 +2158,7 @@ Configures the lifetime of an AsyncAPI subscription
#### Properties

| Name | Type | Required | Description |
|:-------|:------:|:----------:|:--------------|
|:-----|:----:|:--------:|:------------|
| amount | `integer` | `no` | The amount of messages to consume.<br>*Required if `while` and `until` have not been set.* |
| for | [`duration`](#duration) | `no` | The [`duration`](#duration) that defines for how long to consume messages. |
| while | `string` | `no` | A [runtime expression](dsl.md#runtime-expressions), if any, used to determine whether or not to keep consuming messages.<br>*Required if `amount` and `until` have not been set.* |
Expand Down Expand Up @@ -2143,4 +2186,54 @@ do:
until: '${ ($context.messages | length) == 5 }'
for:
seconds: 10
```

### Subscription Iterator

Configures the iteration over each item (event or message) consumed by a subscription. It encapsulates configuration for processing tasks, output formatting, and export behavior for every item encountered.

#### Properties

| Name | Type | Required | Description |
|:-----|:----:|:--------:|:------------|
| item | `string` | `no` | The name of the variable used to store the current item being enumerated.<br>*Defaults to `item`.* |
| at | `string` | `no` | The name of the variable used to store the index of the current item being enumerated.<br>*Defaults to `index`.* |
| do | [`map[string, task][]`](#task) | `no` | The tasks to perform for each consumed item. |
| output | [`output`](#output) | `no` | An object, if any, used to customize the item's output and to document its schema. |
| export | [`export`](#export) | `no` | An object, if any, used to customize the content of the workflow context. |

#### Examples

```yaml
document:
dsl: '1.0.0-alpha5'
namespace: test
name: asyncapi-example
version: '0.1.0'
do:
- subscribeToChatInboxUntil:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: chat-inbox
protocol: http
subscription:
filter: ${ . == $workflow.input.chat.roomId }
consume:
until: '${ ($context.messages | length) == 5 }'
for:
seconds: 10
foreach:
item: message
at: index
do:
- emitEvent:
emit:
event:
with:
source: https://serverlessworkflow.io/samples
type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
data:
message: '${ $message }'
```
28 changes: 28 additions & 0 deletions examples/call-asyncapi-subscribe-consume-forever-foreach.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
document:
dsl: '1.0.0-alpha5'
namespace: examples
name: bearer-auth
version: '0.1.0'
do:
- getNotifications:
call: asyncapi
with:
document:
endpoint: https://fake.com/docs/asyncapi.json
operation: getNotifications
subscription:
filter: '${ .correlationId == $context.userId and .payload.from.firstName == $context.contact.firstName and .payload.from.lastName == $context.contact.lastName }'
consume:
while: '${ true }'
foreach:
item: message
do:
- publishCloudEvent:
emit:
event:
with:
source: https://serverlessworkflow.io/samples
type: io.serverlessworkflow.samples.asyncapi.message.consumed.v1
data:
message: '${ $message }'

17 changes: 17 additions & 0 deletions examples/listen-to-all read-envelope.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
document:
dsl: '1.0.0-alpha5'
namespace: test
name: listen-to-all-read-envelope
version: '0.1.0'
do:
- callDoctor:
listen:
to:
all:
- with:
type: com.fake-hospital.vitals.measurements.temperature
data: ${ .temperature > 38 }
- with:
type: com.fake-hospital.vitals.measurements.bpm
data: ${ .bpm < 60 or .bpm > 100 }
read: envelope
22 changes: 22 additions & 0 deletions examples/listen-to-any-forever-foreach.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
document:
dsl: '1.0.0-alpha1'
namespace: test
name: listen-to-any-while-foreach
version: '0.1.0'
do:
- listenToGossips:
listen:
to:
any: []
until: '${ false }'
foreach:
item: event
at: i
do:
- postToChatApi:
call: http
with:
method: post
endpoint: https://fake-chat-api.com/room/{roomId}
body:
event: ${ $event }
42 changes: 42 additions & 0 deletions schema/workflow.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,17 @@ $defs:
$ref: '#/$defs/eventConsumptionStrategy'
title: ListenTo
description: Defines the event(s) to listen to.
read:
type: string
enum: [ data, envelope, raw ]
default: data
title: ListenAndReadAs
description: Specifies how events are read during the listen operation.
required: [ to ]
foreach:
$ref: '#/$defs/subscriptionIterator'
title: ListenIterator
description: Configures the iterator, if any, for processing consumed event(s).
raiseTask:
type: object
$ref: '#/$defs/taskBase'
Expand Down Expand Up @@ -1710,6 +1720,10 @@ $defs:
$ref: '#/$defs/asyncApiMessageConsumptionPolicy'
title: AsyncApiMessageConsumptionPolicy
description: An object used to configure the subscription's message consumption policy.
foreach:
$ref: '#/$defs/subscriptionIterator'
title: AsyncApiSubscriptionIterator
description: Configures the iterator, if any, for processing consumed messages(s).
required: [ consume ]
asyncApiMessageConsumptionPolicy:
type: object
Expand Down Expand Up @@ -1740,3 +1754,31 @@ $defs:
title: AsyncApiMessageConsumptionPolicyUntil
description: A runtime expression evaluated before each consumed (filtered) message to decide if message consumption should continue.
required: [ until ]
subscriptionIterator:
type: object
title: SubscriptionIterator
description: Configures the iteration over each item (event or message) consumed by a subscription.
unevaluatedProperties: false
properties:
item:
type: string
title: SubscriptionIteratorItem
description: The name of the variable used to store the current item being enumerated.
default: item
at:
type: string
title: SubscriptionIteratorIndex
description: The name of the variable used to store the index of the current item being enumerated.
default: index
do:
$ref: '#/$defs/taskList'
title: SubscriptionIteratorTasks
description: The tasks to perform for each consumed item.
output:
$ref: '#/$defs/output'
title: SubscriptionIteratorOutput
description: An object, if any, used to customize the item's output and to document its schema.
export:
$ref: '#/$defs/export'
title: SubscriptionIteratorExport
description: An object, if any, used to customize the content of the workflow context.
Loading