Skip to content

Commit 2fbcf0c

Browse files
committed
Allow reseeding of nodes on MultiNodeConnection
1 parent 2590242 commit 2fbcf0c

File tree

1 file changed

+53
-10
lines changed

1 file changed

+53
-10
lines changed

elasticsearch/src/http/transport.rs

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use std::{
4646
atomic::{AtomicUsize, Ordering},
4747
Arc, RwLock,
4848
},
49-
time::Duration,
49+
time::{Duration, Instant},
5050
};
5151
use url::Url;
5252

@@ -348,7 +348,7 @@ impl Transport {
348348
.iter()
349349
.map(|url| Url::parse(url))
350350
.collect::<Result<Vec<_>, _>>()?;
351-
let conn_pool = MultiNodeConnectionPool::round_robin(urls);
351+
let conn_pool = MultiNodeConnectionPool::round_robin(urls, None);
352352
let transport = TransportBuilder::new(conn_pool).build()?;
353353
Ok(transport)
354354
}
@@ -379,6 +379,10 @@ impl Transport {
379379
B: Body,
380380
Q: Serialize + ?Sized,
381381
{
382+
if self.conn_pool.reseedable() {
383+
// Reseed nodes
384+
println!("Reseeding!");
385+
}
382386
let connection = self.conn_pool.next();
383387
let url = connection.url.join(path.trim_start_matches('/'))?;
384388
let reqwest_method = self.method(method);
@@ -462,6 +466,13 @@ impl Default for Transport {
462466
pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send {
463467
/// Gets a reference to the next [Connection]
464468
fn next(&self) -> Connection;
469+
470+
fn reseedable(&self) -> bool {
471+
false
472+
}
473+
474+
// NOOP by default
475+
fn reseed(&self, _connection: Vec<Connection>) {}
465476
}
466477

467478
clone_trait_object!(ConnectionPool);
@@ -619,31 +630,63 @@ impl ConnectionPool for CloudConnectionPool {
619630
/// A Connection Pool that manages a static connection of nodes
620631
#[derive(Debug, Clone)]
621632
pub struct MultiNodeConnectionPool<TStrategy = RoundRobin> {
622-
connections: Arc<RwLock<Vec<Connection>>>,
633+
inner: Arc<RwLock<MultiNodeConnectionPoolInner>>,
634+
wait: Option<Duration>,
623635
strategy: TStrategy,
624636
}
625637

638+
#[derive(Debug, Clone)]
639+
pub struct MultiNodeConnectionPoolInner {
640+
last_update: Option<Instant>,
641+
connections: Vec<Connection>,
642+
}
643+
626644
impl<TStrategy> ConnectionPool for MultiNodeConnectionPool<TStrategy>
627645
where
628646
TStrategy: Strategy + Clone,
629647
{
630648
fn next(&self) -> Connection {
631-
let inner = self.connections.read().expect("lock poisoned");
632-
self.strategy.try_next(&inner).unwrap()
649+
let inner = self.inner.read().expect("lock poisoned");
650+
self.strategy.try_next(&inner.connections).unwrap()
651+
}
652+
653+
fn reseedable(&self) -> bool {
654+
let inner = self.inner.read().expect("lock poisoned");
655+
let wait = match self.wait {
656+
Some(wait) => wait,
657+
None => return false,
658+
};
659+
let last_update_is_stale = inner
660+
.last_update
661+
.as_ref()
662+
.map(|last_update| last_update.elapsed() > wait);
663+
last_update_is_stale.unwrap_or(true)
664+
}
665+
666+
fn reseed(&self, mut connection: Vec<Connection>) {
667+
let mut inner = self.inner.write().expect("lock poisoned");
668+
inner.last_update = Some(Instant::now());
669+
inner.connections.clear();
670+
inner.connections.append(&mut connection);
633671
}
634672
}
635673

636674
impl MultiNodeConnectionPool<RoundRobin> {
637675
/** Use a round-robin strategy for balancing traffic over the given set of nodes. */
638-
pub fn round_robin(urls: Vec<Url>) -> Self {
639-
let connections: Arc<RwLock<Vec<_>>> =
640-
Arc::new(RwLock::new(urls.into_iter().map(Connection::new).collect()));
676+
pub fn round_robin(urls: Vec<Url>, wait: Option<Duration>) -> Self {
677+
let connections = urls.into_iter().map(Connection::new).collect();
641678

642-
let strategy = RoundRobin::default();
679+
let inner: Arc<RwLock<MultiNodeConnectionPoolInner>> =
680+
Arc::new(RwLock::new(MultiNodeConnectionPoolInner {
681+
last_update: None,
682+
connections,
683+
}));
643684

685+
let strategy = RoundRobin::default();
644686
Self {
645-
connections,
687+
inner,
646688
strategy,
689+
wait,
647690
}
648691
}
649692
}

0 commit comments

Comments
 (0)