Skip to content

Commit c9cc839

Browse files
authored
Feature update: pipe (#7)
* Pipe is now called with .pipe extension to public or private path instead of the /pipe prefix * Private pipe must precede public * Defunct pipes plumbed under timeout * Features: - private pipe accepts ?fail= query string to pass 404 page url - public posting to /channel now enabled. Adds channel to metadata - if request is bodyless, use value stored @ private/key/channel as value * Bump version * Refactor
1 parent f718493 commit c9cc839

File tree

6 files changed

+157
-67
lines changed

6 files changed

+157
-67
lines changed

api/_utils.js

Lines changed: 70 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { Buffer } from "node:buffer";
88
import { Redis } from '@upstash/redis';
99
import { Octokit } from '@octokit/core';
1010
import { createOrUpdateTextFile } from '@octokit/plugin-create-or-update-text-file';
11+
import { waitUntil } from '@vercel/functions';
1112

1213
const secret = process.env.SECRET;
1314
const sigLen = parseInt(process.env.SIG_LEN);
@@ -19,21 +20,17 @@ const streamTimeout = parseInt(process.env.STREAM_TIMEOUT);
1920
const maxStreamCount = parseInt(process.env.MAX_STREAM_COUNT);
2021
const maxPublicPostCount = parseInt(process.env.MAX_PUBLIC_POSTS_RETAINED);
2122
const maxFieldsCount = parseInt(process.env.MAX_PRIVATE_POST_FIELDS);
23+
const pipingServerURL = process.env.PIPING_SERVER_URL;
24+
const defunctPipePlumbTimeout = parseInt(process.env.DEFUNCT_PIPE_PLUMB_TIMEOUT);
2225

2326
const dbKeyPrefix = {
2427
manyToOne: "m2o:",
2528
oneToMany: "o2m:",
2629
oneToOne: "o2o:",
2730
cache: "cache:",
28-
stream: {
29-
public: {
30-
send: "pipePubSend:",
31-
receive: "pipePubRecv:"
32-
},
33-
private: {
34-
send: "pipePrivSend:",
35-
receive: "pipePrivRecv:"
36-
}
31+
pipe: {
32+
send: "pipeSend:",
33+
receive: "pipeRecv:"
3734
}
3835
}
3936

@@ -279,31 +276,76 @@ export async function oneToOneTTL(privateKey, key){
279276
return {ttl: bool ? ttl : 0};
280277
}
281278

282-
// Tokens are stored in LIFO stacks. Old and unused tokens are trimmed.
283-
// Timestamps (Unix-time in seconds) are stored with the tokens using string concatenation
284-
export async function streamToken(privateOrPublicKey, receive=true){
285-
const type = parseKey(privateOrPublicKey).type;
286-
const typeComplement = (type == 'private') ? 'public' : 'private';
287-
const publicKey = genPublicKey(privateOrPublicKey, { validate: false });
288-
const publicKeyRandom = parseKey(publicKey, { validate: false }).random;
289-
const mode = receive ? "receive" : "send";
290-
const modeComplement = receive ? "send" : "receive";
291-
const existing = await redisData.lpop(dbKeyPrefix.stream[typeComplement][modeComplement] + publicKeyRandom);
292-
const timeNow = Math.round(Date.now()/1000);
293-
if (existing) {
294-
const [token, timestamp] = existing.split('@');
295-
// Expired unused tokens are possible as adding new tokens refreshes the expiry of the entire list (see below).
296-
// Return token that is not expired.
297-
if ((timeNow - timestamp) < streamTimeout) return token;
279+
// Whether provided http method means 'send' or 'receive' mode.
280+
// If option `complement` is true, returns the complementary mode instead.
281+
function methodToMode(method, complement=false){
282+
const table = { send: ['POST', 'PUT'], receive: ['GET', 'HEAD'] };
283+
const mode = Object.keys(table).find((key) => {
284+
return table[key].includes(method) === !complement;
285+
})
286+
if (!mode) throw new Error('Method Not Allowed');
287+
return mode;
288+
}
289+
290+
// Plumbs defunct private pipes with bodyless http-requests under timeout
291+
function plumbDefunctPipes(tokens, privateMode){
292+
// If private mode is sending, plumb with 'GET'.
293+
// Plumb with 'POST', otherwise.
294+
const table = {send: 'GET', receive: 'POST'};
295+
if (Boolean(tokens?.length) === false) return;
296+
const method = table[privateMode];
297+
const opts = {
298+
method,
299+
body: '',
300+
signal: AbortSignal.timeout(defunctPipePlumbTimeout)
298301
}
302+
tokens.forEach((el) => {
303+
const [ token, ] = el.split('@');
304+
waitUntil(fetch(pipingServerURL + token, opts).catch((err) => {}));
305+
})
306+
}
307+
308+
// streamTimeout simply guarantees that a pipe, if plumbed, will be plumbed within that time period.
309+
// Tokens are stored in LIFO stacks of finite max length.
310+
// New tokens evict old ones, once stack is filled.
311+
// Pipes corresponding to evicted tokens are never discovered, i.e. are defunct.
312+
// Defunct pipes are plumbed later with bodyless pipes: see plumbDefunctPipes().
313+
// Timestamps (Unix-time in seconds) are stored with the tokens using string concatenation.
314+
export async function pipeToPublic(privateKey, httpMethod){
315+
const publicKey = genPublicKey(privateKey);
316+
const privateMode = methodToMode(httpMethod);
317+
const dbKey = dbKeyPrefix.pipe[privateMode] + parseKey(publicKey, { validate: false }).random;
299318
const token = randStr();
300-
const dbKey = dbKeyPrefix.stream[type][mode] + publicKeyRandom;
319+
const timeNow = Math.round(Date.now()/1000);
301320
const [ count, ] = await Promise.all([
302321
redisData.lpush(dbKey, token + '@' + timeNow),
303322
redisData.expire(dbKey, streamTimeout)
304323
])
305-
if (count > maxStreamCount) await redisData.ltrim(dbKey, 0, maxStreamCount - 1);
306-
return token;
324+
if (count > maxStreamCount) {
325+
const expiredTokens = await redisData.rpop(dbKey, count - maxStreamCount);
326+
plumbDefunctPipes(expiredTokens, privateMode);
327+
}
328+
return pipingServerURL + token;
329+
}
330+
331+
// Expired, unused tokens imply defunct pipes (see above).
332+
export async function pipeToPrivate(publicKey, httpMethod){
333+
const privateMode = methodToMode(httpMethod, true);
334+
const dbKey = dbKeyPrefix.pipe[privateMode] + parseKey(publicKey).random;
335+
const fromDB = await redisData.lpop(dbKey);
336+
const timeNow = Math.round(Date.now()/1000);
337+
if (fromDB) {
338+
const [token, timestamp] = fromDB.split('@');
339+
// Expired unused tokens are possible as adding new tokens refreshes the expiry of the entire list
340+
// Return token if it is not expired
341+
// Because of LIFO, if the first-out token is expired, the entire list is expired, so pop those off
342+
if ((timeNow - timestamp) < streamTimeout) {
343+
return pipingServerURL + token;
344+
} else {
345+
const expiredTokens = await redisData.lpop(dbKey, maxStreamCount);
346+
plumbDefunctPipes([token+'@'+timestamp,...expiredTokens], privateMode);
347+
}
348+
}
307349
}
308350

309351
export function OneSignalID(app){

api/index.js

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ const callInsufficientStorage = function(reply, msg){
4949
reply.code(507).send({message: msg, error: "Insufficient Storage", statusCode: reply.statusCode});
5050
}
5151

52+
const callMethodNotAllowed = function(reply, allowedMethods, msg){
53+
reply.code(405)
54+
.header('Allow', allowedMethods)
55+
.send({message: msg, error: "Method Not Allowed", statusCode: reply.statusCode});
56+
}
57+
5258
fastify.get('/keys', (request, reply) => {
5359
reply.send(helper.genKeyPair());
5460
})
@@ -68,11 +74,11 @@ fastify.get('/keys/:key', (request, reply) => {
6874
}
6975
})
7076

71-
fastify.post('/public/:publicKey', async (request, reply) => {
72-
const { publicKey } = request.params;
77+
fastify.post('/public/:publicKey/:channel?', async (request, reply) => {
78+
const { publicKey, channel } = request.params;
7379
const redirectOnOk = request.query.ok;
7480
const redirectOnErr = request.query.err;
75-
81+
let payload = request.body;
7682
try {
7783
if (helper.parseKey(publicKey, { validate: false }).type !== 'public') throw new Error('Unauthorized');
7884

@@ -85,11 +91,20 @@ fastify.post('/public/:publicKey', async (request, reply) => {
8591
'x-vercel-ip-country': country,
8692
'x-vercel-ip-country-region': region,
8793
'x-vercel-ip-city': city,
88-
'x-real-ip': ip
94+
'x-real-ip': ip,
95+
'content-length': bodySize
8996
} = request.headers;
9097
if (country || region || city) meta.geolocation = [country, region, city].join('/');
9198
if (ip) meta.ip = ip;
92-
const data = helper.decoratePayload(request.body, meta);
99+
if (channel) {
100+
meta.channel = channel;
101+
// If request doesn't have a body, use the value stored in channel(key) instead
102+
if (parseInt(bodySize) === 0) {
103+
const channelData = await helper.oneToOneConsume(publicKey, channel);
104+
payload = channelData?.data ?? {};
105+
}
106+
}
107+
const data = helper.decoratePayload(payload, meta);
93108

94109
// Try posting the data to webhook, if any, with timeout.
95110
// On fail, store data and send web-push notifications to owner.
@@ -366,38 +381,56 @@ fastify.get('/private/:privateKey/:key', async (request, reply) => {
366381
}
367382
})
368383

369-
const streamHandler = async (request, reply) => {
370-
const { key } = request.params;
371-
try {
372-
let recvBool;
373-
switch (request.method) {
374-
case 'POST': // Using fallthrough! POST and PUT cases run the same code.
375-
case 'PUT':
376-
recvBool = false;
377-
break;
378-
case 'HEAD': // HEAD and GET handled similarly
379-
case 'GET':
380-
recvBool = true;
381-
break;
382-
default:
383-
throw new Error('Unsupported Method');
384-
}
385-
const token = await helper.streamToken(key, recvBool);
386-
reply.redirect('https://ppng.io/' + token, 307);
387-
} catch (err) {
388-
if (err.message == 'Unauthorized') {
389-
callUnauthorized(reply, 'This path is for internal use only');
390-
} else if (err.message == 'Invalid Key') {
391-
callBadRequest(reply, 'Provided key is invalid');
392-
} else if (err.message == 'Unsupported Method') {
393-
callBadRequest(reply, 'Unsupported method');
394-
} else {
395-
callInternalServerError(reply, err.message);
384+
fastify.all('/private/:privateKey.pipe', async (request, reply) => {
385+
const { privateKey } = request.params;
386+
const pipeFail = request.query.fail;
387+
try {
388+
if (helper.parseKey(privateKey, { validate: false }).type !== 'private') throw new Error('Unauthorized');
389+
const pipeURL = await helper.pipeToPublic(privateKey, request.method);
390+
if (pipeFail) waitUntil(helper.cacheSet(privateKey, { pipeFail }));
391+
reply.redirect(pipeURL, 307);
392+
} catch (err) {
393+
if (err.message == 'Unauthorized') {
394+
callUnauthorized(reply, 'Provided key is not Private');
395+
} else if (err.message === 'Invalid Key') {
396+
callBadRequest(reply, 'Provided key is invalid');
397+
} else if (err.message === 'Method Not Allowed') {
398+
callMethodNotAllowed(reply, 'GET,POST,PUT', 'Provided method is not allowed for piping');
399+
} else {
400+
callInternalServerError(reply, err.message);
401+
}
396402
}
397-
}
398-
}
403+
});
399404

400-
fastify.all('/pipe/:key', streamHandler);
405+
fastify.all('/public/:publicKey.pipe', async (request, reply) => {
406+
const { publicKey } = request.params;
407+
try {
408+
if (helper.parseKey(publicKey, { validate: false }).type !== 'public') throw new Error('Unauthorized');
409+
const pipeURL = await helper.pipeToPrivate(publicKey, request.method);
410+
if (pipeURL) {
411+
reply.redirect(pipeURL, 307);
412+
} else {
413+
const page404 = await helper.cacheGet(publicKey, 'pipeFail');
414+
if (page404) {
415+
reply.redirect(page404, 303);
416+
} else {
417+
throw new Error('No Data');
418+
}
419+
}
420+
} catch (err) {
421+
if (err.message == 'Unauthorized') {
422+
callUnauthorized(reply, 'Provided key is not Private');
423+
} else if (err.message === 'Invalid Key') {
424+
callBadRequest(reply, 'Provided key is invalid');
425+
} else if (err.message === 'No Data') {
426+
reply.callNotFound();
427+
} else if (err.message === 'Method Not Allowed') {
428+
callMethodNotAllowed(reply, 'GET,POST,PUT', 'Provided method is not allowed for piping');
429+
} else {
430+
callInternalServerError(reply, err.message);
431+
}
432+
}
433+
});
401434

402435
export default async function handler(req, res) {
403436
await fastify.ready();

build.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ echo "${id}" > "${build_dir}/id.txt"
1313
cat > "${build_dir}/properties.json" <<-EOF
1414
{
1515
"versions": {
16-
"specification": "0.0",
17-
"implementation": "0.0.0"
16+
"specification": "0.1",
17+
"implementation": "0.0.1"
1818
},
1919
"id": "${id}",
2020
"OneSignalAppId": {

example.env

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ CDN_TTL= 30 # TTL for CDN in days
1818
WEBHOOK_TIMEOUT=4000 # Request timeout for POST to webhook, in milliseconds
1919
STREAM_TIMEOUT=60 # TTL of stream/pipe tokens in seconds
2020
MAX_STREAM_COUNT=5 # Max number of stream/pipe tokens in memory
21+
DEFUNCT_PIPE_PLUMB_TIMEOUT=2000 # in milliseconds
22+
23+
PIPING_SERVER_URL='https://ppng.io/' # or, https://httprelay.io/
2124

2225
# Redis credentials to be used for the main database
2326
UPSTASH_REDIS_REST_URL_MAIN=

middleware.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ export default async function middleware (request) {
7979
if (request.headers.has('expect')) return errorResponse(417, 'Expect header is not allowed');
8080

8181
// Detect a pipe request to let it have any Content-Type and Length, including `Transfer-Encoding: chunked` header
82-
const isPiped = new URL(request.url).pathname.startsWith('/pipe/');
82+
const isPiped = new URL(request.url).pathname.endsWith('.pipe');
8383

8484
// Block unallowed methods and chunked transfer if not pipe
8585
if (!isPiped) {

vercel.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,19 @@
2020
],
2121
"rewrites": [
2222
{
23-
"source": "/(public|private|keys|pipe)/:path*",
23+
"source": "/keys",
24+
"destination": "/api"
25+
},
26+
{
27+
"source": "/(public|private|keys)/([\\w-]+)",
28+
"destination": "/api"
29+
},
30+
{
31+
"source": "/(public|private)/([\\w-]+)/([\\w-]+)",
32+
"destination": "/api"
33+
},
34+
{
35+
"source": "/(public|private)/([\\w-]+).pipe",
2436
"destination": "/api"
2537
},
2638
{

0 commit comments

Comments
 (0)