-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.js
109 lines (83 loc) · 3.04 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
const createError = require('http-errors');
const express = require('express');
const cookieParser = require('cookie-parser');
const logger = require('morgan');
const mongoose = require('mongoose');
const amqp = require('amqplib/callback_api');
const devConfig = require('./config/development');
const recordingsRouter = require('./routes/recordings');
const initialNodes = require('./routes/initial-nodes');
const app = express();
const handleNodeInitialStateMessage = require("./handlers/node-initial-state-message.handler");
const handleRecordMessage = require("./handlers/record-message.handler");
app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({extended: false}));
app.use(cookieParser());
app.use('/api/recordings', recordingsRouter);
app.use('/api/initial-nodes', initialNodes);
// catch 404 and forward to error handler
app.use(function (req, res, next) {
next(createError(404));
});
// error handler
app.use(function (err, req, res, next) {
// set locals, only providing error in development
res.locals.message = err.message;
res.locals.error = req.app.get('env') === 'development' ? err : {};
// render the error page
res.status(err.status || 500);
res.send('error');
});
connect();
function connect() {
const options = {
poolSize: 2,
promiseLibrary: global.Promise,
useNewUrlParser: true
};
return mongoose.connect(devConfig.db, options).connection;
}
//todo: This is a quick and dirty solution, required to implement in fault tolerance fashion
//using exponential backoffs and other best practices
const amqpConnect = (url) => {
return new Promise((resolve, reject) => {
tryToConnect();
function tryToConnect() {
console.log("RECORDINGS-SERVICE trying to establish connection with AMQP");
amqp.connect(url, connectionStrategy);
}
function connectionStrategy(err, conn) {
if (err) {
console.log("RECORDINGS-SERVICE failed to establish connection with AMQP, retry in 3 sec");
setTimeout(tryToConnect, 3000);
return
}
console.log("RECORDINGS-SERVICE successfully connected to AMQP");
resolve(conn);
}
});
};
amqpConnect("amqp://rabbitmq").then(conn => {
conn.createChannel((err, ch) => {
consumeFromQueue(ch, "nodeInitialStateRecords", handleNodeInitialStateMessage);
[
"addedNodeRecords",
"removedNodeRecords",
"mutatedNodeRecords",
"mouseClickRecords",
"innerScrollRecords",
"innerWindowRecords",
"mouseMoveRecords",
"viewPortResizeRecords",
"inputRecords"
].forEach(queue => consumeFromQueue(ch, queue, handleRecordMessage))
})
});
function consumeFromQueue(channel, queue, handler) {
channel.assertQueue(queue, {durable: false});
channel.consume(queue, (msg) => {
handler(JSON.parse(msg.content.toString()))
}, {noAck: true})
}
module.exports = app;