Skip to content

Commit e86b422

Browse files
committed
Rebase and update
1 parent cbf3e72 commit e86b422

File tree

7 files changed

+1379
-94
lines changed

7 files changed

+1379
-94
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

elasticsearch/Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,24 @@ url = "2"
3636
serde = { version = "1", features = ["derive"] }
3737
serde_json = "1"
3838
serde_with = "3"
39-
tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }
39+
40+
41+
#tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }
4042
void = "1"
4143

44+
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
45+
version = "1.0"
46+
default-features = false
47+
features = ["macros", "net", "time", "rt-multi-thread"]
48+
4249
[dev-dependencies]
4350
chrono = { version = "0.4", features = ["serde"] }
4451
clap = { version = "4", features = ["env"]}
4552
failure = "0.1"
4653
futures = "0.3"
4754
http = "1"
4855
axum = "0.7"
49-
hyper = { version = "1", features = ["server", "http1"] }
56+
#hyper = { version = "1", features = ["server", "http1"] }
5057
os_type = "2"
5158
regex="1"
5259
#sysinfo = "0.31"

elasticsearch/src/http/transport.rs

Lines changed: 73 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
*/
1919
//! HTTP transport and connection components
2020
21-
#[cfg(all(any(feature = "native-tls", feature = "rustls-tls"), not(target_arch = "wasm32")))]
21+
#[cfg(all(target_arch = "wasm32", any(feature = "native-tls", feature = "rustls-tls")))]
22+
compile_error!("TLS features are not compatible with the wasm target");
23+
24+
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
2225
use crate::auth::ClientCertificate;
23-
#[cfg(all(any(feature = "native-tls", feature = "rustls-tls"), not(target_arch = "wasm32")))]
26+
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
2427
use crate::cert::CertificateValidation;
2528
use crate::{
2629
auth::Credentials,
@@ -130,6 +133,8 @@ fn build_meta() -> String {
130133
meta.push_str(",tls=n");
131134
} else if cfg!(feature = "rustls-tls") {
132135
meta.push_str(",tls=r");
136+
} else if cfg!(target_arch = "wasm32") {
137+
meta.push_str(",tls=w");
133138
}
134139

135140
meta
@@ -138,9 +143,9 @@ fn build_meta() -> String {
138143
/// Builds a HTTP transport to make API calls to Elasticsearch
139144
pub struct TransportBuilder {
140145
client_builder: reqwest::ClientBuilder,
141-
conn_pool: Box<dyn ConnectionPool>,
146+
conn_pool: Arc<dyn ConnectionPool>,
142147
credentials: Option<Credentials>,
143-
#[cfg(all(any(feature = "native-tls", feature = "rustls-tls"), not(target_arch = "wasm32")))]
148+
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
144149
cert_validation: Option<CertificateValidation>,
145150
#[cfg(not(target_arch = "wasm32"))]
146151
proxy: Option<Url>,
@@ -150,6 +155,7 @@ pub struct TransportBuilder {
150155
disable_proxy: bool,
151156
headers: HeaderMap,
152157
meta_header: bool,
158+
#[cfg(not(target_arch = "wasm32"))]
153159
timeout: Option<Duration>,
154160
}
155161

@@ -162,9 +168,9 @@ impl TransportBuilder {
162168
{
163169
Self {
164170
client_builder: reqwest::ClientBuilder::new(),
165-
conn_pool: Box::new(conn_pool),
171+
conn_pool: Arc::new(conn_pool),
166172
credentials: None,
167-
#[cfg(all(any(feature = "native-tls", feature = "rustls-tls"), not(target_arch = "wasm32")))]
173+
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
168174
cert_validation: None,
169175
#[cfg(not(target_arch = "wasm32"))]
170176
proxy: None,
@@ -174,6 +180,7 @@ impl TransportBuilder {
174180
disable_proxy: false,
175181
headers: HeaderMap::new(),
176182
meta_header: true,
183+
#[cfg(not(target_arch = "wasm32"))]
177184
timeout: None,
178185
}
179186
}
@@ -211,7 +218,7 @@ impl TransportBuilder {
211218
/// Validation applied to the certificate provided to establish a HTTPS connection.
212219
/// By default, full validation is applied. When using a self-signed certificate,
213220
/// different validation can be applied.
214-
#[cfg(all(any(feature = "native-tls", feature = "rustls-tls"), not(target_arch = "wasm32")))]
221+
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
215222
pub fn cert_validation(mut self, validation: CertificateValidation) -> Self {
216223
self.cert_validation = Some(validation);
217224
self
@@ -249,6 +256,7 @@ impl TransportBuilder {
249256
///
250257
/// The timeout is applied from when the request starts connecting until the response body has finished.
251258
/// Default is no timeout.
259+
#[cfg(not(target_arch = "wasm32"))]
252260
pub fn timeout(mut self, timeout: Duration) -> Self {
253261
self.timeout = Some(timeout);
254262
self
@@ -267,7 +275,7 @@ impl TransportBuilder {
267275
client_builder = client_builder.timeout(t);
268276
}
269277

270-
#[cfg(all(any(feature = "native-tls", feature = "rustls-tls"), not(target_arch = "wasm32")))]
278+
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
271279
{
272280
if let Some(Credentials::Certificate(cert)) = &self.credentials {
273281
client_builder = match cert {
@@ -289,7 +297,7 @@ impl TransportBuilder {
289297
};
290298
}
291299

292-
#[cfg(all(any(feature = "native-tls", feature = "rustls-tls"), not(target_arch = "wasm32")))]
300+
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
293301
if let Some(v) = self.cert_validation {
294302
client_builder = match v {
295303
CertificateValidation::Default => client_builder,
@@ -326,7 +334,7 @@ impl TransportBuilder {
326334
let client = client_builder.build()?;
327335
Ok(Transport {
328336
client,
329-
conn_pool: Arc::new(self.conn_pool),
337+
conn_pool: self.conn_pool,
330338
credentials: self.credentials,
331339
send_meta: self.meta_header,
332340
})
@@ -373,7 +381,7 @@ impl Connection {
373381
pub struct Transport {
374382
client: reqwest::Client,
375383
credentials: Option<Credentials>,
376-
conn_pool: Arc<Box<dyn ConnectionPool>>,
384+
conn_pool: Arc<dyn ConnectionPool>,
377385
send_meta: bool,
378386
}
379387

@@ -494,23 +502,7 @@ impl Transport {
494502
// on a specific request, we want it to overwrite.
495503
if let Some(c) = &self.credentials {
496504
request_builder = match c {
497-
Credentials::Basic(u, p) => {
498-
#[cfg(not(target_arch = "wasm32"))]
499-
{
500-
request_builder.basic_auth(u, Some(p))
501-
}
502-
#[cfg(target_arch = "wasm32")]
503-
{
504-
// Missing basic_auth in the wasm32 target
505-
let mut header_value = b"Basic ".to_vec();
506-
{
507-
let mut encoder = Base64Encoder::new(&mut header_value, base64::STANDARD);
508-
// The unwraps here are fine because Vec::write* is infallible.
509-
write!(encoder, "{}:{}", u, p).unwrap();
510-
}
511-
request_builder.header(reqwest::header::AUTHORIZATION, header_value)
512-
}
513-
},
505+
Credentials::Basic(u, p) => request_builder.basic_auth(u, Some(p)),
514506
Credentials::Bearer(t) => request_builder.bearer_auth(t),
515507
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
516508
Credentials::Certificate(_) => request_builder,
@@ -592,6 +584,47 @@ impl Transport {
592584
)?)
593585
}
594586

587+
async fn reseed(&self) {
588+
// Requests will execute against old connection pool during reseed
589+
let connection = self.conn_pool.next();
590+
591+
// Build node info request
592+
let node_request = self.request_builder(
593+
&connection,
594+
Method::Get,
595+
"_nodes/http?filter_path=nodes.*.http",
596+
HeaderMap::default(),
597+
None::<&()>,
598+
None::<()>,
599+
None,
600+
).unwrap();
601+
602+
let scheme = connection.url.scheme();
603+
let resp = node_request.send().await.unwrap();
604+
let json: Value = resp.json().await.unwrap();
605+
let connections: Vec<Connection> = json["nodes"]
606+
.as_object()
607+
.unwrap()
608+
.iter()
609+
.map(|(_, node)| {
610+
let address = node["http"]["publish_address"]
611+
.as_str()
612+
.or_else(|| {
613+
Some(
614+
node["http"]["bound_address"].as_array().unwrap()[0]
615+
.as_str()
616+
.unwrap(),
617+
)
618+
})
619+
.unwrap();
620+
let url = Self::parse_to_url(address, scheme).unwrap();
621+
Connection::new(url)
622+
})
623+
.collect();
624+
625+
self.conn_pool.reseed(connections);
626+
}
627+
595628
/// Creates an asynchronous request that can be awaited
596629
pub async fn send<B, Q>(
597630
&self,
@@ -606,47 +639,19 @@ impl Transport {
606639
B: Body,
607640
Q: Serialize + ?Sized,
608641
{
609-
// Requests will execute against old connection pool during reseed
610642
if self.conn_pool.reseedable() {
611-
let conn_pool = self.conn_pool.clone();
612-
let connection = conn_pool.next();
613-
614-
// Build node info request
615-
let node_request = self.request_builder(
616-
&connection,
617-
Method::Get,
618-
"_nodes/http?filter_path=nodes.*.http",
619-
headers.clone(),
620-
None::<&Q>,
621-
None::<B>,
622-
timeout,
623-
)?;
624-
625-
tokio::spawn(async move {
626-
let scheme = connection.url.scheme();
627-
let resp = node_request.send().await.unwrap();
628-
let json: Value = resp.json().await.unwrap();
629-
let connections: Vec<Connection> = json["nodes"]
630-
.as_object()
631-
.unwrap()
632-
.iter()
633-
.map(|(_, node)| {
634-
let address = node["http"]["publish_address"]
635-
.as_str()
636-
.or_else(|| {
637-
Some(
638-
node["http"]["bound_address"].as_array().unwrap()[0]
639-
.as_str()
640-
.unwrap(),
641-
)
642-
})
643-
.unwrap();
644-
let url = Self::parse_to_url(address, scheme).unwrap();
645-
Connection::new(url)
646-
})
647-
.collect();
648-
conn_pool.reseed(connections);
649-
});
643+
#[cfg(not(target_arch = "wasm32"))]
644+
{
645+
let transport = self.clone();
646+
tokio::spawn(async move { transport.reseed().await });
647+
}
648+
#[cfg(target_arch = "wasm32")]
649+
{
650+
// Reseed synchronously (i.e. do not spawn a background task) in WASM.
651+
// Running in the background is platform-dependent (web-sys / wasi), we'll
652+
// address this if synchronous reseed is an issue.
653+
self.reseed().await
654+
}
650655
}
651656

652657
let connection = self.conn_pool.next();

examples/cloudflare_worker/Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ crate-type = ["cdylib", "rlib"]
1414
default = ["console_error_panic_hook"]
1515

1616
[dependencies]
17-
cfg-if = "0.1.2"
18-
worker = "0.0.9"
19-
serde_json = "1.0"
17+
cfg-if = "0.1"
18+
serde = "1"
19+
serde_json = "1"
20+
url = "2"
2021
web-sys = { version = "0.3", features = ["console"] }
22+
worker = "0.3"
2123

2224
elasticsearch = { path = "../../elasticsearch" }
23-
serde = "1.0"
24-
url = "2.2"
2525

2626
# The `console_error_panic_hook` crate provides better debugging of panics by
2727
# logging them with `console.error`. This is great for development, but requires

0 commit comments

Comments
 (0)