Skip to content

Commit 010a433

Browse files
authored
Merge pull request redis-rs#350 from Terkwood/fix/streams-example
2 parents 6dfc3cc + debc7d9 commit 010a433

File tree

1 file changed

+28
-20
lines changed

1 file changed

+28
-20
lines changed

examples/streams.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,30 @@ fn demo_group_reads(client: &redis::Client) {
8282
}
8383

8484
for _ in 0..repeat {
85-
read_group_records(&ca, *slowness).expect("group read");
86-
87-
thread::sleep(Duration::from_millis(random_wait_millis(*slowness)))
85+
let read_reply = read_group_records(&ca, *slowness).expect("group read");
86+
87+
// fake some expensive work
88+
for StreamKey { key, ids } in read_reply.keys {
89+
for StreamId { id, map: _ } in &ids {
90+
thread::sleep(Duration::from_millis(random_wait_millis(*slowness)));
91+
println!(
92+
"Stream {} ID {} Consumer slowness {} SysTime {}",
93+
key,
94+
id,
95+
slowness,
96+
SystemTime::now()
97+
.duration_since(UNIX_EPOCH)
98+
.expect("time")
99+
.as_millis()
100+
);
101+
}
102+
103+
// acknowledge each stream and message ID once all messages are
104+
// correctly processed
105+
let id_strs: Vec<&String> =
106+
ids.iter().map(|StreamId { id, map: _ }| id).collect();
107+
con.xack(key, GROUP_NAME, &id_strs).expect("ack")
108+
}
88109
}
89110
}))
90111
}
@@ -155,8 +176,9 @@ fn thrifty_rand() -> u8 {
155176
+ 1
156177
}
157178

179+
const MAGIC: u64 = 11;
158180
fn random_wait_millis(slowness: u8) -> u64 {
159-
thrifty_rand() as u64 * thrifty_rand() as u64 * 35 * slowness as u64
181+
thrifty_rand() as u64 * thrifty_rand() as u64 * MAGIC * slowness as u64
160182
}
161183

162184
/// Generate a potentially unique value.
@@ -219,7 +241,7 @@ fn consumer_name(slowness: u8) -> String {
219241
const GROUP_NAME: &str = "example-group-aaa";
220242

221243
#[cfg(feature = "streams")]
222-
fn read_group_records(client: &redis::Client, slowness: u8) -> RedisResult<()> {
244+
fn read_group_records(client: &redis::Client, slowness: u8) -> RedisResult<StreamReadReply> {
223245
let mut con = client.get_connection().expect("conn");
224246

225247
let opts = StreamReadOptions::default()
@@ -235,21 +257,7 @@ fn read_group_records(client: &redis::Client, slowness: u8) -> RedisResult<()> {
235257
)
236258
.expect("records");
237259

238-
for StreamKey { key, ids } in srr.keys {
239-
for StreamId { id, map: _ } in ids {
240-
println!(
241-
"Stream {} ID {} Consumer slowness {} SysTime {}",
242-
key,
243-
id,
244-
slowness,
245-
SystemTime::now()
246-
.duration_since(UNIX_EPOCH)
247-
.expect("time")
248-
.as_millis()
249-
);
250-
}
251-
}
252-
Ok(())
260+
Ok(srr)
253261
}
254262

255263
#[cfg(feature = "streams")]

0 commit comments

Comments
 (0)