Skip to content

Commit 894db87

Browse files
sjpotterbobymicroby
authored andcommitted
address review comments
1 parent 4963687 commit 894db87

File tree

7 files changed

+47
-122
lines changed

7 files changed

+47
-122
lines changed

packages/client/lib/client/cache.ts

Lines changed: 14 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { EventEmitter } from 'stream';
2-
import RedisClient, { RedisClientType } from '.';
2+
import RedisClient from '.';
33
import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types';
44
import { BasicCommandParser } from './parser';
55

66
type CachingClient = RedisClient<any, any, any, any, any>;
7-
type CachingClientType = RedisClientType<any, any, any, any, any>;
87
type CmdFunc = () => Promise<ReplyUnion>;
98

109
export interface ClientSideCacheConfig {
@@ -23,6 +22,17 @@ interface ClientSideCacheEntry {
2322
validate(): boolean;
2423
}
2524

25+
function generateCacheKey(redisArgs: ReadonlyArray<RedisArgument>): string {
26+
const tmp = new Array(redisArgs.length*2);
27+
28+
for (let i = 0; i < redisArgs.length; i++) {
29+
tmp[i] = redisArgs[i].length;
30+
tmp[i+redisArgs.length] = redisArgs[i];
31+
}
32+
33+
return tmp.join('_');
34+
}
35+
2636
abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry {
2737
#invalidated = false;
2838
readonly #expireTime: number;
@@ -125,7 +135,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
125135
) {
126136
let reply: ReplyUnion;
127137

128-
const cacheKey = parser.cacheKey;
138+
const cacheKey = generateCacheKey(parser.redisArgs);
129139

130140
// "2"
131141
let cacheEntry = this.get(cacheKey);
@@ -339,10 +349,6 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
339349
export abstract class PooledClientSideCacheProvider extends BasicClientSideCache {
340350
#disabled = false;
341351

342-
abstract updateRedirect(id: number): void;
343-
abstract addClient(client: CachingClientType): void;
344-
abstract removeClient(client: CachingClientType): void;
345-
346352
disable() {
347353
this.#disabled = true;
348354
}
@@ -367,27 +373,13 @@ export abstract class PooledClientSideCacheProvider extends BasicClientSideCache
367373
return super.has(cacheKey);
368374
}
369375

370-
onPoolConnect(factory: () => CachingClientType) {};
371-
372376
onPoolClose() {
373377
this.clear();
374378
};
375379
}
376380

377381
// doesn't do anything special in pooling, clears cache on every client disconnect
378382
export class BasicPooledClientSideCache extends PooledClientSideCacheProvider {
379-
380-
override updateRedirect(id: number): void {
381-
return;
382-
}
383-
384-
override addClient(client: CachingClientType): void {
385-
return;
386-
}
387-
override removeClient(client: CachingClientType): void {
388-
return;
389-
}
390-
391383
override onError() {
392384
this.clear(false);
393385
}
@@ -459,75 +451,4 @@ export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache
459451
override onError() {}
460452

461453
override onClose() {}
462-
}
463-
464-
// Only clears cache on "management"/"redirect" client disconnect
465-
export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider {
466-
#id?: number;
467-
#clients: Set<CachingClientType> = new Set();
468-
#redirectClient?: CachingClientType;
469-
470-
constructor(config: ClientSideCacheConfig) {
471-
super(config);
472-
this.disable();
473-
}
474-
475-
override trackingOn(): string[] {
476-
if (this.#id) {
477-
return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()];
478-
} else {
479-
return [];
480-
}
481-
}
482-
483-
override updateRedirect(id: number) {
484-
this.#id = id;
485-
for (const client of this.#clients) {
486-
client.sendCommand(this.trackingOn()).catch(() => {});
487-
}
488-
}
489-
490-
override addClient(client: CachingClientType) {
491-
this.#clients.add(client);
492-
}
493-
494-
override removeClient(client: CachingClientType) {
495-
this.#clients.delete(client);
496-
}
497-
498-
override onError(): void {};
499-
500-
override async onPoolConnect(factory: () => CachingClientType) {
501-
const client = factory();
502-
this.#redirectClient = client;
503-
504-
client.on("error", () => {
505-
this.disable();
506-
this.clear();
507-
}).on("ready", async () => {
508-
const clientId = await client.withTypeMapping({}).clientId();
509-
this.updateRedirect(clientId);
510-
this.enable();
511-
})
512-
513-
try {
514-
await client.connect();
515-
} catch (err) {
516-
throw err;
517-
}
518-
}
519-
520-
override onClose() {};
521-
522-
override onPoolClose() {
523-
super.onPoolClose();
524-
525-
if (this.#redirectClient) {
526-
this.#id = undefined;
527-
const client = this.#redirectClient;
528-
this.#redirectClient = undefined;
529-
530-
return client.close();
531-
}
532-
}
533-
}
454+
}

