Skip to content

Commit 77d1687

Browse files
committed
Optimization using auto-pipeling @ upstash-redis and Promise.all()
1 parent 5092396 commit 77d1687

File tree

4 files changed

+74
-34
lines changed

4 files changed

+74
-34
lines changed

api/helper.js

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1+
/*
2+
Refs:
3+
https://upstash.com/docs/redis/sdks/ts/pipelining/pipeline-transaction
4+
https://upstash.com/docs/redis/sdks/ts/pipelining/auto-pipeline
5+
*/
16
import Crypto from 'node:crypto';
2-
import { createClient } from '@vercel/kv';
7+
import { Redis } from '@upstash/redis';
38

49
const secret = process.env.SECRET;
510
const sigLen = parseInt(process.env.SIG_LEN);
@@ -13,14 +18,18 @@ const dbKeyPrefix = {
1318
cache: "cache:"
1419
}
1520
// Redis client for user database
16-
const redisData = createClient({
21+
const redisData = new Redis({
1722
url: process.env.UPSTASH_REDIS_REST_URL,
1823
token: process.env.UPSTASH_REDIS_REST_TOKEN,
24+
latencyLogging: false,
25+
enableAutoPipelining: true
1926
})
2027
// Redis client for ratelimiter database
21-
const redisRateLimit = createClient({
28+
const redisRateLimit = new Redis({
2229
url: process.env.KV_REST_API_URL,
2330
token: process.env.KV_REST_API_TOKEN,
31+
latencyLogging: false,
32+
enableAutoPipelining: true
2433
})
2534

2635
function hash(str){
@@ -61,7 +70,13 @@ export function genPublicKey(privateOrPublicKey){
6170
export function cacheSet(privateKey, obj){
6271
const publicKey = genPublicKey(privateKey);
6372
const dbKey = dbKeyPrefix.cache + publicKey;
64-
return redisRateLimit.hset(dbKey, obj).then(redisRateLimit.expire(dbKey, cacheTtl));
73+
// Promise.all below enables both commands to be executed in a single http request (using same pipeline)
74+
// As Redis is single-threaded, the commands are executed in order
75+
// See https://upstash.com/docs/redis/sdks/ts/pipelining/auto-pipeline
76+
return Promise.all([
77+
redisRateLimit.hset(dbKey, obj),
78+
redisRateLimit.expire(dbKey, cacheTtl)
79+
])
6580
}
6681

6782
export function cacheGet(publicKey, key){
@@ -84,15 +99,20 @@ export function genKeyPair(seed = Crypto.randomUUID()){
8499

85100
export async function publicProduce(publicKey, data){
86101
const dbKey = dbKeyPrefix.manyToOne + publicKey;
87-
return redisData.rpush(dbKey, data).then(redisData.expire(dbKey, ttl));
102+
return Promise.all([
103+
redisData.rpush(dbKey, data),
104+
redisData.expire(dbKey, ttl)
105+
])
88106
}
89107

90108
export async function privateConsume(privateKey){
91109
const publicKey = genPublicKey(privateKey);
92110
const dbKey = dbKeyPrefix.manyToOne + publicKey;
93-
const llen = await redisData.llen(dbKey);
94-
if (!llen) return [];
95-
return redisData.lpop(dbKey, llen);
111+
const atomicTransaction = redisData.multi();
112+
atomicTransaction.lrange(dbKey, 0, -1);
113+
atomicTransaction.del(dbKey);
114+
return atomicTransaction.exec()
115+
.then((values) => values[0]);
96116
}
97117

98118
export async function privateProduce(privateKey, data){
@@ -117,9 +137,11 @@ export async function privateStats(privateKey){
117137
const publicKey = genPublicKey(privateKey);
118138
const dbKeyConsume = dbKeyPrefix.manyToOne + publicKey;
119139
const dbKeyPublish = dbKeyPrefix.oneToMany + publicKey;
120-
const countConsume = await redisData.llen(dbKeyConsume);
121-
const ttlConsume = await redisData.ttl(dbKeyConsume);
122-
const ttlPublish = await redisData.ttl(dbKeyPublish);
140+
const [ countConsume, ttlConsume, ttlPublish ] = await Promise.all([
141+
redisData.llen(dbKeyConsume),
142+
redisData.ttl(dbKeyConsume),
143+
redisData.ttl(dbKeyPublish)
144+
])
123145
return {
124146
consume: {
125147
count: countConsume,
@@ -141,23 +163,29 @@ export async function oneToOneProduce(privateKey, key, data){
141163
const dbKey = dbKeyPrefix.oneToOne + publicKey;
142164
let field = {};
143165
field[key] = data;
144-
return redisData.hset(dbKey, field).then(redisData.expire(dbKey, ttl));
166+
return Promise.all([
167+
redisData.hset(dbKey, field),
168+
redisData.expire(dbKey, ttl)
169+
])
145170
}
146171

147172
export async function oneToOneConsume(publicKey, key){
148173
const dbKey = dbKeyPrefix.oneToOne + publicKey;
149174
const field = key;
150-
return redisData.hget(dbKey, field).then(redisData.hdel(dbKey, field));
175+
const atomicTransaction = redisData.multi();
176+
atomicTransaction.hget(dbKey, field);
177+
atomicTransaction.hdel(dbKey, field);
178+
return atomicTransaction.exec()
179+
.then((values) => values[0]);
151180
}
152181

153182
export async function oneToOneTTL(privateKey, key){
154183
const publicKey = genPublicKey(privateKey);
155184
const dbKey = dbKeyPrefix.oneToOne + publicKey;
156185
const field = key;
157-
const bool = await redisData.hexists(dbKey, field);
158-
if (bool) {
159-
return {ttl: await redisData.ttl(dbKey)};
160-
} else {
161-
return {ttl: 0};
162-
}
186+
const [ bool, ttl ] = await Promise.all([
187+
redisData.hexists(dbKey, field),
188+
redisData.ttl(dbKey)
189+
])
190+
return {ttl: bool ? ttl : 0};
163191
}

api/index.js

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ fastify.post('/public/:publicKey', async (request, reply) => {
7272
// Try posting the data to webhook, if any, with timeout. On fail, store/bin data for later retrieval.
7373
try {
7474
const webhook = await helper.cacheGet(publicKey, 'hook');
75-
if (webhook == null) throw new Error('No webhook');
75+
if (!webhook) throw new Error('No webhook');
7676

7777
await fetch(webhook, {
7878
method: "POST",
@@ -86,9 +86,10 @@ fastify.post('/public/:publicKey', async (request, reply) => {
8686

8787
webhookUsed = webhook;
8888
} catch (err) {
89-
await helper.publicProduce(publicKey, data);
90-
// Delete webhook from cache if webhook is of no use!
91-
if (err.message !== 'No webhook') await helper.cacheDel(publicKey, 'hook');
89+
await Promise.all([
90+
helper.publicProduce(publicKey, data),
91+
helper.cacheDel(publicKey, 'hook')
92+
])
9293
}
9394

9495
if (redirectOnOk == null) {
@@ -113,15 +114,30 @@ fastify.get('/private/:privateKey', async (request, reply) => {
113114
const { privateKey } = request.params;
114115
const webhook = request.query.hook;
115116
const statsPresent = 'stats' in request.query;
117+
118+
let webhookHandler, statsHandler, dataHandler;
119+
webhookHandler = statsHandler = dataHandler = () => null; // default
120+
116121
try {
117122
if (helper.validate(privateKey) !== 'private') throw 401;
118123
if (webhook == null) {
119-
await helper.cacheDel(privateKey, 'hook');
124+
webhookHandler = () => helper.cacheDel(privateKey, 'hook');
125+
} else {
126+
webhookHandler = () => helper.cacheSet(privateKey, {hook:webhook});
127+
}
128+
if (statsPresent) {
129+
statsHandler = () => helper.privateStats(privateKey);
120130
} else {
121-
await helper.cacheSet(privateKey, {hook:webhook});
131+
dataHandler = () => helper.privateConsume(privateKey);
122132
}
123-
if (statsPresent) return helper.privateStats(privateKey);
124-
const dataArray = await helper.privateConsume(privateKey);
133+
134+
const [ _, statsObj, dataArray ] = await Promise.all([
135+
webhookHandler(),
136+
statsHandler(),
137+
dataHandler()
138+
])
139+
140+
if (statsPresent) return statsObj;
125141
if (!dataArray.length) throw 404;
126142
reply.send(dataArray);
127143
} catch (err) {

api/test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ console.log(await helper.publicConsume(key.public));
2525

2626
await helper.oneToOneProduce(key.private, 'some Key', 'data "for one to one at some key');
2727
console.log(await helper.oneToOneConsume(key.public, 'some Key'));
28-
console.log(await helper.oneToOneIsConsumed(key.private, 'some Key'));
28+
console.log(await helper.oneToOneTTL(key.private, 'some Key'));
2929

3030
console.log('End of synchronous execution. Anything logged after this is from async only!')

middleware.js

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,8 @@ const ratelimit = new Ratelimit({
2828

2929
export default async function middleware(request) {
3030
// You could alternatively limit based on user ID or similar
31-
const ip = ipAddress(request) || '127.0.0.1'
32-
const { success, pending, reset } = await ratelimit.limit(
33-
ip
34-
)
35-
36-
await pending;
31+
const ip = ipAddress(request) || '127.0.0.1';
32+
const { success, reset } = await ratelimit.limit(ip);
3733

3834
return success ? next() : Response.json(
3935
{ message: `Try after ${(reset - Date.now())/1000} seconds`, error: "Too Many Requests", statusCode: 429 },

0 commit comments

Comments
 (0)