This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 213
/
Copy pathworker.js
149 lines (119 loc) · 4.06 KB
/
worker.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import bodyParser from 'body-parser';
import './environment';
import logger from './utils/logger';
import renderBatch from './utils/renderBatch';
import { runAppLifecycle, errorSync, raceTo } from './utils/lifecycle';
import BatchManager from './utils/BatchManager';
const attachMiddleware = (app, config) => {
app.use(bodyParser.json(config.bodyParser));
};
const attachEndpoint = (app, config, callback) => {
app.post(config.endpoint, renderBatch(config, callback));
};
function exit(code) {
return () => process.exit(code);
}
class Server {
constructor(app, config, callback) {
this.server = null;
this.app = app;
this.config = config;
this.callback = callback;
this.closing = false;
this.close = this.close.bind(this);
this.errorHandler = this.errorHandler.bind(this);
this.shutDownSequence = this.shutDownSequence.bind(this);
}
close() {
return new Promise((resolve) => {
if (!this.server) {
resolve();
return;
}
try {
this.closing = true;
this.server.close((e) => {
if (e) { logger.info('Ran into error during close', { stack: e.stack }); }
resolve();
});
} catch (e) {
logger.info('Ran into error on close', { stack: e.stack });
resolve();
}
});
}
shutDownSequence(error, req, code = 1) {
if (error) {
logger.info(error.stack);
}
raceTo(this.close(), 1000, 'Closing the worker took too long.')
.then(() => runAppLifecycle('shutDown', this.config.plugins, this.config, error, req))
.then(exit(code))
.catch(exit(code));
}
errorHandler(err, req, res, next) { // eslint-disable-line no-unused-vars
// If there is an error with body-parser and the status is set then we can safely swallow
// the error and report it.
// Here are a list of errors https://github.com/expressjs/body-parser#errors
if (err.status && err.status >= 400 && err.status < 600) {
logger.info('Non-fatal error encountered.');
logger.info(err.stack);
res.status(err.status).end();
// In a promise in case one of the plugins throws an error.
new Promise(() => { // eslint-disable-line no-new
const manager = new BatchManager(req, res, req.body, this.config);
errorSync(err, this.config.plugins, manager);
});
return;
}
this.shutDownSequence(err, req, 1);
}
initialize() {
// run through the initialize methods of any plugins that define them
runAppLifecycle('initialize', this.config.plugins, this.config)
.then(() => {
this.server = this.app.listen(...this.config.listenArgs, this.callback);
return null;
})
.catch(this.shutDownSequence);
}
}
const initServer = (app, config, callback) => {
const server = new Server(app, config, callback);
// Middleware
app.use(server.errorHandler);
// Last safety net
process.on('uncaughtException', server.errorHandler);
// if all the workers are ready then we should be good to start accepting requests
process.on('message', (msg) => {
if (msg === 'kill') {
server.shutDownSequence(null, null, 0);
}
});
server.initialize();
return server;
};
const worker = (app, config, onServer, workerId) => {
// ===== Middleware =========================================================
attachMiddleware(app, config);
if (onServer) {
onServer(app, process);
}
let server;
// ===== Routes =============================================================
// server.closing
attachEndpoint(app, config, () => server && server.closing);
// ===== initialize server's nuts and bolts =================================
server = initServer(app, config, () => {
if (process.send) {
// tell our coordinator that we're ready to start receiving requests
process.send({ workerId, ready: true });
}
logger.info('Connected', { listen: config.listenArgs });
});
};
worker.attachMiddleware = attachMiddleware;
worker.attachEndpoint = attachEndpoint;
worker.initServer = initServer;
worker.Server = Server;
export default worker;