server.js
/**
 * A typical Node.js server setup, used to serve a demo React app that uses
 * atomic-kafka to produce data to a Kafka cluster.
 */
const express = require("express");
const app = express();
const path = require("path");
const AtomicKafkaServer = require("atomic-kafka/server");
const fs = require("fs");
const port = 3001;
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use("/assets", express.static(path.join(__dirname, "./assets")));
// serve root
app.get("/", (req, res) => {
  res.sendFile(path.join(__dirname, "index.html"));
});
// 404 handler
app.use("*", (req, res) => {
  return res
    .status(404)
    .send("********** GLOBAL BAD REQUEST / 404 ERROR **********");
});
const server = app.listen(port, () => {
  console.log(`Listening on port ${port}`);
});
/**
 * Pass in the HTTP server that the Atomic Kafka instance will interact with on the backend
 * as well as the frontend. The frontend socket must connect to the same port this server
 * listens on so that the two ends are wired together correctly.
 */
const atomicKafkaInstance = new AtomicKafkaServer(server);
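// Note: the demo frontend's socket must point at the same port this server listens on
// (3001 here); the exact client-side setup depends on atomic-kafka's client API.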
// newProducer generates a Kafka producer instance using the topic name passed in by the user
atomicKafkaInstance.newProducer("test_topic");
// socketProduce produces messages to the topic in the cluster via a websocket connection;
// for a message to be routed from the frontend to the backend, the event string must match on both sides
atomicKafkaInstance.socketProduce("postMessage", "test_topic");
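/**
 * For example, the demo frontend would emit a "postMessage" event carrying each message
 * payload over the shared socket. The exact client-side call depends on atomic-kafka's
 * client API (see the library's docs), but the event name used there must be the same
 * "postMessage" string registered above.
 */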
/**
* OPTIONAL: The user can write and run a custom function that invokes one of Atomic Kafka's
* produce functions if their use case necessitates a different way to handle the data.
* For example, produceMyWay reads a stream of JSON data on loop from the user's file system.
*/
const produceMyWay = () => {
  let idNum = 0;
  let toSend = [];
  try {
    // read the sample data file and split it into one record per line, skipping blank lines
    const data = fs.readFileSync("salesData.json", "utf-8");
    toSend = data.split(/\r?\n/).filter((line) => line.trim() !== "");
  } catch (err) {
    console.error(err);
  }
  let i = 0;
  // send one record every 4 seconds, looping back to the start of the file indefinitely
  setInterval(async () => {
    try {
      const sendObj =
        typeof toSend[i] === "string" ? JSON.parse(toSend[i]) : toSend[i];
      sendObj.ID = "gen" + idNum; // add a generated ID to sendObj, which the demo consumer expects
      console.log("executing send with: ", sendObj);
      atomicKafkaInstance.fileProduce(sendObj, "test_topic");
      // reset i upon reaching the last record in the file
      if (i >= toSend.length - 1) i = 0;
      else i++;
      idNum++;
    } catch (err) {
      console.log("Error while producing in produceMyWay(): ", err);
    }
  }, 4000);
};
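/**
 * Note: produceMyWay assumes salesData.json contains one JSON object per line
 * (newline-delimited JSON). A hypothetical record might look like:
 *   {"region": "West", "product": "widget", "amount": 42}
 * The ID field is added at send time, so the file itself does not need one.
 */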
// Uncomment invocation below to test this app with an endless stream of toy data
// produceMyWay();