Skip to content

Commit 1e8c4df

Browse files
committed
Begin implementing the sniffing of nodes
WIP
1 parent 2fbcf0c commit 1e8c4df

File tree

1 file changed

+19
-4
lines changed

1 file changed

+19
-4
lines changed

elasticsearch/src/http/transport.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::{
3434
response::Response,
3535
Method,
3636
},
37+
nodes::{NodesInfo, NodesInfoParts},
3738
};
3839
use base64::write::EncoderWriter as Base64Encoder;
3940
use bytes::BytesMut;
@@ -380,10 +381,13 @@ impl Transport {
380381
Q: Serialize + ?Sized,
381382
{
382383
if self.conn_pool.reseedable() {
383-
// Reseed nodes
384-
println!("Reseeding!");
384+
self.conn_pool.reseeding();
385+
// NodesInfo::new(&self, NodesInfoParts::None)
386+
// .send()
387+
// .await
388+
// .expect("Could not retrieve nodes for refresh");
385389
}
386-
let connection = self.conn_pool.next();
390+
let connection = self.conn_pool.next()
387391
let url = connection.url.join(path.trim_start_matches('/'))?;
388392
let reqwest_method = self.method(method);
389393
let mut request_builder = self.client.request(reqwest_method, url);
@@ -471,6 +475,9 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send {
471475
false
472476
}
473477

478+
// NOOP
479+
fn reseeding(&self) {}
480+
474481
// NOOP by default
475482
fn reseed(&self, _connection: Vec<Connection>) {}
476483
}
@@ -637,6 +644,7 @@ pub struct MultiNodeConnectionPool<TStrategy = RoundRobin> {
637644

638645
#[derive(Debug, Clone)]
639646
pub struct MultiNodeConnectionPoolInner {
647+
reseeding: bool,
640648
last_update: Option<Instant>,
641649
connections: Vec<Connection>,
642650
}
@@ -660,14 +668,20 @@ where
660668
.last_update
661669
.as_ref()
662670
.map(|last_update| last_update.elapsed() > wait);
663-
last_update_is_stale.unwrap_or(true)
671+
last_update_is_stale.unwrap_or(true) && !inner.reseeding
672+
}
673+
674+
fn reseeding(&self) {
675+
let mut inner = self.inner.write().expect("Lock Poisoned");
676+
inner.reseeding = true
664677
}
665678

666679
fn reseed(&self, mut connection: Vec<Connection>) {
667680
let mut inner = self.inner.write().expect("lock poisoned");
668681
inner.last_update = Some(Instant::now());
669682
inner.connections.clear();
670683
inner.connections.append(&mut connection);
684+
inner.reseeding = false;
671685
}
672686
}
673687

@@ -678,6 +692,7 @@ impl MultiNodeConnectionPool<RoundRobin> {
678692

679693
let inner: Arc<RwLock<MultiNodeConnectionPoolInner>> =
680694
Arc::new(RwLock::new(MultiNodeConnectionPoolInner {
695+
reseeding: false,
681696
last_update: None,
682697
connections,
683698
}));

0 commit comments

Comments
 (0)