Skip to content

Commit 71b8201

Browse files
authored
Pipe/stream (#3)
* Experimental: added /stream/:key path * Stream tokens stored as LIFO. Excess tokens trimmed. Unnecessary await omitted. * pipe * stream path is rate-limited if not requested by middleware * STREAM_COUNT => MAX_STREAM_COUNT
1 parent d86b990 commit 71b8201

File tree

7 files changed

+112
-10
lines changed

7 files changed

+112
-10
lines changed

api/helper.js

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,24 @@ const sigLen = parseInt(process.env.SIG_LEN);
1212
const hashLen = parseInt(process.env.HASH_LEN);
1313
const ttl = parseInt(process.env.TTL);
1414
const cacheTtl = parseInt(process.env.CACHE_TTL);
15+
const streamTimeout = parseInt(process.env.STREAM_TIMEOUT);
16+
const maxStreamCount = parseInt(process.env.MAX_STREAM_COUNT);
17+
1518
const dbKeyPrefix = {
1619
manyToOne: "m2o:",
1720
oneToMany: "o2m:",
1821
oneToOne: "o2o:",
19-
cache: "cache:"
22+
cache: "cache:",
23+
stream: {
24+
public: {
25+
send: "pipePubSend:",
26+
receive: "pipePubRecv:"
27+
},
28+
private: {
29+
send: "pipePrivSend:",
30+
receive: "pipePrivRecv:"
31+
}
32+
}
2033
}
2134
// Redis client for user database
2235
const redisData = new Redis({
@@ -44,9 +57,9 @@ function sign(str){
4457
return createHmac('md5', secret).update(str).digest('base64url').substring(0,sigLen);
4558
}
4659

47-
//Brief: Return random base64url string of given length
60+
// Brief: Return random base64url string of given length
4861
function randStr(len = hashLen){
49-
const byteSize = Math.ceil(len*6/8);
62+
const byteSize = Math.ceil(len*6/8); // base64 char is 6 bit, byte is 8
5063
const buff = Buffer.alloc(byteSize);
5164
getRandomValues(buff);
5265
return buff.toString('base64url').substring(0,len);
@@ -56,14 +69,15 @@ export function id(){
5669
return sign('id');
5770
}
5871

59-
export function validate(key){
72+
export function validate(key, silent=true){
6073
const sig = key.substring(0, sigLen);
6174
const hash = key.substring(sigLen);
6275
if (sig === sign(hash + 'public')){
6376
return 'public';
6477
} else if (sig === sign(hash + 'private')){
6578
return 'private';
6679
} else {
80+
if (!silent) throw new Error('Invalid Key');
6781
return false;
6882
}
6983
}
@@ -199,3 +213,22 @@ export async function oneToOneTTL(privateKey, key){
199213
])
200214
return {ttl: bool ? ttl : 0};
201215
}
216+
217+
// Tokens are stored in LIFO stacks. Old and unused tokens are trimmed.
218+
export async function streamToken(privateOrPublicKey, receive=true){
219+
const type = validate(privateOrPublicKey, false);
220+
const typeComplement = (type == 'private') ? 'public' : 'private';
221+
const publicKey = genPublicKey(privateOrPublicKey);
222+
const mode = receive ? "receive" : "send";
223+
const modeComplement = receive ? "send" : "receive";
224+
const existing = await redisData.lpop(dbKeyPrefix.stream[typeComplement][modeComplement] + publicKey);
225+
if (existing) return existing;
226+
const token = randStr();
227+
const dbKey = dbKeyPrefix.stream[type][mode] + publicKey;
228+
Promise.all([
229+
redisData.lpush(dbKey, token),
230+
redisData.expire(dbKey, streamTimeout),
231+
redisData.ltrim(dbKey, 0, maxStreamCount - 1)
232+
])
233+
return token;
234+
}

api/index.js

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const fastify = Fastify({
1515
fastify.addHook('onRequest', (request, reply, done) => {
1616
reply.headers({
1717
'Access-Control-Allow-Origin': '*',
18-
'Access-Control-Allow-Methods': 'GET,POST,DELETE'
18+
'Access-Control-Allow-Methods': 'GET,POST,PATCH,DELETE'
1919
})
2020
done();
2121
})
@@ -76,6 +76,7 @@ fastify.post('/public/:publicKey', async (request, reply) => {
7676

7777
await fetch(webhook, {
7878
method: "POST",
79+
redirect: "follow",
7980
headers: { "Content-type": "application/json" },
8081
body: data,
8182
signal: AbortSignal.timeout(webhookTimeout)
@@ -290,6 +291,36 @@ fastify.get('/private/:privateKey/:key', async (request, reply) => {
290291
}
291292
})
292293

294+
const streamHandler = async (request, reply) => {
295+
const { key } = request.params;
296+
try {
297+
let recvBool;
298+
switch (request.method) {
299+
case 'POST': // Using fallthrough! POST and PUT cases run the same code.
300+
case 'PUT':
301+
recvBool = false;
302+
break;
303+
case 'GET':
304+
recvBool = true;
305+
break;
306+
default:
307+
throw new Error('Unsupported Method');
308+
}
309+
const token = await helper.streamToken(key, recvBool);
310+
reply.redirect('https://ppng.io/' + token, 307);
311+
} catch (err) {
312+
if (err.message == 'Invalid Key') {
313+
callUnauthorized(reply, 'Provided key is invalid');
314+
} else if (err.message == 'Unsupported Method') {
315+
callBadRequest(reply, 'Unsupported method');
316+
} else {
317+
callInternalServerError(reply, err);
318+
}
319+
}
320+
}
321+
322+
fastify.all('/stream/:key', streamHandler);
323+
293324
export default async function handler(req, res) {
294325
await fastify.ready();
295326
fastify.server.emit('request', req, res);

api/test.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ console.log('This should show public: ' + helper.validate(key.public));
1414
console.log('This should show private: ' + helper.validate(key.private));
1515
console.log('This should show false: ' + helper.validate('random'));
1616

17+
console.log('Stream token private POST:', await helper.streamToken(key.private, false));
18+
console.log('Stream token public GET:', await helper.streamToken(key.public));
19+
console.log('Stream token public POST:', await helper.streamToken(key.public, false));
20+
console.log('Stream token public POST:', await helper.streamToken(key.public, false));
21+
console.log('Stream token private GET:', await helper.streamToken(key.private, true));
22+
1723
await helper.publicProduce(key.public, 'dataA hi"');
1824
await helper.publicProduce(key.public, 'dataB hi"');
1925
await helper.publicProduce(key.public, 'dataC hi"');

example.env

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ RATELIMIT=20 # Max number of requests within following time period
1414
RATELIMIT_WINDOW=300
1515
CACHE_TTL=86400 # TTL for cache in seconds
1616
WEBHOOK_TIMEOUT=4000 # Request timeout for POST to webhook, in milliseconds
17+
STREAM_TIMEOUT=60 # TTL of stream/pipe tokens in seconds
18+
MAX_STREAM_COUNT=5 # Max number of stream/pipe tokens in memory
1719

1820
# Redis credentials to be used for rate-limiting
1921
KV_REST_API_URL=

middleware.js

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ import { next } from '@vercel/edge';
1212
import { Ratelimit } from '@upstash/ratelimit';
1313
import { Redis } from '@upstash/redis';
1414

15+
export const config = {
16+
matcher: ['/public/:path*', '/private/:path*', '/keys/:path*', '/pipe/:path*', '/stream/:path*']
17+
};
18+
19+
const middlewareSig = process.env.SECRET; // Secret known to middleware only
20+
1521
const cache = new Map(); // must be outside of your serverless function handler
1622

1723
const ratelimit = new Ratelimit({
@@ -26,17 +32,40 @@ const ratelimit = new Ratelimit({
2632
enableProtection: true
2733
})
2834

35+
// Forwards requests at path /pipe/* to /stream/* in index.js after trimming the request body and headers
36+
// and returns the response. This is done in middleware.js because, unlike index.js, middleware.js doesn't have
37+
// problems with the `Expect: 100-continue` header sent by `curl -T- <url>`. middleware.js also doesn't read the
38+
// request body. index.js also has a set bodyLimit which is incompatible with payloads at /pipe/* of arbitrary size.
39+
// /stream/ path is exposed only to middleware.
40+
// For requests not at path /pipe/*, returns null.
41+
async function pipeToStream(request) {
42+
const pipeUrl = new URL(request.url);
43+
if (!pipeUrl.pathname.startsWith('/pipe/')) return null;
44+
const streamUrl = pipeUrl.href.replace('/pipe/', '/stream/');
45+
return fetch(streamUrl, { method: request.method, headers: { 'x-middleware' : middlewareSig }, redirect: 'manual' });
46+
}
47+
2948
export default async function middleware(request) {
30-
// You could alternatively limit based on user ID or similar
49+
const fromMiddleware = request.headers.get('x-middleware') === middlewareSig;
50+
51+
// You could alternatively rate-limit based on user ID or similar
3152
const ip = ipAddress(request) || '127.0.0.1';
32-
const { success, reset } = await ratelimit.limit(ip);
53+
// For requests from middleware, no rate-limiting is necessary
54+
const { success, reset } = fromMiddleware ? { success: true, reset: 0 } : await ratelimit.limit(ip);
3355

34-
return success ? next() : Response.json(
56+
if (success) {
57+
const streamResponse = await pipeToStream(request);
58+
// For non-pipe requests, streamResponse is null.
59+
return streamResponse ?? next();
60+
}
61+
else {
62+
return Response.json(
3563
{ message: `Try after ${(reset - Date.now())/1000} seconds`, error: "Too Many Requests", statusCode: 429 },
3664
{
3765
status: 429,
3866
statusText: "Too Many Requests",
3967
headers: {"Access-Control-Allow-Origin":"*"}
4068
},
4169
)
70+
}
4271
}

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
},
1212
"type": "module",
1313
"scripts": {
14-
"test": "node --env-file=.env api/test.js"
14+
"test": "node --env-file=.env api/test.js",
15+
"lint": "semistandard --fix"
1516
}
1617
}

vercel.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
],
2020
"rewrites": [
2121
{
22-
"source": "/(public|private|keys)/:path*",
22+
"source": "/(public|private|keys|stream)/:path*",
2323
"destination": "/api"
2424
},
2525
{

0 commit comments

Comments
 (0)