Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 70 additions & 28 deletions api/_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Buffer } from "node:buffer";
import { Redis } from '@upstash/redis';
import { Octokit } from '@octokit/core';
import { createOrUpdateTextFile } from '@octokit/plugin-create-or-update-text-file';
import { waitUntil } from '@vercel/functions';

const secret = process.env.SECRET;
const sigLen = parseInt(process.env.SIG_LEN);
Expand All @@ -19,21 +20,17 @@ const streamTimeout = parseInt(process.env.STREAM_TIMEOUT);
const maxStreamCount = parseInt(process.env.MAX_STREAM_COUNT);
const maxPublicPostCount = parseInt(process.env.MAX_PUBLIC_POSTS_RETAINED);
const maxFieldsCount = parseInt(process.env.MAX_PRIVATE_POST_FIELDS);
const pipingServerURL = process.env.PIPING_SERVER_URL;
const defunctPipePlumbTimeout = parseInt(process.env.DEFUNCT_PIPE_PLUMB_TIMEOUT);

const dbKeyPrefix = {
manyToOne: "m2o:",
oneToMany: "o2m:",
oneToOne: "o2o:",
cache: "cache:",
stream: {
public: {
send: "pipePubSend:",
receive: "pipePubRecv:"
},
private: {
send: "pipePrivSend:",
receive: "pipePrivRecv:"
}
pipe: {
send: "pipeSend:",
receive: "pipeRecv:"
}
}

Expand Down Expand Up @@ -279,31 +276,76 @@ export async function oneToOneTTL(privateKey, key){
return {ttl: bool ? ttl : 0};
}

// Tokens are stored in LIFO stacks. Old and unused tokens are trimmed.
// Timestamps (Unix-time in seconds) are stored with the tokens using string concatenation
export async function streamToken(privateOrPublicKey, receive=true){
const type = parseKey(privateOrPublicKey).type;
const typeComplement = (type == 'private') ? 'public' : 'private';
const publicKey = genPublicKey(privateOrPublicKey, { validate: false });
const publicKeyRandom = parseKey(publicKey, { validate: false }).random;
const mode = receive ? "receive" : "send";
const modeComplement = receive ? "send" : "receive";
const existing = await redisData.lpop(dbKeyPrefix.stream[typeComplement][modeComplement] + publicKeyRandom);
const timeNow = Math.round(Date.now()/1000);
if (existing) {
const [token, timestamp] = existing.split('@');
// Expired unused tokens are possible as adding new tokens refreshes the expiry of the entire list (see below).
// Return token that is not expired.
if ((timeNow - timestamp) < streamTimeout) return token;
// Whether provided http method means 'send' or 'receive' mode.
// If option `complement` is true, returns the complementary mode instead.
function methodToMode(method, complement=false){
const table = { send: ['POST', 'PUT'], receive: ['GET', 'HEAD'] };
const mode = Object.keys(table).find((key) => {
return table[key].includes(method) === !complement;
})
if (!mode) throw new Error('Method Not Allowed');
return mode;
}

// Plumbs defunct private pipes with bodyless http-requests under timeout
function plumbDefunctPipes(tokens, privateMode){
// If private mode is sending, plumb with 'GET'.
// Plumb with 'POST', otherwise.
const table = {send: 'GET', receive: 'POST'};
if (Boolean(tokens?.length) === false) return;
const method = table[privateMode];
const opts = {
method,
body: '',
signal: AbortSignal.timeout(defunctPipePlumbTimeout)
}
tokens.forEach((el) => {
const [ token, ] = el.split('@');
waitUntil(fetch(pipingServerURL + token, opts).catch((err) => {}));
})
}

// streamTimeout simply guarantees that a pipe, if plumbed, will be plumbed within that time period.
// Tokens are stored in LIFO stacks of finite max length.
// New tokens evict old ones, once stack is filled.
// Pipes corresponding to evicted tokens are never discovered, i.e. are defunct.
// Defunct pipes are plumbed later with bodyless pipes: see plumbDefunctPipes().
// Timestamps (Unix-time in seconds) are stored with the tokens using string concatenation.
export async function pipeToPublic(privateKey, httpMethod){
const publicKey = genPublicKey(privateKey);
const privateMode = methodToMode(httpMethod);
const dbKey = dbKeyPrefix.pipe[privateMode] + parseKey(publicKey, { validate: false }).random;
const token = randStr();
const dbKey = dbKeyPrefix.stream[type][mode] + publicKeyRandom;
const timeNow = Math.round(Date.now()/1000);
const [ count, ] = await Promise.all([
redisData.lpush(dbKey, token + '@' + timeNow),
redisData.expire(dbKey, streamTimeout)
])
if (count > maxStreamCount) await redisData.ltrim(dbKey, 0, maxStreamCount - 1);
return token;
if (count > maxStreamCount) {
const expiredTokens = await redisData.rpop(dbKey, count - maxStreamCount);
plumbDefunctPipes(expiredTokens, privateMode);
}
return pipingServerURL + token;
}

// Expired, unused tokens imply defunct pipes (see above).
export async function pipeToPrivate(publicKey, httpMethod){
const privateMode = methodToMode(httpMethod, true);
const dbKey = dbKeyPrefix.pipe[privateMode] + parseKey(publicKey).random;
const fromDB = await redisData.lpop(dbKey);
const timeNow = Math.round(Date.now()/1000);
if (fromDB) {
const [token, timestamp] = fromDB.split('@');
// Expired unused tokens are possible as adding new tokens refreshes the expiry of the entire list
// Return token if it is not expired
// Because of LIFO, if the first-out token is expired, the entire list is expired, so pop those off
if ((timeNow - timestamp) < streamTimeout) {
return pipingServerURL + token;
} else {
const expiredTokens = await redisData.lpop(dbKey, maxStreamCount);
plumbDefunctPipes([token+'@'+timestamp,...expiredTokens], privateMode);
}
}
}

export function OneSignalID(app){
Expand Down
103 changes: 68 additions & 35 deletions api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ const callInsufficientStorage = function(reply, msg){
reply.code(507).send({message: msg, error: "Insufficient Storage", statusCode: reply.statusCode});
}

const callMethodNotAllowed = function(reply, allowedMethods, msg){
reply.code(405)
.header('Allow', allowedMethods)
.send({message: msg, error: "Method Not Allowed", statusCode: reply.statusCode});
}

fastify.get('/keys', (request, reply) => {
reply.send(helper.genKeyPair());
})
Expand All @@ -68,11 +74,11 @@ fastify.get('/keys/:key', (request, reply) => {
}
})

fastify.post('/public/:publicKey', async (request, reply) => {
const { publicKey } = request.params;
fastify.post('/public/:publicKey/:channel?', async (request, reply) => {
const { publicKey, channel } = request.params;
const redirectOnOk = request.query.ok;
const redirectOnErr = request.query.err;

let payload = request.body;
try {
if (helper.parseKey(publicKey, { validate: false }).type !== 'public') throw new Error('Unauthorized');

Expand All @@ -85,11 +91,20 @@ fastify.post('/public/:publicKey', async (request, reply) => {
'x-vercel-ip-country': country,
'x-vercel-ip-country-region': region,
'x-vercel-ip-city': city,
'x-real-ip': ip
'x-real-ip': ip,
'content-length': bodySize
} = request.headers;
if (country || region || city) meta.geolocation = [country, region, city].join('/');
if (ip) meta.ip = ip;
const data = helper.decoratePayload(request.body, meta);
if (channel) {
meta.channel = channel;
// If request doesn't have a body, use the value stored in channel(key) instead
if (parseInt(bodySize) === 0) {
const channelData = await helper.oneToOneConsume(publicKey, channel);
payload = channelData?.data ?? {};
}
}
const data = helper.decoratePayload(payload, meta);

// Try posting the data to webhook, if any, with timeout.
// On fail, store data and send web-push notifications to owner.
Expand Down Expand Up @@ -366,38 +381,56 @@ fastify.get('/private/:privateKey/:key', async (request, reply) => {
}
})

const streamHandler = async (request, reply) => {
const { key } = request.params;
try {
let recvBool;
switch (request.method) {
case 'POST': // Using fallthrough! POST and PUT cases run the same code.
case 'PUT':
recvBool = false;
break;
case 'HEAD': // HEAD and GET handled similarly
case 'GET':
recvBool = true;
break;
default:
throw new Error('Unsupported Method');
}
const token = await helper.streamToken(key, recvBool);
reply.redirect('https://ppng.io/' + token, 307);
} catch (err) {
if (err.message == 'Unauthorized') {
callUnauthorized(reply, 'This path is for internal use only');
} else if (err.message == 'Invalid Key') {
callBadRequest(reply, 'Provided key is invalid');
} else if (err.message == 'Unsupported Method') {
callBadRequest(reply, 'Unsupported method');
} else {
callInternalServerError(reply, err.message);
fastify.all('/private/:privateKey.pipe', async (request, reply) => {
const { privateKey } = request.params;
const pipeFail = request.query.fail;
try {
if (helper.parseKey(privateKey, { validate: false }).type !== 'private') throw new Error('Unauthorized');
const pipeURL = await helper.pipeToPublic(privateKey, request.method);
if (pipeFail) waitUntil(helper.cacheSet(privateKey, { pipeFail }));
reply.redirect(pipeURL, 307);
} catch (err) {
if (err.message == 'Unauthorized') {
callUnauthorized(reply, 'Provided key is not Private');
} else if (err.message === 'Invalid Key') {
callBadRequest(reply, 'Provided key is invalid');
} else if (err.message === 'Method Not Allowed') {
callMethodNotAllowed(reply, 'GET,POST,PUT', 'Provided method is not allowed for piping');
} else {
callInternalServerError(reply, err.message);
}
}
}
}
});

fastify.all('/pipe/:key', streamHandler);
fastify.all('/public/:publicKey.pipe', async (request, reply) => {
const { publicKey } = request.params;
try {
if (helper.parseKey(publicKey, { validate: false }).type !== 'public') throw new Error('Unauthorized');
const pipeURL = await helper.pipeToPrivate(publicKey, request.method);
if (pipeURL) {
reply.redirect(pipeURL, 307);
} else {
const page404 = await helper.cacheGet(publicKey, 'pipeFail');
if (page404) {
reply.redirect(page404, 303);
} else {
throw new Error('No Data');
}
}
} catch (err) {
if (err.message == 'Unauthorized') {
callUnauthorized(reply, 'Provided key is not Private');
} else if (err.message === 'Invalid Key') {
callBadRequest(reply, 'Provided key is invalid');
} else if (err.message === 'No Data') {
reply.callNotFound();
} else if (err.message === 'Method Not Allowed') {
callMethodNotAllowed(reply, 'GET,POST,PUT', 'Provided method is not allowed for piping');
} else {
callInternalServerError(reply, err.message);
}
}
});

export default async function handler(req, res) {
await fastify.ready();
Expand Down
4 changes: 2 additions & 2 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ echo "${id}" > "${build_dir}/id.txt"
cat > "${build_dir}/properties.json" <<-EOF
{
"versions": {
"specification": "0.0",
"implementation": "0.0.0"
"specification": "0.1",
"implementation": "0.0.1"
},
"id": "${id}",
"OneSignalAppId": {
Expand Down
3 changes: 3 additions & 0 deletions example.env
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ CDN_TTL= 30 # TTL for CDN in days
WEBHOOK_TIMEOUT=4000 # Request timeout for POST to webhook, in milliseconds
STREAM_TIMEOUT=60 # TTL of stream/pipe tokens in seconds
MAX_STREAM_COUNT=5 # Max number of stream/pipe tokens in memory
DEFUNCT_PIPE_PLUMB_TIMEOUT=2000 # in milliseconds

PIPING_SERVER_URL='https://ppng.io/' # or, https://httprelay.io/

# Redis credentials to be used for the main database
UPSTASH_REDIS_REST_URL_MAIN=
Expand Down
2 changes: 1 addition & 1 deletion middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export default async function middleware (request) {
if (request.headers.has('expect')) return errorResponse(417, 'Expect header is not allowed');

// Detect a pipe request to let it have any Content-Type and Length, including `Transfer-Encoding: chunked` header
const isPiped = new URL(request.url).pathname.startsWith('/pipe/');
const isPiped = new URL(request.url).pathname.endsWith('.pipe');

// Block unallowed methods and chunked transfer if not pipe
if (!isPiped) {
Expand Down
14 changes: 13 additions & 1 deletion vercel.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@
],
"rewrites": [
{
"source": "/(public|private|keys|pipe)/:path*",
"source": "/keys",
"destination": "/api"
},
{
"source": "/(public|private|keys)/([\\w-]+)",
"destination": "/api"
},
{
"source": "/(public|private)/([\\w-]+)/([\\w-]+)",
"destination": "/api"
},
{
"source": "/(public|private)/([\\w-]+).pipe",
"destination": "/api"
},
{
Expand Down