Skip to content

Commit a89c7f2

Browse files
committed
feat: add flush for consuming stream outside of blocked mode
1 parent 207fe32 commit a89c7f2

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

src/stream.ts

+15
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,21 @@ export class RedisStream {
268268
await this.maybeUnblock()
269269
}
270270

271+
public async flush(client?: RedisClient) {
272+
if (!this.pendingAcks.size) return
273+
let c = client
274+
if (!this.done) {
275+
c = c ?? this.control ? this.control : this.client
276+
}
277+
if (this.done && !this.createdConnection) {
278+
c = c ?? this.client
279+
}
280+
if (!c) throw new Error('No suitable client')
281+
const pipeline = c.pipeline()
282+
ack(pipeline, this)
283+
await pipeline.exec()
284+
}
285+
271286
protected async return(): Promise<void> {
272287
await this.quit()
273288
}

0 commit comments

Comments
 (0)