-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsse.js
75 lines (62 loc) · 2.19 KB
/
sse.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
const serialize = require('./public/serialize');
const { MIDDLEWARE_WAIT_TIME } = require('./constants');
const { SSE, REQUESTS } = require('./globals');
const { delay, normalizeEvent } = require('./helpers')
const startSSEPiping = async () => {
while (true) {
await delay(1000)
const chunks = SSE.backflow.splice(0, SSE.backflow.length);
if (!chunks.length) continue;
const chunksByRequests = {};
for (const chunk of chunks) {
if (!(chunk.id in chunksByRequests)) chunksByRequests[chunk.id] = [];
chunksByRequests[chunk.id].push(chunk)
chunksByRequests[chunk.id].sort((a, b) => b.end - a.end)
}
const holding = [];
const now = performance.now();
for (const chunks of Object.values(chunksByRequests)) {
const latest = chunks[0];
if (now - latest.event.end <= MIDDLEWARE_WAIT_TIME * 2) {
delete chunksByRequests[latest.id];
holding.push(...chunks);
}
}
SSE.backflow.push(...holding);
const readyChunks = Object.values(chunksByRequests).flat();
for (const chunk of readyChunks) chunk.event = await normalizeEvent(chunk.event);
const newRequests = {};
for (const chunk of readyChunks) {
const info = REQUESTS.get(chunk.id);
if (!info) continue;
if (!(chunk.id in newRequests)) newRequests[chunk.id] = { id: chunk.id, start: info.start, events: [] };
newRequests[chunk.id].events.push(chunk.event);
newRequests[chunk.id].events.sort((a, b) => a.start - b.start);
}
const json = JSON.stringify(serialize(newRequests, { json: true }));
for (const client of SSE.clients) client.write(`data: ${json}\n\n`);
}
}
function handleSSERequests(_, response) {
response.set({
'Cache-Control': 'no-cache',
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive'
});
response.flushHeaders();
response.write('retry: 10000\n\n');
SSE.clients.push(response);
response.on('close', () => {
response.end();
for (let i = SSE.clients.length; i >= 0; i--) {
if (SSE.clients[i] === response) return SSE.clients.splice(i, 1);
}
})
}
function startSSE(){
SSE.heartbeatInterval = setInterval(() => {
for (const client of SSE.clients) client.write(': .\n\n')
}, 10000);
startSSEPiping();
}
module.exports = { SSE, handleSSERequests, startSSE }