Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions examples/amqp-proton/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# AMQP 1.0 + CloudEvents sample
This example uses the vertx-proton library and the AMQP 1.0 protocol binding for cloud events to implement a client and server that communicates over AMQP to exchange cloud event messages.

The vertx-proton library makes it easy to create (reactive) AMQP clients and servers and is a wrapper around Qpid proton--a library used to implement the AMQP 1.0 protocol binding for cloud events.

## Build

```shell
mvn package
```

## Start AMQP Server
Starts the AMQP server on `127.0.0.1` to listen for incoming connections on the default (insecure) AMQP port `5672`. Once the server is started, it can either receive or send messages to a connected client. The opening and closing of a connection is initiated by a client.

```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpServer"
```

## Start AMQP Client
The client simply opens a connection with the server and either sends or receives a message containing a cloud event. Once the client completes sending or receiving a message, it initiates the closing of the connection by emitting the `AMQP CLOSE frame`.

Send a message to the server.

```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="send"
```

Receive a message from the server.

```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.amqp.vertx.AmqpClient" -Dexec.args="receive"
```
34 changes: 34 additions & 0 deletions examples/amqp-proton/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-examples</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloudevents-amqp-proton-example</artifactId>

<properties>
<vertx.version>4.0.0.Beta1</vertx.version>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-amqp-proton</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${vertx.version}</version>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package io.cloudevents.examples.amqp.vertx;

import java.io.PrintWriter;

import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.message.Message;

import io.cloudevents.CloudEvent;
import io.cloudevents.amqp.ProtonAmqpMessageFactory;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.core.v1.CloudEventV1;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;

