Skip to content

Commit 0ccc20d

Browse files
committed
Begin implementing the sniffing of nodes
* make review edits
1 parent 2705663 commit 0ccc20d

File tree

1 file changed

+104
-24
lines changed

1 file changed

+104
-24
lines changed

elasticsearch/src/http/transport.rs

Lines changed: 104 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::{
3838
use base64::write::EncoderWriter as Base64Encoder;
3939
use bytes::BytesMut;
4040
use serde::Serialize;
41+
use serde_json::Value;
4142
use std::{
4243
error, fmt,
4344
fmt::Debug,
@@ -288,7 +289,7 @@ impl Default for TransportBuilder {
288289
/// A connection to an Elasticsearch node, used to send an API request
289290
#[derive(Debug, Clone)]
290291
pub struct Connection {
291-
url: Url,
292+
url: Arc<Url>,
292293
}
293294

294295
impl Connection {
@@ -303,8 +304,14 @@ impl Connection {
303304
url
304305
};
305306

307+
let url = Arc::new(url);
308+
306309
Self { url }
307310
}
311+
312+
pub fn url(&self) -> Arc<Url> {
313+
self.url.clone()
314+
}
308315
}
309316

310317
/// A HTTP transport responsible for making the API requests to Elasticsearch,
@@ -365,27 +372,22 @@ impl Transport {
365372
Ok(transport)
366373
}
367374

368-
/// Creates an asynchronous request that can be awaited
369-
pub async fn send<B, Q>(
375+
pub fn request_builder<B, Q>(
370376
&self,
377+
connection: &Connection,
371378
method: Method,
372379
path: &str,
373380
headers: HeaderMap,
374381
query_string: Option<&Q>,
375382
body: Option<B>,
376383
timeout: Option<Duration>,
377-
) -> Result<Response, Error>
384+
) -> Result<reqwest::RequestBuilder, Error>
378385
where
379386
B: Body,
380387
Q: Serialize + ?Sized,
381388
{
382-
if self.conn_pool.reseedable() {
383-
// Reseed nodes
384-
println!("Reseeding!");
385-
}
386-
let connection = self.conn_pool.next();
387-
let url = connection.url.join(path.trim_start_matches('/'))?;
388389
let reqwest_method = self.method(method);
390+
let url = connection.url.join(path.trim_start_matches('/'))?;
389391
let mut request_builder = self.client.request(reqwest_method, url);
390392

391393
if let Some(t) = timeout {
@@ -442,6 +444,71 @@ impl Transport {
442444
if let Some(q) = query_string {
443445
request_builder = request_builder.query(q);
444446
}
447+
Ok(request_builder)
448+
}
449+
450+
/// Creates an asynchronous request that can be awaited
451+
pub async fn send<B, Q>(
452+
&self,
453+
method: Method,
454+
path: &str,
455+
headers: HeaderMap,
456+
query_string: Option<&Q>,
457+
body: Option<B>,
458+
timeout: Option<Duration>,
459+
) -> Result<Response, Error>
460+
where
461+
B: Body,
462+
Q: Serialize + ?Sized,
463+
{
464+
let connection = self.conn_pool.next();
465+
466+
// Threads will execute against old connection pool during reseed
467+
if self.conn_pool.reseedable() {
468+
// Set as reseeding prevents another thread from attempting
469+
// to reseed during es request and reseed
470+
self.conn_pool.reseeding();
471+
472+
let scheme = &connection.url.scheme();
473+
// Build node info request
474+
let node_request = self.request_builder(
475+
&connection,
476+
Method::Get,
477+
"_nodes/_all/http",
478+
headers.clone(),
479+
None::<&Q>,
480+
None::<B>,
481+
timeout,
482+
)?;
483+
let resp = node_request.send().await?;
484+
let json: Value = resp.json().await?;
485+
let connections: Vec<Connection> = json["nodes"]
486+
.as_object()
487+
.unwrap()
488+
.iter()
489+
.map(|h| {
490+
let url = format!(
491+
"{}://{}",
492+
scheme,
493+
h.1["http"]["publish_address"].as_str().unwrap()
494+
);
495+
let url = Url::parse(&url).unwrap();
496+
Connection::new(url)
497+
})
498+
.collect();
499+
self.conn_pool.reseed(connections);
500+
}
501+
502+
let connection = self.conn_pool.next();
503+
let request_builder = self.request_builder(
504+
&connection,
505+
method,
506+
path,
507+
headers,
508+
query_string,
509+
body,
510+
timeout,
511+
)?;
445512

446513
let response = request_builder.send().await;
447514
match response {
@@ -471,6 +538,9 @@ pub trait ConnectionPool: Debug + dyn_clone::DynClone + Sync + Send {
471538
false
472539
}
473540

541+
// NOOP
542+
fn reseeding(&self) {}
543+
474544
// NOOP by default
475545
fn reseed(&self, _connection: Vec<Connection>) {}
476546
}
@@ -629,70 +699,80 @@ impl ConnectionPool for CloudConnectionPool {
629699

630700
/// A Connection Pool that manages a static connection of nodes
631701
#[derive(Debug, Clone)]
632-
pub struct MultiNodeConnectionPool<TStrategy = RoundRobin> {
702+
pub struct MultiNodeConnectionPool<LoadBalancingStrategy = RoundRobin> {
633703
inner: Arc<RwLock<MultiNodeConnectionPoolInner>>,
634-
wait: Option<Duration>,
635-
strategy: TStrategy,
704+
reseed_frequency: Option<Duration>,
705+
load_balancing_strategy: LoadBalancingStrategy,
636706
}
637707

638708
#[derive(Debug, Clone)]
639709
pub struct MultiNodeConnectionPoolInner {
710+
reseeding: bool,
640711
last_update: Option<Instant>,
641712
connections: Vec<Connection>,
642713
}
643714

644715
impl<TStrategy> ConnectionPool for MultiNodeConnectionPool<TStrategy>
645716
where
646-
TStrategy: Strategy + Clone,
717+
TStrategy: LoadBalancingStrategy + Clone,
647718
{
648719
fn next(&self) -> Connection {
649720
let inner = self.inner.read().expect("lock poisoned");
650-
self.strategy.try_next(&inner.connections).unwrap()
721+
self.load_balancing_strategy
722+
.try_next(&inner.connections)
723+
.unwrap()
651724
}
652725

653726
fn reseedable(&self) -> bool {
654727
let inner = self.inner.read().expect("lock poisoned");
655-
let wait = match self.wait {
728+
let reseed_frequency = match self.reseed_frequency {
656729
Some(wait) => wait,
657730
None => return false,
658731
};
659732
let last_update_is_stale = inner
660733
.last_update
661734
.as_ref()
662-
.map(|last_update| last_update.elapsed() > wait);
663-
last_update_is_stale.unwrap_or(true)
735+
.map(|last_update| last_update.elapsed() > reseed_frequency);
736+
last_update_is_stale.unwrap_or(true) && !inner.reseeding
737+
}
738+
739+
fn reseeding(&self) {
740+
let mut inner = self.inner.write().expect("Lock Poisoned");
741+
inner.reseeding = true
664742
}
665743

666744
fn reseed(&self, mut connection: Vec<Connection>) {
667745
let mut inner = self.inner.write().expect("lock poisoned");
668746
inner.last_update = Some(Instant::now());
669747
inner.connections.clear();
670748
inner.connections.append(&mut connection);
749+
inner.reseeding = false;
671750
}
672751
}
673752

674753
impl MultiNodeConnectionPool<RoundRobin> {
675754
/** Use a round-robin strategy for balancing traffic over the given set of nodes. */
676-
pub fn round_robin(urls: Vec<Url>, wait: Option<Duration>) -> Self {
755+
pub fn round_robin(urls: Vec<Url>, reseed_frequency: Option<Duration>) -> Self {
677756
let connections = urls.into_iter().map(Connection::new).collect();
678757

679758
let inner: Arc<RwLock<MultiNodeConnectionPoolInner>> =
680759
Arc::new(RwLock::new(MultiNodeConnectionPoolInner {
760+
reseeding: false,
681761
last_update: None,
682762
connections,
683763
}));
684764

685-
let strategy = RoundRobin::default();
765+
let load_balancing_strategy = RoundRobin::default();
686766
Self {
687767
inner,
688-
strategy,
689-
wait,
768+
load_balancing_strategy,
769+
reseed_frequency,
690770
}
691771
}
692772
}
693773

694774
/** The strategy selects an address from a given collection. */
695-
pub trait Strategy: Send + Sync + Debug {
775+
pub trait LoadBalancingStrategy: Send + Sync + Debug {
696776
/** Try get the next connection. */
697777
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error>;
698778
}
@@ -711,7 +791,7 @@ impl Default for RoundRobin {
711791
}
712792
}
713793

714-
impl Strategy for RoundRobin {
794+
impl LoadBalancingStrategy for RoundRobin {
715795
fn try_next<'a>(&self, connections: &'a [Connection]) -> Result<Connection, Error> {
716796
if connections.is_empty() {
717797
Err(crate::error::lib("Connection list empty"))

0 commit comments

Comments
 (0)