Skip to content

Improve Validator Management with Redis and IP Location Lookup #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/frontend/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ yarn-error.log*
# typescript
*.tsbuildinfo
next-env.d.ts

# clerk configuration (can include secrets)
/.clerk/
215 changes: 109 additions & 106 deletions apps/hub/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,24 @@ import { prismaClient } from "db/client";
import { PublicKey } from "@solana/web3.js";
import nacl from "tweetnacl";
import nacl_util from "tweetnacl-util";
import { redis } from "cache/redis";

const availableValidators: { validatorId: string, socket: ServerWebSocket<unknown>, publicKey: string }[] = [];

const CALLBACKS : { [callbackId: string]: (data: IncomingMessage) => void } = {}
const CALLBACKS: { [callbackId: string]: (data: IncomingMessage) => void } = {};
const COST_PER_VALIDATION = 100; // in lamports

Bun.serve({
const server = Bun.serve({
fetch(req, server) {
if (server.upgrade(req)) {
return;
}
return new Response("Upgrade failed", { status: 500 });
if (server.upgrade(req)) {
return;
}
return new Response("Upgrade failed", { status: 500 });
},
port: 8081,
websocket: {
async message(ws: ServerWebSocket<unknown>, message: string) {
const data: IncomingMessage = JSON.parse(message);

if (data.type === 'signup') {

if (data.type === "signup") {
const verified = await verifyMessage(
`Signed message for ${data.data.callbackId}, ${data.data.publicKey}`,
data.data.publicKey,
Expand All @@ -32,127 +30,132 @@ Bun.serve({
if (verified) {
await signupHandler(ws, data.data);
}
} else if (data.type === 'validate') {
} else if (data.type === "validate" && CALLBACKS[data.data.callbackId]) {
CALLBACKS[data.data.callbackId](data);
delete CALLBACKS[data.data.callbackId];
}
},
async close(ws: ServerWebSocket<unknown>) {
availableValidators.splice(availableValidators.findIndex(v => v.socket === ws), 1);
}
const validatorId = await redis.hget("wsToValidator", ws.toString());
if (!validatorId) {
console.log("Disconnected WebSocket not found in Redis.");
return;
}

console.log(`Removing validator ${validatorId} from active list`);

await redis.srem("availableValidators", validatorId);
await redis.lrem("validatorQueue", 0, validatorId);
await redis.hdel("wsToValidator", ws.toString());
await redis.hdel("validatorWs", validatorId);

const channelName = `validator:${validatorId}`;
ws.unsubscribe(channelName);

console.log(`Validator ${validatorId} successfully removed.`);

},
},
});

async function signupHandler(ws: ServerWebSocket<unknown>, { ip, publicKey, signedMessage, callbackId }: SignupIncomingMessage) {
const validatorDb = await prismaClient.validator.findFirst({
where: {
publicKey,
},
});
async function getIpLocation(ip: string) {
try {
const res = await fetch(`http://ip-api.com/json/${ip}`);
const data = await res.json();
return JSON.stringify({
country: data.country,
city: data.city,
region: data.regionName,
lat: data.lat,
lon: data.lon,
});
} catch (error) {
console.error("Error fetching IP location:", error);
return "unknown";
}
}

if (validatorDb) {
ws.send(JSON.stringify({
type: 'signup',
data: {
validatorId: validatorDb.id,
callbackId,
},
}));

availableValidators.push({
validatorId: validatorDb.id,
socket: ws,
publicKey: validatorDb.publicKey,
async function signupHandler(ws: ServerWebSocket<unknown>, { publicKey, callbackId }: SignupIncomingMessage) {
let validator = await prismaClient.validator.findFirst({
where: { publicKey },
});

if (!validator) {
const ip = ws?.remoteAddress || "unknown";
const location = await getIpLocation(ip)
validator = await prismaClient.validator.create({
data: { ip, publicKey, location },
});
return;
}

//TODO: Given the ip, return the location
const validator = await prismaClient.validator.create({
data: {
ip,
publicKey,
location: 'unknown',
},
});

ws.send(JSON.stringify({
type: 'signup',
data: {
validatorId: validator.id,
callbackId,
},
type: "signup",
data: { validatorId: validator.id, callbackId },
}));

availableValidators.push({
validatorId: validator.id,
socket: ws,
publicKey: validator.publicKey,
});
await redis.sadd("availableValidators", validator.id);
await redis.lpush("validatorQueue", validator.id);
await redis.hset("wsToValidator", ws.toString(), validator.id);
const channelName = `validator:${validator.id}`;
ws.subscribe(channelName);
await redis.hset("validatorWs", validator.id, ws.toString());
}

async function verifyMessage(message: string, publicKey: string, signature: string) {
const messageBytes = nacl_util.decodeUTF8(message);
const result = nacl.sign.detached.verify(
messageBytes,
new Uint8Array(JSON.parse(signature)),
new PublicKey(publicKey).toBytes(),
);

return result;
const signatureBytes = new Uint8Array(JSON.parse(signature));
return nacl.sign.detached.verify(messageBytes, signatureBytes, new PublicKey(publicKey).toBuffer());
}

setInterval(async () => {
const websitesToMonitor = await prismaClient.website.findMany({
where: {
disabled: false,
},
where: { disabled: false },
});

for (const website of websitesToMonitor) {
availableValidators.forEach(validator => {
const callbackId = randomUUIDv7();
console.log(`Sending validate to ${validator.validatorId} ${website.url}`);
validator.socket.send(JSON.stringify({
type: 'validate',
data: {
url: website.url,
callbackId
},
}));

CALLBACKS[callbackId] = async (data: IncomingMessage) => {
if (data.type === 'validate') {
const { validatorId, status, latency, signedMessage } = data.data;
const verified = await verifyMessage(
`Replying to ${callbackId}`,
validator.publicKey,
signedMessage
);
if (!verified) {
return;
}

await prismaClient.$transaction(async (tx) => {
await tx.websiteTick.create({
data: {
websiteId: website.id,
validatorId,
status,
latency,
createdAt: new Date(),
},
});

await tx.validator.update({
where: { id: validatorId },
data: {
pendingPayouts: { increment: COST_PER_VALIDATION },
},
});
const validationQueue = websitesToMonitor.map(website => ({
websiteId: website.id,
url: website.url,
}));

while (validationQueue.length > 0) {
const validationRequest = validationQueue.shift();
if (!validationRequest) continue;

const validatorId = await redis.rpoplpush("validatorQueue", "validatorQueue");
if (!validatorId) continue;

const validatorWsString = await redis.hget("validatorWs", validatorId);
if (!validatorWsString) continue;

const validator = await prismaClient.validator.findUnique({ where: { id: validatorId } });
if (!validator) continue;

const callbackId = randomUUIDv7();
console.log(`Sending validation request to ${validatorId}: ${validationRequest.url}`);

server.publish(`validator:${validatorId}`, JSON.stringify({
type: "validate",
data: { url: validationRequest.url, callbackId },
}));

CALLBACKS[callbackId] = async (data: IncomingMessage) => {
if (data.type === "validate") {
const { status, latency, signedMessage } = data.data;
const verified = await verifyMessage(`Replying to ${callbackId}`, validator.publicKey, signedMessage);
if (!verified) return;

await prismaClient.$transaction(async tx => {
await tx.websiteTick.create({
data: { websiteId: validationRequest.websiteId, validatorId, status, latency, createdAt: new Date() },
});
}
};
});

await tx.validator.update({
where: { id: validatorId },
data: { pendingPayouts: { increment: COST_PER_VALIDATION } },
});
});
}
};
}
}, 60 * 1000);
}, 60 * 1000);
3 changes: 2 additions & 1 deletion apps/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import type { OutgoingMessage, SignupOutgoingMessage, ValidateOutgoingMessage }
import { Keypair } from "@solana/web3.js";
import nacl from "tweetnacl";
import nacl_util from "tweetnacl-util";
import bs58 from 'bs58'

const CALLBACKS: {[callbackId: string]: (data: SignupOutgoingMessage) => void} = {}

let validatorId: string | null = null;

async function main() {
const keypair = Keypair.fromSecretKey(
Uint8Array.from(JSON.parse(process.env.PRIVATE_KEY!))
bs58.decode(process.env.PRIVATE_KEY!)
);
const ws = new WebSocket("ws://localhost:8081");

Expand Down
1 change: 1 addition & 0 deletions apps/validator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"dependencies": {
"@solana/kit": "^2.1.0",
"@solana/web3.js": "^1.98.0",
"bs58": "^6.0.0",
"tweetnacl": "^1.0.3",
"tweetnacl-util": "^0.15.1"
}
Expand Down
Loading