Skip to content

Allow running on wasm32 targets using fetch() for http requests #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
target
pkg
**/*.rs.bk

.idea
Expand Down
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions elasticsearch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,24 @@ url = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_with = "3"
tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }


#tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }
void = "1"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
version = "1.0"
default-features = false
features = ["macros", "net", "time", "rt-multi-thread"]

[dev-dependencies]
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4", features = ["env"]}
failure = "0.1"
futures = "0.3"
http = "1"
axum = "0.7"
hyper = { version = "1", features = ["server", "http1"] }
#hyper = { version = "1", features = ["server", "http1"] }
os_type = "2"
regex="1"
#sysinfo = "0.31"
Expand Down
121 changes: 77 additions & 44 deletions elasticsearch/src/http/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
//! HTTP transport and connection components

#[cfg(all(target_arch = "wasm32", any(feature = "native-tls", feature = "rustls-tls")))]
compile_error!("TLS features are not compatible with the wasm target");

#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
use crate::auth::ClientCertificate;
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
Expand Down Expand Up @@ -130,6 +133,8 @@ fn build_meta() -> String {
meta.push_str(",tls=n");
} else if cfg!(feature = "rustls-tls") {
meta.push_str(",tls=r");
} else if cfg!(target_arch = "wasm32") {
meta.push_str(",tls=w");
}

meta
Expand All @@ -138,15 +143,19 @@ fn build_meta() -> String {
/// Builds a HTTP transport to make API calls to Elasticsearch
pub struct TransportBuilder {
client_builder: reqwest::ClientBuilder,
conn_pool: Box<dyn ConnectionPool>,
conn_pool: Arc<dyn ConnectionPool>,
credentials: Option<Credentials>,
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
cert_validation: Option<CertificateValidation>,
#[cfg(not(target_arch = "wasm32"))]
proxy: Option<Url>,
#[cfg(not(target_arch = "wasm32"))]
proxy_credentials: Option<Credentials>,
#[cfg(not(target_arch = "wasm32"))]
disable_proxy: bool,
headers: HeaderMap,
meta_header: bool,
#[cfg(not(target_arch = "wasm32"))]
timeout: Option<Duration>,
}

Expand All @@ -159,15 +168,19 @@ impl TransportBuilder {
{
Self {
client_builder: reqwest::ClientBuilder::new(),
conn_pool: Box::new(conn_pool),
conn_pool: Arc::new(conn_pool),
credentials: None,
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
cert_validation: None,
#[cfg(not(target_arch = "wasm32"))]
proxy: None,
#[cfg(not(target_arch = "wasm32"))]
proxy_credentials: None,
#[cfg(not(target_arch = "wasm32"))]
disable_proxy: false,
headers: HeaderMap::new(),
meta_header: true,
#[cfg(not(target_arch = "wasm32"))]
timeout: None,
}
}
Expand All @@ -176,6 +189,7 @@ impl TransportBuilder {
///
/// An optional username and password will be used to set the
/// `Proxy-Authorization` header using Basic Authentication.
#[cfg(not(target_arch = "wasm32"))]
pub fn proxy(mut self, url: Url, username: Option<&str>, password: Option<&str>) -> Self {
self.proxy = Some(url);
if let Some(u) = username {
Expand All @@ -189,6 +203,7 @@ impl TransportBuilder {
/// Whether to disable proxies, including system proxies.
///
/// NOTE: System proxies are enabled by default.
#[cfg(not(target_arch = "wasm32"))]
pub fn disable_proxy(mut self) -> Self {
self.disable_proxy = true;
self
Expand Down Expand Up @@ -241,6 +256,7 @@ impl TransportBuilder {
///
/// The timeout is applied from when the request starts connecting until the response body has finished.
/// Default is no timeout.
#[cfg(not(target_arch = "wasm32"))]
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
Expand All @@ -254,6 +270,7 @@ impl TransportBuilder {
client_builder = client_builder.default_headers(self.headers);
}

#[cfg(not(target_arch = "wasm32"))]
if let Some(t) = self.timeout {
client_builder = client_builder.timeout(t);
}
Expand Down Expand Up @@ -300,6 +317,7 @@ impl TransportBuilder {
}
}

#[cfg(not(target_arch = "wasm32"))]
if self.disable_proxy {
client_builder = client_builder.no_proxy();
} else if let Some(url) = self.proxy {
Expand All @@ -316,7 +334,7 @@ impl TransportBuilder {
let client = client_builder.build()?;
Ok(Transport {
client,
conn_pool: Arc::new(self.conn_pool),
conn_pool: self.conn_pool,
credentials: self.credentials,
send_meta: self.meta_header,
})
Expand Down Expand Up @@ -363,7 +381,7 @@ impl Connection {
pub struct Transport {
client: reqwest::Client,
credentials: Option<Credentials>,
conn_pool: Arc<Box<dyn ConnectionPool>>,
conn_pool: Arc<dyn ConnectionPool>,
send_meta: bool,
}

Expand Down Expand Up @@ -463,6 +481,7 @@ impl Transport {
headers: HeaderMap,
query_string: Option<&Q>,
body: Option<B>,
#[allow(unused_variables)]
timeout: Option<Duration>,
) -> Result<reqwest::RequestBuilder, Error>
where
Expand All @@ -473,6 +492,7 @@ impl Transport {
let url = connection.url.join(path.trim_start_matches('/'))?;
let mut request_builder = self.client.request(reqwest_method, url);

#[cfg(not(target_arch = "wasm32"))]
if let Some(t) = timeout {
request_builder = request_builder.timeout(t);
}
Expand Down Expand Up @@ -564,6 +584,47 @@ impl Transport {
)?)
}

async fn reseed(&self) {
// Requests will execute against old connection pool during reseed
let connection = self.conn_pool.next();

// Build node info request
let node_request = self.request_builder(
&connection,
Method::Get,
"_nodes/http?filter_path=nodes.*.http",
HeaderMap::default(),
None::<&()>,
None::<()>,
None,
).unwrap();

let scheme = connection.url.scheme();
let resp = node_request.send().await.unwrap();
let json: Value = resp.json().await.unwrap();
let connections: Vec<Connection> = json["nodes"]
.as_object()
.unwrap()
.iter()
.map(|(_, node)| {
let address = node["http"]["publish_address"]
.as_str()
.or_else(|| {
Some(
node["http"]["bound_address"].as_array().unwrap()[0]
.as_str()
.unwrap(),
)
})
.unwrap();
let url = Self::parse_to_url(address, scheme).unwrap();
Connection::new(url)
})
.collect();

self.conn_pool.reseed(connections);
}

/// Creates an asynchronous request that can be awaited
pub async fn send<B, Q>(
&self,
Expand All @@ -578,47 +639,19 @@ impl Transport {
B: Body,
Q: Serialize + ?Sized,
{
// Requests will execute against old connection pool during reseed
if self.conn_pool.reseedable() {
let conn_pool = self.conn_pool.clone();
let connection = conn_pool.next();

// Build node info request
let node_request = self.request_builder(
&connection,
Method::Get,
"_nodes/http?filter_path=nodes.*.http",
headers.clone(),
None::<&Q>,
None::<B>,
timeout,
)?;

tokio::spawn(async move {
let scheme = connection.url.scheme();
let resp = node_request.send().await.unwrap();
let json: Value = resp.json().await.unwrap();
let connections: Vec<Connection> = json["nodes"]
.as_object()
.unwrap()
.iter()
.map(|(_, node)| {
let address = node["http"]["publish_address"]
.as_str()
.or_else(|| {
Some(
node["http"]["bound_address"].as_array().unwrap()[0]
.as_str()
.unwrap(),
)
})
.unwrap();
let url = Self::parse_to_url(address, scheme).unwrap();
Connection::new(url)
})
.collect();
conn_pool.reseed(connections);
});
#[cfg(not(target_arch = "wasm32"))]
{
let transport = self.clone();
tokio::spawn(async move { transport.reseed().await });
}
#[cfg(target_arch = "wasm32")]
{
// Reseed synchronously (i.e. do not spawn a background task) in WASM.
// Running in the background is platform-dependent (web-sys / wasi), we'll
// address this if synchronous reseed is an issue.
self.reseed().await
}
}

let connection = self.conn_pool.next();
Expand Down
4 changes: 3 additions & 1 deletion elasticsearch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,12 @@ mod readme {
extern crate dyn_clone;

pub mod auth;
pub mod cert;
pub mod http;
pub mod params;

#[cfg(not(target_arch = "wasm32"))]
pub mod cert;

// GENERATED-BEGIN:namespace-modules
// Generated code - do not edit until the next GENERATED-END marker

Expand Down
3 changes: 3 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This directory contains standalone examples that need their own independent build configuration.

Other examples can also be found in [`elasticsearch/examples`](../elasticsearch/examples/).
Loading