Skip to content

Commit 0fac64a

Browse files
committed
fix: handle added stream group creation
1 parent f81ab96 commit 0fac64a

10 files changed

+596
-148
lines changed

.vscode/settings.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@
3232
"editor.codeActionsOnSave": {
3333
"source.fixAll.eslint": true
3434
}
35-
}
35+
}

package-lock.json

+482-92
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+17-5
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
"commitlint": "commitlint -f $(git rev-list --tags --max-count=1) -t $(git log --pretty=%H | head -1)",
2424
"lint": "eslint \"src/**/*\" --fix --ext \"*.ts\"",
2525
"prepublishOnly": "npm run build",
26-
"coverage": "npm run test -- --collectCoverage",
27-
"test": "npm run commitlint && vitest run",
26+
"coverage": "npm run test -- --coverage",
27+
"test": "npm run commitlint && vitest run --no-isolate",
2828
"watch:test": "vitest",
2929
"watch:typescript": "tsc -w"
3030
},
@@ -50,10 +50,13 @@
5050
"@calebboyd/semantic-release-config": "^1.0.1",
5151
"@commitlint/cli": "^17.3.0",
5252
"@commitlint/config-angular": "^17.3.0",
53+
"@types/chance": "^1.1.3",
5354
"@types/debug": "^4.1.7",
5455
"@types/node": "^18.11.9",
55-
"@typescript-eslint/eslint-plugin": "^5.44.0",
56-
"@typescript-eslint/parser": "^5.44.0",
56+
"@typescript-eslint/eslint-plugin": "^5.45.0",
57+
"@typescript-eslint/parser": "^5.45.0",
58+
"@vitest/coverage-c8": "^0.25.3",
59+
"chance": "^1.1.9",
5760
"eslint": "^8.28.0",
5861
"eslint-config-prettier": "~8.5.0",
5962
"eslint-plugin-prettier": "^4.2.1",
@@ -110,7 +113,16 @@
110113
"*.d.ts"
111114
],
112115
"rules": {
113-
"@typescript-eslint/no-use-before-define": 0
116+
"@typescript-eslint/no-use-before-define": 0,
117+
"@typescript-eslint/no-unused-vars": [
118+
"error",
119+
{
120+
"ignoreRestSiblings": true,
121+
"destructuredArrayIgnorePattern": "^_",
122+
"varsIgnorePattern": "^_",
123+
"argsIgnorePattern": "^_"
124+
}
125+
]
114126
}
115127
},
116128
"contributors": [

src/redis.ts

+28-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Redis, { ChainableCommander, RedisOptions } from 'ioredis'
22
import { RedisStream } from './stream.js'
33
import mkDebug from 'debug'
4-
import { XStreamResult, env } from './types.js'
4+
import { XStreamResult, env, RedisStreamOptions } from './types.js'
55

66
const debug = mkDebug('redis-x-stream')
77

@@ -67,7 +67,17 @@ export function ack(
6767
pendingAcks.clear()
6868
}
6969

70-
function xgroup(client: ChainableCommander, { group, streams, first }: RedisStream): void {
70+
function xgroup(client: ChainableCommander, stream: RedisStream): void {
71+
const { group, streams, first, addedStreams } = stream
72+
if (addedStreams) {
73+
for (const [key, start] of addedStreams) {
74+
streams.set(key, start)
75+
if (group && !first) {
76+
client.xgroup('CREATE', key, group, start, 'MKSTREAM')
77+
}
78+
}
79+
stream.addedStreams = null
80+
}
7181
if (!first || !group) return
7282
for (const [key, start] of streams) {
7383
debug(`xgroup create ${key} ${group} ${start} mkstream`)
@@ -104,10 +114,25 @@ function xreadgroup(
104114
;(client as KindaAny)[buffers ? 'xreadgroupBuffer' : 'xreadgroup'](...args)
105115
}
106116

117+
export function* initStreams(
118+
streams: RedisStreamOptions['streams'] | string
119+
): Iterable<[string, string]> {
120+
if (typeof streams === 'string') streams = [streams]
121+
if (Array.isArray(streams)) {
122+
for (const stream of streams) {
123+
yield [stream, '0']
124+
}
125+
} else if (streams) {
126+
yield* Object.entries(streams)
127+
} else {
128+
return
129+
}
130+
}
131+
107132
export function createClient(options?: Redis | string | RedisOptions) {
108133
let client: Redis,
109134
created = true
110-
if (typeof options === 'object') {
135+
if (options && typeof options === 'object') {
111136
if ('pipeline' in options) {
112137
client = options
113138
created = false

src/stream.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { describe, it, expect } from 'vitest'
22
import Redis from 'ioredis'
3-
import redisStream from './stream'
43
import { rand } from './test.util.spec'
4+
import redisStream from './stream'
55

66
//eslint-disable-next-line @typescript-eslint/no-explicit-any
77
type JustForTests = any

src/stream.ts

+14-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { hostname } from 'node:os'
2-
import { ack, createClient, readAckDelete } from './redis.js'
2+
import { ack, createClient, initStreams, readAckDelete } from './redis.js'
33
import {
44
RedisStreamOptions,
55
RedisClient,
@@ -11,6 +11,7 @@ import {
1111
export { RedisStreamOptions }
1212

1313
const allowedKeys = {
14+
stream: 1,
1415
streams: 1,
1516
group: 1,
1617
consumer: 1,
@@ -57,8 +58,9 @@ export class RedisStream {
5758
public done = false
5859
public first = false
5960
public draining = false
61+
public reading = false
62+
public addedStreams: null | Iterable<[string, string]> = null
6063
private unblocked = false
61-
private reading = false
6264

6365
private readerId: number | null = null
6466
private pendingId: Promise<number | null> | null = null
@@ -94,6 +96,12 @@ export class RedisStream {
9496
this.block = options.block
9597
}
9698

99+
this.streams = new Map([
100+
...initStreams(streams),
101+
...initStreams(options.streams),
102+
...initStreams(options.stream),
103+
])
104+
97105
if (this.block === 0 || this.block === Infinity) {
98106
this.blocked = true
99107
const { client, created } = createClient(options.redisControl)
@@ -128,14 +136,6 @@ export class RedisStream {
128136
this.first = true
129137
}
130138

131-
if (Array.isArray(options.streams)) {
132-
this.streams = options.streams.reduce((keys, key) => {
133-
return keys.set(key, '0')
134-
}, new Map<string, string>())
135-
} else {
136-
this.streams = new Map(Object.entries(options.streams))
137-
}
138-
139139
if (typeof options.count === 'number') {
140140
this.count = options.count
141141
}
@@ -247,8 +247,10 @@ export class RedisStream {
247247
}
248248
}
249249

250-
public async addStream(streamName: string) {
251-
this.streams.set(streamName, '0')
250+
public async addStream(streams: RedisStreamOptions['streams'] | string) {
251+
this.addedStreams = this.addedStreams
252+
? [...this.addedStreams, ...initStreams(streams)]
253+
: initStreams(streams)
252254
await this.maybeUnblock()
253255
}
254256

src/test.util.spec.ts

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import { RedisClient, StreamEntry, XEntryResult } from './types.js'
2+
import { afterAll } from 'vitest'
23
import mkDebug from 'debug'
4+
import { randomInt } from 'node:crypto'
5+
import Chance from 'chance'
36

4-
const debug = mkDebug('redis-x-stream')
7+
const seed = Number(process.env.TEST_SEED) || randomInt(Date.now())
8+
const chance = new Chance(seed)
9+
10+
const debug = mkDebug('test-redis-x-stream')
511

612
const delay = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms)),
713
times = <T>(count: number, fn: (_: undefined, i: number) => T): Array<T> =>
@@ -10,8 +16,8 @@ const delay = (ms: number): Promise<void> => new Promise((resolve) => setTimeout
1016
await client.quit()
1117
return new Promise((resolve) => client.once('end', resolve))
1218
},
13-
randNum = (min: number, max: number) => Math.floor(Math.random() * (max - min) + min),
14-
rand = (): string => Math.random().toString(36).slice(6),
19+
randNum = (min: number, max: number) => chance.integer({ min, max }),
20+
rand = (): string => chance.string({}),
1521
drain = async (iterable: AsyncIterable<XEntryResult>): Promise<Map<string, StreamEntry[]>> => {
1622
const results = new Map<string, StreamEntry[]>()
1723
for await (const [streamName, entry] of iterable) {
@@ -34,4 +40,8 @@ async function hydrateForTest(writer: RedisClient, stream: string, ...values: st
3440
return values
3541
}
3642

37-
export { delay, times, quit, hydrateForTest, rand, testEntries, redisIdRegex, drain }
43+
afterAll(() => {
44+
console.log(`Seed set to: ${seed}`)
45+
})
46+
47+
export { delay, times, quit, hydrateForTest, rand, randNum, testEntries, redisIdRegex, drain }

src/types.ts

+7-1
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,16 @@ if (typeof process !== 'undefined' && process.env) {
2727
env.REDIS_X_STREAM_URL = process.env.REDIS_X_STREAM_URL
2828
}
2929
export interface RedisStreamOptions {
30+
/**
31+
* Redis stream keys to be read. If a Record is provided each value is the starting id for that stream
32+
*
33+
* *alias* for stream
34+
*/
35+
streams?: string[] | Record<string, string> | string
3036
/**
3137
* Redis stream keys to be read. If a Record is provided each value is the starting id for that stream
3238
*/
33-
streams: string[] | Record<string, string>
39+
stream?: string[] | Record<string, string> | string
3440
/**
3541
* The consumer group.
3642
* Note: if only a group is provided a consumer is created automatically

src/xread.spec.ts

+29-26
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
import Redis from 'ioredis'
22
import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'vitest'
3+
import {
4+
delay,
5+
hydrateForTest,
6+
quit,
7+
testEntries,
8+
redisIdRegex,
9+
randNum,
10+
} from './test.util.spec.js'
311
import { RedisStream } from './stream.js'
412
import { RedisClient } from './types.js'
5-
import { delay, hydrateForTest, quit, times, testEntries, redisIdRegex } from './test.util.spec.js'
613

714
describe('redis-x-stream xread', () => {
815
let writer!: RedisClient, reader: RedisClient, prefix: string
@@ -28,8 +35,8 @@ describe('redis-x-stream xread', () => {
2835
return quit(reader)
2936
})
3037
it('should dispense entries', async () => {
31-
const streamName = key('my-straam'),
32-
iterable = new RedisStream(streamName)
38+
const streamName = key('my-stream'),
39+
iterable = new RedisStream({ stream: streamName, count: randNum(1, 20) })
3340
await hydrateForTest(writer, streamName, ...testEntries)
3441
let entryIdx = 0,
3542
asserted = false
@@ -47,33 +54,25 @@ describe('redis-x-stream xread', () => {
4754
it('should block waiting for new entries', async () => {
4855
let entries = 0
4956
const streamName = key('my-stream'),
50-
redisIdRegex = /\d+-\d/,
5157
block = 200,
5258
iterable = new RedisStream({
5359
block,
60+
count: randNum(1, 50),
5461
streams: [streamName],
55-
}),
56-
hydrate = () => hydrateForTest(writer, streamName),
57-
iterate = async () => {
58-
for await (const [str, entry] of iterable) {
59-
entries++
60-
expect(str).toEqual(streamName)
61-
expect(entry[0]).toMatch(redisIdRegex)
62-
}
63-
}
62+
})
63+
const hydrate = () => hydrateForTest(writer, streamName)
6464
await hydrate()
65-
const consuming = iterate()
66-
await delay(block)
67-
times(9, hydrate)
68-
await delay(block)
69-
times(10, hydrate)
70-
await consuming
71-
expect(entries).toEqual(testEntries.length * 20)
65+
for await (const _ of iterable) {
66+
if (entries++ === testEntries.length - 1) {
67+
delay(block - 20).then(hydrate)
68+
}
69+
}
70+
expect(entries).toEqual(testEntries.length * 2)
7271
})
7372

7473
it('should throw if ack is called without a group or consumer', async () => {
7574
const streamName = key('my-stream')
76-
const stream = new RedisStream(streamName)
75+
const stream = new RedisStream({ stream: [streamName], count: randNum(1, 500) })
7776
const values = await hydrateForTest(writer, streamName)
7877
let ackAttempts = 0
7978
for await (const [_, [id]] of stream) {
@@ -96,25 +95,29 @@ describe('redis-x-stream xread', () => {
9695
const laterStream = key('later-stream')
9796
await hydrateForTest(writer, myStream)
9897
await hydrateForTest(writer, laterStream)
99-
const stream = new RedisStream({ streams: [myStream], block: Infinity })
98+
const stream = new RedisStream({
99+
streams: [myStream],
100+
block: Infinity,
101+
count: randNum(300, 400),
102+
})
100103
let i = 0
101104
for await (const [streamName, _] of stream) {
102105
i++
103106
if (i === testEntries.length) {
104107
expect(streamName).toEqual(myStream)
105108
setTimeout(() => {
106-
//TODO: expect stream is blocked?
109+
expect(stream.reading).toBe(true)
107110
stream.addStream(laterStream)
108111
})
109112
}
110113
if (i > testEntries.length) {
111114
expect(streamName).toEqual(laterStream)
112115
}
113-
if (i === testEntries.length * 2) {
116+
if (i === testEntries.length * 2 - 1) {
114117
setTimeout(() => {
115118
i++
116119
stream.end() //break;
117-
})
120+
}, 100)
118121
}
119122
}
120123
//stream will block indefinitely (i++ in the future to assert after loop)
@@ -123,7 +126,7 @@ describe('redis-x-stream xread', () => {
123126

124127
it('should not allow re-iteration (done is set)', async () => {
125128
const streamName = key('my-stream'),
126-
iterable = new RedisStream(streamName)
129+
iterable = new RedisStream({ stream: streamName, count: randNum(1, 2) })
127130
let entries = 0
128131
const values = await hydrateForTest(writer, streamName)
129132
const iterate = async () => {

src/xreadgroup.spec.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import Redis from 'ioredis'
12
import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'vitest'
2-
import redisStream, { RedisStream } from './stream.js'
33
import { hostname } from 'os'
44
import { drain, hydrateForTest, quit, rand, redisIdRegex, testEntries } from './test.util.spec.js'
5+
import redisStream, { RedisStream } from './stream.js'
56
import { RedisClient } from './types.js'
6-
import Redis from 'ioredis'
77

88
describe('redis-x-stream xreadgroup', () => {
99
let writer!: RedisClient, reader: RedisClient, prefix: string
@@ -143,7 +143,7 @@ describe('redis-x-stream xreadgroup', () => {
143143
deleteOnAck: true,
144144
})
145145
let i = 0
146-
for await (const [streamName, [id, keyvals]] of stream) {
146+
for await (const _ of stream) {
147147
if (++i === 3) {
148148
void stream.drain()
149149
}

0 commit comments

Comments
 (0)