@@ -42,21 +42,19 @@ pub async fn consumer(cfg: &RedisInputOpts) -> Result<DynConsumer> {
4242 . unwrap_or_else ( || format ! ( "{}_delays" , cfg. queue_key) ) ;
4343 let delayed_lock_key = format ! ( "{delayed_queue_key}_lock" ) ;
4444
45- backends:: RedisBackend :: < backends:: redis:: RedisMultiplexedConnectionManager > :: builder (
46- backends:: RedisConfig {
47- dsn : cfg. dsn . clone ( ) ,
48- max_connections : cfg. max_connections ,
49- reinsert_on_nack : cfg. reinsert_on_nack ,
50- queue_key : cfg. queue_key . clone ( ) ,
51- delayed_queue_key,
52- delayed_lock_key,
53- consumer_group : cfg. consumer_group . clone ( ) ,
54- consumer_name : cfg. consumer_name . clone ( ) ,
55- // FIXME: expose in config?
56- payload_key : "payload" . to_string ( ) ,
57- ack_deadline_ms : cfg. ack_deadline_ms ,
58- } ,
59- )
45+ backends:: RedisBackend :: builder ( backends:: RedisConfig {
46+ dsn : cfg. dsn . clone ( ) ,
47+ max_connections : cfg. max_connections ,
48+ reinsert_on_nack : cfg. reinsert_on_nack ,
49+ queue_key : cfg. queue_key . clone ( ) ,
50+ delayed_queue_key,
51+ delayed_lock_key,
52+ consumer_group : cfg. consumer_group . clone ( ) ,
53+ consumer_name : cfg. consumer_name . clone ( ) ,
54+ // FIXME: expose in config?
55+ payload_key : "payload" . to_string ( ) ,
56+ ack_deadline_ms : cfg. ack_deadline_ms ,
57+ } )
6058 . make_dynamic ( )
6159 . build_consumer ( )
6260 . await
@@ -69,22 +67,20 @@ pub async fn producer(cfg: &RedisOutputOpts) -> Result<DynProducer> {
6967 . unwrap_or_else ( || format ! ( "{}_delays" , cfg. queue_key) ) ;
7068 let delayed_lock_key = format ! ( "{delayed_queue_key}_lock" ) ;
7169
72- backends:: RedisBackend :: < backends:: redis:: RedisMultiplexedConnectionManager > :: builder (
73- backends:: RedisConfig {
74- dsn : cfg. dsn . clone ( ) ,
75- max_connections : cfg. max_connections ,
76- queue_key : cfg. queue_key . clone ( ) ,
77- delayed_queue_key,
78- delayed_lock_key,
79- // FIXME: expose in config?
80- payload_key : "payload" . to_string ( ) ,
81- // consumer stuff we don't care about.
82- reinsert_on_nack : false ,
83- consumer_group : String :: new ( ) ,
84- consumer_name : String :: new ( ) ,
85- ack_deadline_ms : cfg. ack_deadline_ms ,
86- } ,
87- )
70+ backends:: RedisBackend :: builder ( backends:: RedisConfig {
71+ dsn : cfg. dsn . clone ( ) ,
72+ max_connections : cfg. max_connections ,
73+ queue_key : cfg. queue_key . clone ( ) ,
74+ delayed_queue_key,
75+ delayed_lock_key,
76+ // FIXME: expose in config?
77+ payload_key : "payload" . to_string ( ) ,
78+ // consumer stuff we don't care about.
79+ reinsert_on_nack : false ,
80+ consumer_group : String :: new ( ) ,
81+ consumer_name : String :: new ( ) ,
82+ ack_deadline_ms : cfg. ack_deadline_ms ,
83+ } )
8884 . make_dynamic ( )
8985 . build_producer ( )
9086 . await
0 commit comments