Skip to content

Commit

Permalink
Add example for RabbitMQ stream plugin
Browse files Browse the repository at this point in the history
It shows how to use the plugin with the AMQP client.
  • Loading branch information
Gsantomaggio committed Jan 30, 2022
1 parent ee3fadd commit db480c5
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 0 deletions.
15 changes: 15 additions & 0 deletions examples/stream_plugin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
RabbitMQ Stream Examples
---
The [stream plugin](https://www.rabbitmq.com/streams.html) is available starting from RabbitMQ 3.9

These examples show how to use this lib to create/send/receive messages from stream queues.

Send a message to a stream queue
```
./send_stream.js
```

Reveice all the messages from stream queue:
```
./receive_stream.js
```
43 changes: 43 additions & 0 deletions examples/stream_plugin/receive_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/usr/bin/env node

var amqp = require('amqplib');

amqp.connect('amqp://localhost').then(function (conn) {
process.once('SIGINT', function () { conn.close(); });
return conn.createChannel().then(function (ch) {

var q = 'my_first_stream';
// Define the queue stream
// Mandatory: exclusive: false, durable: true autoDelete: false
var ok = ch.assertQueue(q, {
exclusive: false,
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'stream', // Mandatory to define stream queue
'x-max-length-bytes': 2_000_000_000 // Set the queue retention to 2GB else the stream doesn't have any limit
}
})

ch.qos(100); // this is mandatory

ok = ok.then(function (_qok) {
return ch.consume(q, function (msg) {
console.log(" [x] Received '%s'", msg.content.toString());
ch.ack(msg); // mandatory
}, {
noAck: false,
arguments: {
'x-stream-offset': 'first' // here you can specify the offset: : first, last, next, and timestamp
// with first start consuming always from the beginning

}
},
);
});

return ok.then(function (_consumeOk) {
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}).catch(console.warn);
30 changes: 30 additions & 0 deletions examples/stream_plugin/send_stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env node

var amqp = require('amqplib');

amqp.connect('amqp://localhost').then(function (conn) {
return conn.createChannel().then(function (ch) {
var q = 'my_first_stream';

// Define the queue stream
// Mandatory: exclusive: false, durable: true autoDelete: false
var ok = ch.assertQueue(q, {
exclusive: false,
durable: true,
autoDelete: false,
arguments: {
'x-queue-type': 'stream', // Mandatory to define stream queue
'x-max-length-bytes': 2_000_000_000 // Set the queue retention to 2GB else the stream doesn't have any limit
}
})

var msg = 'Hello World!';

// send the message to the stream queue
return ok.then(function (_qok) {
ch.sendToQueue(q, Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
}).finally(function () { conn.close(); });
}).catch(console.warn);

0 comments on commit db480c5

Please sign in to comment.