11package io .cloudevents .examples .amqp .vertx ;
22
33import java .io .PrintWriter ;
4- import java .util .HashMap ;
5- import java .util .Map ;
64
75import org .apache .qpid .proton .amqp .messaging .Accepted ;
8- import org .apache .qpid .proton .amqp .messaging .ApplicationProperties ;
96import org .apache .qpid .proton .message .Message ;
107
118import io .cloudevents .CloudEvent ;
129import io .cloudevents .amqp .ProtonAmqpMessageFactory ;
1310import io .cloudevents .core .message .MessageReader ;
11+ import io .cloudevents .core .v1 .CloudEventBuilder ;
12+ import io .cloudevents .core .v1 .CloudEventV1 ;
1413import io .vertx .core .Future ;
1514import io .vertx .core .Promise ;
1615import io .vertx .core .Vertx ;
1716import io .vertx .core .json .JsonObject ;
1817import io .vertx .proton .ProtonClient ;
1918import io .vertx .proton .ProtonClientOptions ;
2019import io .vertx .proton .ProtonConnection ;
21- import io .vertx .proton .ProtonHelper ;
2220import io .vertx .proton .ProtonMessageHandler ;
2321import io .vertx .proton .ProtonQoS ;
2422import io .vertx .proton .ProtonReceiver ;
@@ -36,6 +34,7 @@ public class AmqpClient {
3634 private static final String SEND_MESSAGE = "send" ;
3735 private static final String RECEIVE_MESSAGE = "receive" ;
3836
37+ final static Vertx VERTX = Vertx .vertx ();
3938 private static PrintWriter writer = new PrintWriter (System .out , true );
4039
4140 public static void main (String args []) {
@@ -47,75 +46,78 @@ public static void main(String args[]) {
4746
4847 final String action = args [0 ].toLowerCase ();
4948
50- final Vertx vertx = Vertx .vertx ();
51-
5249 switch (action ) {
5350
5451 case SEND_MESSAGE :
55- connectToServer (vertx , SERVER_HOST , SERVER_PORT )
56- .compose (conn -> {
57- connection = conn ;
58- writer .printf ("[Client] Connected to %s:%s" , SERVER_HOST , SERVER_PORT );
59-
60- return openSenderLink ();
61- }).map (sender -> {
62-
63- final JsonObject payload = new JsonObject ().put ("temp" , 50 );
64- final String to = "/telemetry" ;
65-
66- final Message message = ProtonHelper .message (to , payload .toString ());
67-
68- // set attributes
69- final Map <String , Object > attributes = new HashMap <>();
70- attributes .put ("cloudEvents:type" , "com.example.sampletype1" );
71- attributes .put ("cloudEvents:source" , "http://127.0.0.1/amqp-client" );
72- attributes .put ("cloudEvents:id" , "client-id" );
73- attributes .put ("cloudEvents:specversion" , "1.0" );
74- attributes .put ("cloudEvents:time" , "2020-11-06T21:47:12.037467+00:00" );
75- message .setApplicationProperties (new ApplicationProperties (attributes ));
76-
77- sender .send (message , delivery -> {
78- if (Accepted .class .isInstance (delivery .getRemoteState ())) {
79- writer .println ("[Client:] message delivered and accepted by remote peer" );
80- }
81- connection .close ();
82- });
83- return null ;
84- }).otherwise (t -> {
85- writer .printf ("[Client] Connection failed (%s)" , t .getCause ().getMessage ());
86- return null ;
87- });
52+ sendMessage ();
8853 break ;
54+
8955 case RECEIVE_MESSAGE :
90- connectToServer (vertx , SERVER_HOST , SERVER_PORT )
91- .compose (conn -> {
92- connection = conn ;
93- writer .println ("[Client] Connected" );
94- return Future .succeededFuture ();
95- }).map (success -> {
96-
97- return openReceiverLink ((delivery , message ) -> {
98- final MessageReader reader = ProtonAmqpMessageFactory .createReader (message );
99- final CloudEvent event = reader .toEvent ();
100- writer .printf ("[Client] received CloudEvent[Id=%s, Source=%s]" , event .getId (),
101- event .getSource ().toString ());
102- connection .close ();
103- });
104- }).otherwise (t -> {
105- writer .println ("[Client] Connection failed" );
106- return null ;
107- });
56+ receiveMessage ();
10857 break ;
58+
10959 default :
11060 writer .println ("Unknown action" );
11161 }
11262 }
11363
114- private static Future <ProtonConnection > connectToServer (final Vertx vertx , final String host , final int port ) {
64+ private static void sendMessage () {
65+ connectToServer (SERVER_HOST , SERVER_PORT )
66+ .compose (conn -> {
67+ connection = conn ;
68+ writer .printf ("[Client] Connected to %s:%s" , SERVER_HOST , SERVER_PORT );
69+
70+ return openSenderLink ();
71+ }).onSuccess (sender -> {
72+
73+ final JsonObject payload = new JsonObject ().put ("temp" , 50 );
74+
75+ final CloudEvent event = new CloudEventBuilder ()
76+ .withAttribute (CloudEventV1 .ID , "client-id" )
77+ .withAttribute (CloudEventV1 .SOURCE , "http://127.0.0.1/amqp-client" )
78+ .withAttribute (CloudEventV1 .TYPE , "com.example.sampletype1" )
79+ .withAttribute (CloudEventV1 .TIME , "2020-11-06T21:47:12.037467+00:00" )
80+ .withData (payload .toString ().getBytes ())
81+ .build ();
82+
83+ final Message message = ProtonAmqpMessageFactory .createWriter ().writeBinary (event );
84+ message .setAddress ("/telemetry" );
85+ sender .send (message , delivery -> {
86+ if (Accepted .class .isInstance (delivery .getRemoteState ())) {
87+ writer .println ("[Client:] message delivered and accepted by remote peer" );
88+ }
89+ connection .close ();
90+ });
91+ }).onFailure (t -> {
92+ writer .printf ("[Client] Connection failed (%s)" , t .getCause ().getMessage ());
93+ });
94+
95+ }
96+
97+ private static void receiveMessage () {
98+ connectToServer (SERVER_HOST , SERVER_PORT )
99+ .compose (conn -> {
100+ connection = conn ;
101+ writer .println ("[Client] Connected" );
102+ return Future .succeededFuture ();
103+ }).onSuccess (success ->
104+ openReceiverLink ((delivery , message ) -> {
105+ final MessageReader reader = ProtonAmqpMessageFactory .createReader (message );
106+ final CloudEvent event = reader .toEvent ();
107+ writer .printf ("[Client] received CloudEvent[Id=%s, Source=%s]" , event .getId (),
108+ event .getSource ().toString ());
109+ connection .close ();
110+ })
111+ ).onFailure (t -> {
112+ writer .println ("[Client] Connection failed" );
113+ });
114+ }
115+
116+ private static Future <ProtonConnection > connectToServer (final String host , final int port ) {
115117
116118 final Promise <ProtonConnection > connectAttempt = Promise .promise ();
117119 final ProtonClientOptions options = new ProtonClientOptions ();
118- final ProtonClient client = ProtonClient .create (vertx );
120+ final ProtonClient client = ProtonClient .create (VERTX );
119121
120122 client .connect (options , host , port , connectAttempt );
121123
0 commit comments