Skip to content

Commit

Permalink
added workers to load balance
Browse files Browse the repository at this point in the history
  • Loading branch information
WaviestBalloon committed May 3, 2023
1 parent 1b05d5d commit b4fc91d
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 12 deletions.
56 changes: 46 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import fastify, { FastifyRequest, FastifyReply } from "fastify";
import { readFileSync } from "fs";
import { dirname, join } from "path";
import { fileURLToPath } from "url";
import { createHash } from "crypto";
import cluster from "node:cluster";
import os from "node:os";
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";

import render from "./render.js";

Expand All @@ -11,17 +14,29 @@ const nameList = readFileSync(join(__dirname, `../fillernames.txt`), "utf-8").sp
const server = fastify({ logger: false });
let portNumber = 3000;
let cacheRemovalTimer = 285000;
let enableMultiThread = true;
let cache: { query: any, buffer: Buffer; }[] = [];
process.argv.forEach((val, index) => {
if (index < 2) return;
switch (val) {
case "--port": portNumber = parseInt(process.argv[index + 1]);
case "--cache-removal-timer": cacheRemovalTimer = parseInt(process.argv[index + 1]);
case "--use-cluster": enableMultiThread = process.argv[index + 1] === "true" ? true : false;
default: return;
}
});

server.get("/pfp", async (request: FastifyRequest, reply: FastifyReply) => { // TODO: caching rendered images
if (enableMultiThread && cluster.isPrimary) {
cluster.setupPrimary({ exec: join(__dirname, "worker.js") });
//cluster.on("online", (worker) => { console.log(`Worker ${worker.id} is online`); });
cluster.on("exit", (worker, code, signal) => { console.log(`Worker ${worker.id} exited with code ${code} and signal ${signal}`); });
for (let i = 0; i < os.cpus().length; i++) {
console.log(`Forking worker threads ${i + 1}/${os.cpus().length}`);
cluster.fork();
}
}

server.get("/pfp", async (request: FastifyRequest, reply: FastifyReply) => {
let startTimer = Date.now();
let query = request.query as any;
for (let i = 0; i < cache.length; i++) {
Expand All @@ -30,7 +45,7 @@ server.get("/pfp", async (request: FastifyRequest, reply: FastifyReply) => { //
return reply.header("Content-Type", "image/png").header("X-Completed-In", Date.now() - startTimer).header("X-Returned-Cache", true).send(cache[i].buffer);
}
}
if (query?.name === undefined) query.name = Math.random().toString(36).substring(7);
if (query?.name === undefined) query.name = (Math.random() * 1000000).toString(36).substring(7);
console.log(query);

let mag = query?.mag ?? 10;
Expand All @@ -45,7 +60,7 @@ server.get("/pfp", async (request: FastifyRequest, reply: FastifyReply) => { //
let colour = query?.colour ?? null;
if (colour !== null) {
if (colour.length !== 6) {
return reply.code(400).send("Colour must be 6 characters long (Hexadecimal format, eg. e8c8e8)");
return reply.code(400).send("Colour must be 6 characters long (Hexadecimal format, e.g. e8c8e8)");
}
}

Expand All @@ -56,10 +71,29 @@ server.get("/pfp", async (request: FastifyRequest, reply: FastifyReply) => { //
if (wh * mag > 10000) {
return reply.code(400).send(`Width/Height and/or Magnification collectively exceeds 10000 pixels (${wh * mag}) (Try lowering your magnification or width/height, or both)`);
}

let buffer = await render(query?.name, width, height, wh, mag, blockSize, colour);

cache.push({ query: query, buffer: buffer });
let canvasBuffer: Buffer;
let workIdentifier: string | undefined;
let workerId: any | undefined;
if (enableMultiThread) {
workIdentifier = createHash("md5").update(Date.now().toString()).digest("hex");
workerId = Object.keys(cluster.workers)[Math.floor(Math.random() * Object.keys(cluster.workers).length)];
let threadWorker = cluster.workers[workerId]?.process
threadWorker?.send({ data: { name: query?.name, width: width, height: height, wh: wh, mag: mag, blockSize: blockSize, colour: colour }, workId: workIdentifier });

canvasBuffer = await new Promise((resolve, reject) => {
threadWorker?.on("message", (data: { type: string; workId: string; data: Buffer; }) => { //{ type: string, data: Buffer }
if (data.type == "completed" && data.workId === workIdentifier) {
resolve(Buffer.from(data.data));
}
});
});
} else {
canvasBuffer = await render(query?.name, width, height, wh, mag, blockSize, colour);
}

console.log(`Adding ${query?.name} to cache - ${cache.length} items remaining in cache`);
cache.push({ query: query, buffer: canvasBuffer });
setTimeout(() => {
for (let i = 0; i < cache.length; i++) {
if (cache[i].query === query) {
Expand All @@ -70,7 +104,9 @@ server.get("/pfp", async (request: FastifyRequest, reply: FastifyReply) => { //
console.log(`Removed ${query?.name} from cache - ${cache.length} items remaining in cache`);
}, cacheRemovalTimer);
console.log(`Completed in ${Date.now() - startTimer}ms`);
reply.header("Content-Type", "image/png").header("X-Completed-In", Date.now() - startTimer).send(buffer);
reply.header("Content-Type", "image/png").header("X-Completed-In", Date.now() - startTimer);
if (workIdentifier !== undefined) reply.header("X-By-Worker", workerId); reply.header("X-Work-Identifier", workIdentifier);
reply.send(canvasBuffer);
});
server.get("/", async (request: FastifyRequest, reply: FastifyReply) => {
let randomName = nameList[Math.floor(Math.random() * nameList.length)];
Expand Down
2 changes: 1 addition & 1 deletion src/render.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export default async function render(name: any, width: number, height: number, w
let binaryNoSpace = binary.join("");
let binaryArray = [];
for (let i = 0; i < binaryNoSpace.length; i++) {
if (i > 1) { // this fixes a bug where the first 2 bits are always 1
if (i > 1) { // this "fixes" a bug where the first 2 bits are always 1
binaryArray.push(binaryNoSpace[i]);
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import cluster from "node:cluster";
import render from "./render.js";
if (!cluster.isWorker) process.exit(1);
const workerId = cluster.worker.id;

cluster.worker.on("message", async (msg: any) => {
console.log(`Worker ${workerId} received render request (#${msg.workId})`);
let params = msg.data;
let receivedBuffer = await render(params.name, params.width, params.height, params.wh, params.mag, params.blockSize, params.colour)
console.log(`Worker ${workerId} completed render request`);

cluster.worker.send({
type: "completed",
workId: msg.workId,
data: receivedBuffer as Buffer
});
});

console.log(`Worker ${workerId} fully initialised`);
2 changes: 1 addition & 1 deletion start.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
rm -rf dist
npx tsc
node . --port 3000 --cache-removal-timer 285000
node . --port 3000 --cache-removal-timer 285000 --use-cluster true

0 comments on commit b4fc91d

Please sign in to comment.