/**
* A example vertx-based AMQP client that interacts with a remote AMQP server to send and receive CloudEvent messages.
*/
public class AmqpClient {

private static ProtonConnection connection;

private static final String SERVER_HOST = "127.0.0.1";
private static final int SERVER_PORT = 5672;
private static final String SEND_MESSAGE = "send";
private static final String RECEIVE_MESSAGE = "receive";

final static Vertx VERTX = Vertx.vertx();
private static PrintWriter writer = new PrintWriter(System.out, true);

public static void main(String args[]) {

if (args.length < 1) {
writer.println("Usage: AmqpClient [send|receive]");
return;
}

final String action = args[0].toLowerCase();

switch (action) {

case SEND_MESSAGE:
sendMessage();
break;

case RECEIVE_MESSAGE:
receiveMessage();
break;

default:
writer.println("Unknown action");
}
}

private static void sendMessage() {
connectToServer(SERVER_HOST, SERVER_PORT)
.compose(conn -> {
connection = conn;
writer.printf("[Client] Connected to %s:%s", SERVER_HOST, SERVER_PORT);

return openSenderLink();
}).onSuccess(sender -> {

final JsonObject payload = new JsonObject().put("temp", 50);

final CloudEvent event = new CloudEventBuilder()
.withAttribute(CloudEventV1.ID, "client-id")
.withAttribute(CloudEventV1.SOURCE, "http://127.0.0.1/amqp-client")
.withAttribute(CloudEventV1.TYPE, "com.example.sampletype1")
.withAttribute(CloudEventV1.TIME, "2020-11-06T21:47:12.037467+00:00")
.withData(payload.toString().getBytes())
.build();

final Message message = ProtonAmqpMessageFactory.createWriter().writeBinary(event);
message.setAddress("/telemetry");
sender.send(message, delivery -> {
if (Accepted.class.isInstance(delivery.getRemoteState())) {
writer.println("[Client:] message delivered and accepted by remote peer");
}
connection.close();
});
}).onFailure(t -> {
writer.printf("[Client] Connection failed (%s)", t.getCause().getMessage());
});

}

private static void receiveMessage() {
connectToServer(SERVER_HOST, SERVER_PORT)
.compose(conn -> {
connection = conn;
writer.println("[Client] Connected");
return Future.succeededFuture();
}).onSuccess(success ->
openReceiverLink((delivery, message) -> {
final MessageReader reader = ProtonAmqpMessageFactory.createReader(message);
final CloudEvent event = reader.toEvent();
writer.printf("[Client] received CloudEvent[Id=%s, Source=%s]", event.getId(),
event.getSource().toString());
connection.close();
})
).onFailure(t -> {
writer.println("[Client] Connection failed");
});
}

private static Future<ProtonConnection> connectToServer(final String host, final int port) {

final Promise<ProtonConnection> connectAttempt = Promise.promise();
final ProtonClientOptions options = new ProtonClientOptions();
final ProtonClient client = ProtonClient.create(VERTX);

client.connect(options, host, port, connectAttempt);

return connectAttempt.future()
.compose(unopenedConnection -> {
final Promise<ProtonConnection> con = Promise.promise();
unopenedConnection.openHandler(con);
unopenedConnection.open();
return con.future();
});
}

private static Future<ProtonSender> openSenderLink() {
if (connection == null || connection.isDisconnected()) {
throw new IllegalStateException("[Client] connection not established");
}

final Promise<ProtonSender> result = Promise.promise();
final ProtonSender sender = connection.createSender(null);
sender.openHandler(result);
sender.open();
return result.future();
}

private static Future<ProtonReceiver> openReceiverLink(final ProtonMessageHandler msgHandler) {
if (connection == null || connection.isDisconnected()) {
throw new IllegalStateException("[Client] connection not established");
}

final Promise<ProtonReceiver> result = Promise.promise();
final ProtonReceiver receiver = connection.createReceiver(null);
receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
receiver.handler(msgHandler);
receiver.openHandler(result);
receiver.open();
return result.future().map(recver -> {
// Ready to receive messages
return recver;
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.cloudevents.examples.amqp.vertx;

import java.io.PrintWriter;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.qpid.proton.message.Message;

import io.cloudevents.CloudEvent;
import io.cloudevents.amqp.ProtonAmqpMessageFactory;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;

/**
* An example vertx-based AMQP server that receives and sends CloudEvent messages to/from a remote client.
*/
public class AmqpServer {

private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 5672;

private static PrintWriter writer = new PrintWriter(System.out, true);

public static void main(String argv[]) {
final Vertx vertx = Vertx.vertx();

final List<String> args = new ArrayList<>();
if (argv.length > 1) {
args.addAll(Arrays.asList(argv[0].split(":", 1)));
}

final String host = args.isEmpty() ? DEFAULT_HOST : args.get(0);
final int port = args.isEmpty() ? DEFAULT_PORT : Integer.parseInt(args.get(1));

final ProtonServerOptions options = new ProtonServerOptions();
options.setHost(host);
options.setPort(port);

final ProtonServer server = ProtonServer.create(vertx, options);
server.connectHandler(con -> onConnectRequest(con)).listen(ar -> {
if (ar.succeeded()) {
writer.printf("[Server] started and listening on %s:%s\n", host, port);
} else {
writer.printf("[Server] failed to start (%s)\n", ar.cause());
System.exit(1);
}
});
}

private static void onConnectRequest(final ProtonConnection con) {
// BEGIN frame received
con.sessionOpenHandler(remoteSession -> {
remoteSession.open();
});
// ATTACH frame received -> client wants to send messages to this server.
con.receiverOpenHandler(remoteReceiver -> {
remoteReceiver.handler((delivery, message) -> {
// message received -> convert to CloudEvent
final MessageReader reader = ProtonAmqpMessageFactory.createReader(message);
final CloudEvent event = reader.toEvent();

writer.printf("[Server] received CloudEvent[Id=%s, Source=%s]\n", event.getId(),
event.getSource().toString());

}).open();
});
// ATTACH frame received -> client wants to receive messages from this server.
con.senderOpenHandler(sender -> {
try {
MessageWriter<?, Message> writer = ProtonAmqpMessageFactory.createWriter();
final CloudEvent event = CloudEventBuilder.v1()
.withId("amqp-server-id")
.withType("com.example.sampletype1")
.withSource(URI.create("http://127.0.0.1/amqp-server"))
.withTime(OffsetDateTime.now())
.withData("{\"temp\": 5}".getBytes())
.build();

final Message message = writer.writeBinary(event);
sender.send(message);

} catch (final Exception e) {
writer.println("[Server] failed to send ");
}
sender.open();
});
//OPEN frame received
con.openHandler(remoteOpen -> {
if (remoteOpen.failed()) {
// connection with client failed.
writer.println(remoteOpen.cause());
} else {
remoteOpen.result().open();
}
});
// CLOSE Frame received
con.closeHandler(remoteClose -> {
con.close();
});
}
}
2 changes: 1 addition & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<module>vertx</module>
<module>basic-http</module>
<module>restful-ws-spring-boot</module>
<module>amqp-proton</module>
</modules>


</project>