Skip to content

Commit 7108fef

Browse files
committed
Use XACK in streams example
1 parent 215d687 commit 7108fef

File tree

1 file changed

+26
-19
lines changed

1 file changed

+26
-19
lines changed

examples/streams.rs

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,30 @@ fn demo_group_reads(client: &redis::Client) {
8282
}
8383

8484
for _ in 0..repeat {
85-
read_group_records(&ca, *slowness).expect("group read");
86-
87-
thread::sleep(Duration::from_millis(random_wait_millis(*slowness)))
85+
let read_reply = read_group_records(&ca, *slowness).expect("group read");
86+
87+
// fake some expensive work
88+
for StreamKey { key, ids } in read_reply.keys {
89+
for StreamId { id, map: _ } in &ids {
90+
thread::sleep(Duration::from_millis(random_wait_millis(*slowness)));
91+
println!(
92+
"Stream {} ID {} Consumer slowness {} SysTime {}",
93+
key,
94+
id,
95+
slowness,
96+
SystemTime::now()
97+
.duration_since(UNIX_EPOCH)
98+
.expect("time")
99+
.as_millis()
100+
);
101+
}
102+
103+
// acknowledge each stream and message ID once all messages are
104+
// correctly processed
105+
let id_strs: Vec<&String> =
106+
ids.iter().map(|StreamId { id, map: _ }| id).collect();
107+
con.xack(key, GROUP_NAME, &id_strs).expect("ack")
108+
}
88109
}
89110
}))
90111
}
@@ -219,7 +240,7 @@ fn consumer_name(slowness: u8) -> String {
219240
const GROUP_NAME: &str = "example-group-aaa";
220241

221242
#[cfg(feature = "streams")]
222-
fn read_group_records(client: &redis::Client, slowness: u8) -> RedisResult<()> {
243+
fn read_group_records(client: &redis::Client, slowness: u8) -> RedisResult<StreamReadReply> {
223244
let mut con = client.get_connection().expect("conn");
224245

225246
let opts = StreamReadOptions::default()
@@ -235,21 +256,7 @@ fn read_group_records(client: &redis::Client, slowness: u8) -> RedisResult<()> {
235256
)
236257
.expect("records");
237258

238-
for StreamKey { key, ids } in srr.keys {
239-
for StreamId { id, map: _ } in ids {
240-
println!(
241-
"Stream {} ID {} Consumer slowness {} SysTime {}",
242-
key,
243-
id,
244-
slowness,
245-
SystemTime::now()
246-
.duration_since(UNIX_EPOCH)
247-
.expect("time")
248-
.as_millis()
249-
);
250-
}
251-
}
252-
Ok(())
259+
Ok(srr)
253260
}
254261

255262
#[cfg(feature = "streams")]

0 commit comments

Comments
 (0)