Skip to content

Commit 2590242

Browse files
committed
Rename connection and return cloned object
* The connection should be owned by the current user of said connection
1 parent 21b563d commit 2590242

File tree

1 file changed

+23
-21
lines changed

1 file changed

+23
-21
lines changed

elasticsearch/src/http/transport.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use std::{
4444
io::{self, Write},
4545
sync::{
4646
atomic::{AtomicUsize, Ordering},
47-
Arc,
47+
Arc, RwLock,
4848
},
4949
time::Duration,
5050
};
@@ -348,7 +348,7 @@ impl Transport {
348348
.iter()
349349
.map(|url| Url::parse(url))
350350
.collect::<Result<Vec<_>, _>>()?;
351-
let conn_pool = StaticNodeListConnectionPool::round_robin(urls);
351+
let conn_pool = MultiNodeConnectionPool::round_robin(urls);
352352
let transport = TransportBuilder::new(conn_pool).build()?;
353353
Ok(transport)
354354
}
@@ -461,7 +461,7 @@ impl Default for Transport {
461461
/// dynamically at runtime, based upon the response to API calls.
462462
pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send {
463463
/// Gets a reference to the next [Connection]
464-
fn next(&self) -> &Connection;
464+
fn next(&self) -> Connection;
465465
}
466466

467467
clone_trait_object!(ConnectionPool);
@@ -490,8 +490,8 @@ impl Default for SingleNodeConnectionPool {
490490

491491
impl ConnectionPool for SingleNodeConnectionPool {
492492
/// Gets a reference to the next [Connection]
493-
fn next(&self) -> &Connection {
494-
&self.connection
493+
fn next(&self) -> Connection {
494+
self.connection.clone()
495495
}
496496
}
497497

@@ -611,31 +611,33 @@ impl CloudConnectionPool {
611611

612612
impl ConnectionPool for CloudConnectionPool {
613613
/// Gets a reference to the next [Connection]
614-
fn next(&self) -> &Connection {
615-
&self.connection
614+
fn next(&self) -> Connection {
615+
self.connection.clone()
616616
}
617617
}
618618

619619
/// A Connection Pool that manages a static connection of nodes
620620
#[derive(Debug, Clone)]
621-
pub struct StaticNodeListConnectionPool<TStrategy = RoundRobin> {
622-
connections: Vec<Connection>,
621+
pub struct MultiNodeConnectionPool<TStrategy = RoundRobin> {
622+
connections: Arc<RwLock<Vec<Connection>>>,
623623
strategy: TStrategy,
624624
}
625625

626-
impl<TStrategy> ConnectionPool for StaticNodeListConnectionPool<TStrategy>
626+
impl<TStrategy> ConnectionPool for MultiNodeConnectionPool<TStrategy>
627627
where
628628
TStrategy: Strategy + Clone,
629629
{
630-
fn next(&self) -> &Connection {
631-
self.strategy.try_next(&self.connections).unwrap()
630+
fn next(&self) -> Connection {
631+
let inner = self.connections.read().expect("lock poisoned");
632+
self.strategy.try_next(&inner).unwrap()
632633
}
633634
}
634635

635-
impl StaticNodeListConnectionPool<RoundRobin> {
636+
impl MultiNodeConnectionPool<RoundRobin> {
636637
/** Use a round-robin strategy for balancing traffic over the given set of nodes. */
637638
pub fn round_robin(urls: Vec<Url>) -> Self {
638-
let connections: Vec<_> = urls.into_iter().map(Connection::new).collect();
639+
let connections: Arc<RwLock<Vec<_>>> =
640+
Arc::new(RwLock::new(urls.into_iter().map(Connection::new).collect()));
639641

640642
let strategy = RoundRobin::default();
641643

@@ -649,7 +651,7 @@ impl StaticNodeListConnectionPool<RoundRobin> {
649651
/** The strategy selects an address from a given collection. */
650652
pub trait Strategy: Send + Sync + Debug {
651653
/** Try get the next connection. */
652-
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error>;
654+
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error>;
653655
}
654656

655657
/** A round-robin strategy cycles through nodes sequentially. */
@@ -667,12 +669,12 @@ impl Default for RoundRobin {
667669
}
668670

669671
impl Strategy for RoundRobin {
670-
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<&'a Connection, Error> {
672+
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error> {
671673
if connections.is_empty() {
672674
Err(crate::error::lib("Connection list empty"))
673675
} else {
674676
let i = self.index.fetch_add(1, Ordering::Relaxed) % connections.len();
675-
Ok(&connections[i])
677+
Ok(connections[i].clone())
676678
}
677679
}
678680
}
@@ -682,8 +684,8 @@ pub mod tests {
682684
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
683685
use crate::auth::ClientCertificate;
684686
use crate::http::transport::{
685-
CloudId, Connection, ConnectionPool, RoundRobin, SingleNodeConnectionPool,
686-
StaticNodeListConnectionPool, TransportBuilder,
687+
CloudId, Connection, ConnectionPool, MultiNodeConnectionPool, RoundRobin,
688+
SingleNodeConnectionPool, TransportBuilder,
687689
};
688690
use url::Url;
689691

@@ -795,8 +797,8 @@ pub mod tests {
795797
assert_eq!(conn.url.as_str(), "http://10.1.2.3/");
796798
}
797799

798-
fn round_robin(addresses: Vec<Url>) -> StaticNodeListConnectionPool<RoundRobin> {
799-
StaticNodeListConnectionPool::round_robin(addresses)
800+
fn round_robin(addresses: Vec<Url>) -> MultiNodeConnectionPool<RoundRobin> {
801+
MultiNodeConnectionPool::round_robin(addresses)
800802
}
801803

802804
fn expected_addresses() -> Vec<Url> {

0 commit comments

Comments
 (0)