Skip to content

Commit

Permalink
Bug 1 prefix (#24)
Browse files Browse the repository at this point in the history
* Debugging Key Prefix Issue. Removed Ioredis types dev dep.

* bug-1: strip the prefix prior to binding listeners if using keyPrefix. (#20)

* bug-1: strip the prefix prior to binding listeners if using keyPrefix.

* Additional strip / prepend logic to account for ioredis xgroup not using keyPrefix.

---------

Co-authored-by: Zach Sherbondy <zach.sherbondy@five9.com>
Co-authored-by: Zach Sherbondy <zachsherbondy@Zachs-MBP.lan>

* Removed debug logs. Changed createConsumerGroup debug log to the new stream name with the prefix if existent.
Reverted example apps state.

* Bumped version to 1.2.0.

---------

Co-authored-by: Zach <34084826+zeejers@users.noreply.github.com>
Co-authored-by: Zach Sherbondy <zach.sherbondy@five9.com>
Co-authored-by: Zach Sherbondy <zachsherbondy@Zachs-MBP.lan>
  • Loading branch information
4 people authored Sep 13, 2024
1 parent f4920a6 commit 45d1873
Show file tree
Hide file tree
Showing 7 changed files with 706 additions and 688 deletions.
418 changes: 206 additions & 212 deletions examples/client-app/package-lock.json

Large diffs are not rendered by default.

429 changes: 207 additions & 222 deletions examples/users-microservice/package-lock.json

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion examples/users-microservice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
"@nestjs/schematics": "^9.2.0",
"@nestjs/testing": "^9.4.3",
"@types/express": "^4.17.21",
"@types/ioredis": "^4.28.10",
"@types/jest": "28.1.8",
"@types/node": "^16.18.101",
"@types/supertest": "^2.0.16",
Expand Down
1 change: 1 addition & 0 deletions examples/users-microservice/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async function bootstrap() {
strategy: new RedisStreamStrategy({
connection: {
url: '0.0.0.0:6379',
// keyPrefix: 'my-key-prefix:',
},
streams: {
block: 5000,
Expand Down
51 changes: 43 additions & 8 deletions lib/redis.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,15 @@ export class RedisStreamStrategy
try {
if (!this.redis) throw new Error('Redis instance not found.');

await this.redis.xgroup('CREATE', stream, consumerGroup, '$', 'MKSTREAM');
const modifiedStreamKey = this.prependPrefix(stream);

await this.redis.xgroup(
'CREATE',
modifiedStreamKey,
consumerGroup,
'$',
'MKSTREAM',
);

return true;
} catch (error) {
Expand All @@ -106,7 +114,7 @@ export class RedisStreamStrategy
'Consumer Group "' +
consumerGroup +
'" already exists for stream: ' +
stream,
this.prependPrefix(stream),
);
return true;
} else {
Expand Down Expand Up @@ -214,12 +222,13 @@ export class RedisStreamStrategy

private async notifyHandlers(stream: string, messages: any[]) {
try {
const handler = this.streamHandlerMap[stream];
const modifiedStream = this.stripPrefix(stream);
const handler = this.streamHandlerMap[modifiedStream];

await Promise.all(
messages.map(async (message) => {
let ctx = new RedisStreamContext([
stream,
modifiedStream,
message[0], // message id needed for ACK.
this.options?.streams?.consumerGroup,
this.options?.streams?.consumer,
Expand Down Expand Up @@ -269,10 +278,14 @@ export class RedisStreamStrategy
'BLOCK',
this.options?.streams?.block || 0,
'STREAMS',
...(Object.keys(this.streamHandlerMap) as string[]), // streams keys
...(Object.keys(this.streamHandlerMap) as string[]).map(
(stream: string) => '>',
), // '>', this is needed for xreadgroup as id.
...(Object.keys(this.streamHandlerMap).map((s) =>
this.stripPrefix(s),
) as string[]), // streams keys
...(
Object.keys(this.streamHandlerMap).map((s) =>
this.stripPrefix(s),
) as string[]
).map((stream: string) => '>'), // '>', this is needed for xreadgroup as id.
);

// if BLOCK time ended, and results are null, listen again.
Expand All @@ -285,10 +298,32 @@ export class RedisStreamStrategy

return this.listenOnStreams();
} catch (error) {
console.log('Error in listenOnStreams: ', error);
this.logger.error(error);
}
}

// When the stream handler name is stored in streamHandlerMap, its stored WITH the key prefix, so sending additional redis commands when using the prefix with the existing key will cause a duplicate prefix. This ensures to strip the first occurrence of the prefix when binding listeners.
private stripPrefix(streamHandlerName: string) {
const keyPrefix = this?.redis?.options?.keyPrefix;
if (!keyPrefix || !streamHandlerName.startsWith(keyPrefix)) {
return streamHandlerName;
}
// Replace just the first instance of the substring
return streamHandlerName.replace(keyPrefix, '');
}

// xgroup CREATE command with ioredis does not automatically prefix the keyPrefix, though many other commands do, such as xreadgroup.
// https://github.com/redis/ioredis/issues/1659
private prependPrefix(key: string) {
const keyPrefix = this?.redis?.options?.keyPrefix;
if (keyPrefix && !key.startsWith(keyPrefix)) {
return `${keyPrefix}${key}`;
} else {
return key;
}
}

// for redis instances. need to add mechanism to try to connect back.
public handleError(stream: any) {
stream.on(ERROR_EVENT, (err: any) => {
Expand Down
Loading

0 comments on commit 45d1873

Please sign in to comment.