Skip to content

Commit 53ea47a

Browse files
authored
Release the read lock while creating connections inrefresh_connections (#191)
Changed refresh_connections to release the connections container lock while creating a new connection
1 parent 396536d commit 53ea47a

File tree

1 file changed

+47
-43
lines changed

1 file changed

+47
-43
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 47 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1309,51 +1309,55 @@ where
13091309
check_existing_conn: bool,
13101310
) {
13111311
info!("Started refreshing connections to {:?}", addresses);
1312-
let connections_container = inner.conn_lock.read().await;
1313-
let cluster_params = &inner.cluster_params;
1314-
let subscriptions_by_address = &inner.subscriptions_by_address;
1315-
let glide_connection_optons = &inner.glide_connection_options;
1312+
let mut tasks = FuturesUnordered::new();
1313+
let inner = inner.clone();
13161314

1317-
stream::iter(addresses.into_iter())
1318-
.fold(
1319-
&*connections_container,
1320-
|connections_container, address| async move {
1321-
let node_option = if check_existing_conn {
1322-
connections_container.remove_node(&address)
1323-
} else {
1324-
None
1325-
};
1315+
for address in addresses.into_iter() {
1316+
let inner = inner.clone();
13261317

1327-
// override subscriptions for this connection
1328-
let mut cluster_params = cluster_params.clone();
1329-
let subs_guard = subscriptions_by_address.read().await;
1330-
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned();
1331-
drop(subs_guard);
1332-
let node = get_or_create_conn(
1333-
&address,
1334-
node_option,
1335-
&cluster_params,
1336-
conn_type,
1337-
glide_connection_optons.clone(),
1338-
)
1339-
.await;
1340-
match node {
1341-
Ok(node) => {
1342-
connections_container
1343-
.replace_or_add_connection_for_address(address, node);
1344-
}
1345-
Err(err) => {
1346-
warn!(
1347-
"Failed to refresh connection for node {}. Error: `{:?}`",
1348-
address, err
1349-
);
1350-
}
1351-
}
1352-
connections_container
1353-
},
1354-
)
1355-
.await;
1356-
info!("refresh connections completed");
1318+
tasks.push(async move {
1319+
let node_option = if check_existing_conn {
1320+
let connections_container = inner.conn_lock.read().await;
1321+
connections_container.remove_node(&address)
1322+
} else {
1323+
None
1324+
};
1325+
1326+
// Override subscriptions for this connection
1327+
let mut cluster_params = inner.cluster_params.clone();
1328+
let subs_guard = inner.subscriptions_by_address.read().await;
1329+
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned();
1330+
drop(subs_guard);
1331+
1332+
let node = get_or_create_conn(
1333+
&address,
1334+
node_option,
1335+
&cluster_params,
1336+
conn_type,
1337+
inner.glide_connection_options.clone(),
1338+
)
1339+
.await;
1340+
1341+
(address, node)
1342+
});
1343+
}
1344+
1345+
// Poll connection tasks as soon as each one finishes
1346+
while let Some(result) = tasks.next().await {
1347+
match result {
1348+
(address, Ok(node)) => {
1349+
let connections_container = inner.conn_lock.read().await;
1350+
connections_container.replace_or_add_connection_for_address(address, node);
1351+
}
1352+
(address, Err(err)) => {
1353+
warn!(
1354+
"Failed to refresh connection for node {}. Error: `{:?}`",
1355+
address, err
1356+
);
1357+
}
1358+
}
1359+
}
1360+
debug!("refresh connections completed");
13571361
}
13581362

13591363
async fn aggregate_results(

0 commit comments

Comments
 (0)