-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrpc_client.js
115 lines (96 loc) · 3.31 KB
/
rpc_client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// #!/usr/bin/env node
const crypto = require('crypto');
const EventEmitter = require('events');

const amqp = require('amqplib/callback_api');
const { resolve, reject } = require('bluebird');
/**
 * Plain EventEmitter subclass with no added behavior.
 * RequestQueue uses an instance of it to hand each RPC reply to the
 * listener registered under the reply's correlation id.
 */
class MyEmitter extends EventEmitter {}
/**
 * RPC client on top of RabbitMQ.
 *
 * Requests are published to the 'rpc_queue' queue with a fresh correlation id
 * and a `replyTo` pointing at an exclusive, server-named reply queue. Replies
 * consumed from that queue are routed back to the waiting caller through an
 * event emitter keyed by correlation id.
 */
class RequestQueue {
  constructor() {
    this.timeout = 3000; // Default timeout: 3s
  }

  /**
   * Connect to the broker, open a channel, declare the exclusive reply queue
   * and start consuming replies.
   *
   * The broker URL is read from the RABBITMQ_URL environment variable and
   * falls back to 'amqp://localhost'.
   *
   * Any failure (connect, channel creation, queue declaration) is thrown from
   * inside the amqplib callback, so `cb` is never invoked with an error; an
   * uncaught throw there is expected to abort start-up rather than leave the
   * client half-initialised.
   *
   * @param {Function} cb - Called with no arguments once the reply consumer
   *   has been registered.
   */
  init(cb) {
    this.emitter = new MyEmitter();
    amqp.connect(process.env.RABBITMQ_URL || 'amqp://localhost', (error0, connection) => {
      if (error0) {
        throw error0;
      }
      this.connection = connection;
      connection.createChannel((error1, channel) => {
        if (error1) {
          // Fail the same way as the other set-up steps instead of creating a
          // detached rejected promise that nobody observes.
          throw error1;
        }
        this.channel = channel;
        channel.assertQueue('', { exclusive: true }, (error2, q) => {
          if (error2) {
            throw error2;
          }
          this.q = q;
          console.log('Response consume');
          channel.consume(q.queue, (msg) => this.handleReply(msg), { noAck: true });
          // Callback
          cb();
        });
      });
    });
  }

  /**
   * Route one delivery from the reply queue to the request waiting for it.
   *
   * @param {?Object} msg - amqplib message, or null when the broker cancelled
   *   the consumer (amqplib signals cancellation with a null delivery).
   */
  handleReply(msg) {
    if (msg === null || msg === undefined) {
      console.error('Reply consumer was cancelled by the broker');
      return;
    }
    const body = msg.content.toString();
    console.info("Message: ", body);
    const { correlationId } = msg.properties;
    if (!correlationId) {
      return;
    }
    // emit() returns false when nobody is listening, i.e. the request already
    // timed out or the id is unknown.
    if (!this.emitter.emit(correlationId, null, body)) {
      console.error("Missing listen of correlationId: ", correlationId, body);
    }
  }

  /**
   * Publish `message` to 'rpc_queue' and wait for the matching reply.
   *
   * @param {*} message - Payload; sent as the bytes of `message.toString()`.
   * @returns {Promise<string>} Resolves with the reply body, or rejects with
   *   Error('Timeout') when no reply arrives within `this.timeout` ms.
   * @throws {Error} Synchronously, when init() has not completed or when
   *   publishing to the channel throws.
   */
  sendRequest(message) {
    if (!this.emitter || !this.channel || !this.q) {
      throw new Error('RequestQueue is not initialised: call init() and wait for its callback first');
    }
    const correlationId = generateUuid();
    const timer = this.rejectTimeout(correlationId);
    const result = new Promise((resolve, reject) => {
      this.emitter.once(correlationId, (err, v) => {
        // Whichever of reply/timeout wins, the timer is no longer needed.
        clearTimeout(timer);
        if (err) {
          return reject(err);
        }
        console.info(' [.] Got %s', v);
        resolve(v);
      });
    });
    try {
      this.channel.sendToQueue('rpc_queue', Buffer.from(message.toString()), {
        correlationId,
        replyTo: this.q.queue,
      });
    } catch (error) {
      // Nothing was sent, so no reply can arrive: drop the listener and timer
      // instead of leaving them behind.
      clearTimeout(timer);
      this.emitter.removeAllListeners(correlationId);
      throw error;
    }
    return result;
  }

  /**
   * Schedule a timeout for a pending request: after `this.timeout` ms an
   * Error('Timeout') is emitted under `correlationId`, which rejects the
   * request if it is still waiting.
   *
   * @param {string} correlationId - Id of the pending request.
   * @returns {Object} The timer handle, so the caller can cancel it with
   *   clearTimeout once the request has settled.
   */
  rejectTimeout(correlationId) {
    return setTimeout(() => {
      this.emitter.emit(correlationId, new Error('Timeout'));
    }, this.timeout);
  }
}
/**
 * Create a unique identifier used as the correlation id of an RPC request.
 *
 * Uses the standard library's RFC 4122 version 4 UUID generator rather than
 * concatenated Math.random() output.
 *
 * @returns {string} A random UUID string.
 */
function generateUuid() {
  return crypto.randomUUID();
}
// HTTP front end: exposes the RPC client through a single Express route and
// starts listening only after the RabbitMQ reply consumer is ready.
const express = require('express');
const app = express();
const port = 3000; // Default HTTP port; overridden by the first CLI argument.
const rq = new RequestQueue();

// GET /?num=<integer> — forwards `num` over RPC and returns the reply body.
app.get('/', (req, res) => {
  const num = Number.parseInt(req.query.num, 10);
  console.info("Request num: ", num);
  if (Number.isNaN(num)) {
    // Reject bad input here instead of sending "NaN" to the RPC queue.
    res.status(400).send('Query parameter "num" must be an integer');
    return;
  }
  rq.sendRequest(num)
    .then((v) => res.send(v))
    // Report RPC failures (e.g. 'Timeout') with an error status, not 200.
    .catch((err) => res.status(500).send(err.message));
});

rq.init(() => {
  const args = process.argv.slice(2);
  const listenPort = args[0] || port;
  app.listen(listenPort, () => {
    // Log the port actually bound, which may differ from the default.
    console.log(`Example app listening at http://localhost:${listenPort}`);
    console.log(`Use this url for test http://localhost:${listenPort}?num=1`);
  });
});