packages/client/lib/client/commands-queue.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ export default class RedisCommandsQueue {
111111
onErrorReply: err => this.#onErrorReply(err),
112112
onPush: push => {
113113
if (!this.#onPush(push)) {
114+
// currently only supporting "invalidate" over RESP3 push messages
114115
switch (push[0].toString()) {
115116
case "invalidate": {
116117
if (this.#invalidateCallback) {

packages/client/lib/client/index.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,13 @@ export default class RedisClient<
308308
// was in a watch transaction when
309309
// a topology change occured
310310
#dirtyWatch?: string;
311-
#watchEpoch?: number;
311+
#watchEpoch?: number;
312312
#clientSideCache?: ClientSideCacheProvider;
313313
#credentialsSubscription: Disposable | null = null;
314+
get clientSideCache() {
315+
return this._self.#clientSideCache;
316+
}
317+
314318

315319
get options(): RedisClientOptions<M, F, S, RESP> | undefined {
316320
return this._self.#options;
@@ -332,7 +336,6 @@ export default class RedisClient<
332336
return this._self.#socket.socketEpoch;
333337
}
334338

335-
336339
get isWatching() {
337340
return this._self.#watchEpoch !== undefined;
338341
}
@@ -532,10 +535,7 @@ export default class RedisClient<
532535
}
533536

534537
if (this.#clientSideCache) {
535-
const tracking = this.#clientSideCache.trackingOn();
536-
if (tracking) {
537-
commands.push(tracking);
538-
}
538+
commands.push(this.#clientSideCache.trackingOn());
539539
}
540540

541541
return commands;
@@ -973,7 +973,7 @@ export default class RedisClient<
973973
}
974974

975975
const chainId = Symbol('Pipeline Chain'),
976-
promise = Promise.allSettled(
976+
promise = Promise.all(
977977
commands.map(({ args }) => this._self.#queue.addCommand(args, {
978978
chainId,
979979
typeMapping: this._commandOptions?.typeMapping

packages/client/lib/client/parser.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,14 @@ export class BasicCommandParser implements CommandParser {
3434
}
3535

3636
get cacheKey() {
37-
let cacheKey = this.#redisArgs.map((arg) => arg.length).join('_');
38-
return cacheKey + '_' + this.#redisArgs.join('_');
37+
const tmp = new Array(this.#redisArgs.length*2);
38+
39+
for (let i = 0; i < this.#redisArgs.length; i++) {
40+
tmp[i] = this.#redisArgs[i].length;
41+
tmp[i+this.#redisArgs.length] = this.#redisArgs[i];
42+
}
43+
44+
return tmp.join('_');
3945
}
4046

4147
push(...arg: Array<RedisArgument>) {

packages/client/lib/client/pool.ts

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { TimeoutError } from '../errors';
77
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
88
import { CommandOptions } from './commands-queue';
99
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
10-
import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache';
10+
import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider } from './cache';
1111
import { BasicCommandParser } from './parser';
1212

1313
export interface RedisPoolOptions {
@@ -215,6 +215,9 @@ export class RedisClientPool<
215215
}
216216

217217
#clientSideCache?: PooledClientSideCacheProvider;
218+
get clientSideCache() {
219+
return this._self.#clientSideCache;
220+
}
218221

219222
/**
220223
* You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`},
@@ -241,8 +244,7 @@ export class RedisClientPool<
241244
} else {
242245
const cscConfig = options.clientSideCache;
243246
this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
244-
this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
245-
this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig);
247+
// this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
246248
}
247249
}
248250

@@ -312,13 +314,6 @@ export class RedisClientPool<
312314
if (this._self.#isOpen) return; // TODO: throw error?
313315
this._self.#isOpen = true;
314316

315-
try {
316-
this._self.#clientSideCache?.onPoolConnect(this._self.#clientFactory);
317-
} catch (err) {
318-
this.destroy();
319-
throw err;
320-
}
321-
322317
const promises = [];
323318
while (promises.length < this._self.#options.minimum) {
324319
promises.push(this._self.#create());
@@ -334,18 +329,14 @@ export class RedisClientPool<
334329
return this as unknown as RedisClientPoolType<M, F, S, RESP, TYPE_MAPPING>;
335330
}
336331

337-
async #create(redirect?: boolean) {
332+
async #create() {
338333
const node = this._self.#clientsInUse.push(
339334
this._self.#clientFactory()
340335
.on('error', (err: Error) => this.emit('error', err))
341336
);
342337

343338
try {
344339
const client = node.value;
345-
if (this._self.#clientSideCache) {
346-
this._self.#clientSideCache.addClient(node.value);
347-
}
348-
349340
await client.connect();
350341
} catch (err) {
351342
this._self.#clientsInUse.remove(node);
@@ -436,7 +427,6 @@ export class RedisClientPool<
436427
for (let i = 0; i < toDestroy; i++) {
437428
// TODO: shift vs pop
438429
const client = this.#idleClients.shift()!
439-
this.#clientSideCache?.removeClient(client);
440430
client.destroy();
441431
}
442432
}

packages/client/lib/cluster/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,10 @@ export default class RedisCluster<
270270
return this._self.#slots.slots;
271271
}
272272

273+
get clientSideCache() {
274+
return this._self.#slots.clientSideCache;
275+
}
276+
273277
/**
274278
* An array of the cluster masters.
275279
* Use with {@link RedisCluster.prototype.nodeClient} to get the client for a specific master node.

packages/client/lib/sentinel/index.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { RedisVariadicArgument } from '../commands/generic-transformers';
1616
import { WaitQueue } from './wait-queue';
1717
import { TcpNetConnectOpts } from 'node:net';
1818
import { RedisTcpSocketOptions } from '../client/socket';
19-
import { BasicPooledClientSideCache, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from '../client/cache';
19+
import { BasicPooledClientSideCache, PooledClientSideCacheProvider } from '../client/cache';
2020

2121
interface ClientInfo {
2222
id: number;
@@ -302,6 +302,10 @@ export default class RedisSentinel<
302302
#masterClientCount = 0;
303303
#masterClientInfo?: ClientInfo;
304304

305+
get clientSideCache() {
306+
return this._self.#internal.clientSideCache;
307+
}
308+
305309
constructor(options: RedisSentinelOptions<M, F, S, RESP, TYPE_MAPPING>) {
306310
super();
307311

@@ -618,7 +622,7 @@ class RedisSentinelInternal<
618622

619623
readonly #name: string;
620624
readonly #nodeClientOptions: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
621-
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, F, S, RESP, TYPE_MAPPING, RedisTcpSocketOptions>;
625+
readonly #sentinelClientOptions: RedisClientOptions<typeof RedisSentinelModule, RedisFunctions, RedisScripts, RespVersions, TypeMapping, RedisTcpSocketOptions>;
622626
readonly #scanInterval: number;
623627
readonly #passthroughClientErrorEvents: boolean;
624628

@@ -679,8 +683,7 @@ class RedisSentinelInternal<
679683
} else {
680684
const cscConfig = options.clientSideCache;
681685
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig);
682-
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
683-
this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig);
686+
// this.#clientSideCache = this.#nodeClientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig);
684687
}
685688
}
686689

0 commit comments

Comments
 (0)