Skip to content

Commit f0cc454

Browse files
bors[bot]sanders41
andauthored
Merge #111
111: Adding the wait_for_pending_update method r=curquiza a=sanders41 Resolves #77 Co-authored-by: Paul Sanders <psanders1@gmail.com>
2 parents fa7667e + e51e18a commit f0cc454

File tree

2 files changed

+186
-0
lines changed

2 files changed

+186
-0
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ log = "0.4"
1414
serde = { version = "1.0", features = ["derive"] }
1515

1616
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
17+
futures = "0.3"
1718
isahc = { version = "1.0", features = ["http2", "text-decoding"], default_features = false }
1819

1920
[target.'cfg(target_arch = "wasm32")'.dependencies]
21+
js-sys = "0.3.47"
2022
web-sys = { version = "0.3", features = ["RequestInit", "Headers", "Window", "Response", "console"] }
2123
wasm-bindgen = "0.2"
2224
wasm-bindgen-futures = "0.4"

src/progress.rs

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use crate::{errors::Error, indexes::Index, request::*};
44
use serde::Deserialize;
55
use std::collections::{BTreeMap, BTreeSet};
6+
use std::time::Duration;
67

78
#[derive(Deserialize)]
89
#[serde(rename_all = "camelCase")]
@@ -36,6 +37,7 @@ impl<'a> Progress<'a> {
3637
/// let mut movies_index = client.get_or_create("movies").await.unwrap();
3738
/// let progress = movies_index.delete_all_documents().await.unwrap();
3839
/// let status = progress.get_status().await.unwrap();
40+
/// # client.delete_index("movies").await.unwrap();
3941
/// # });
4042
/// ```
4143
pub async fn get_status(&self) -> Result<UpdateStatus, Error> {
@@ -50,6 +52,107 @@ impl<'a> Progress<'a> {
5052
)
5153
.await
5254
}
55+
56+
/// Wait until MeiliSearch processes an update, and get its status.
57+
///
58+
/// `interval` = The frequency at which the server should be polled. Default = 50ms
59+
/// `timeout` = The maximum time to wait for processing to complete. Default = 5000ms
60+
///
61+
/// If the waited time exceeds `timeout` then `None` will be returned.
62+
///
63+
/// # Example
64+
///
65+
/// ```
66+
/// # use meilisearch_sdk::{client::*, document, indexes::*, progress::*};
67+
/// # use serde::{Serialize, Deserialize};
68+
/// #
69+
/// # #[derive(Debug, Serialize, Deserialize, PartialEq)]
70+
/// # struct Document {
71+
/// # id: usize,
72+
/// # value: String,
73+
/// # kind: String,
74+
/// # }
75+
/// #
76+
/// # impl document::Document for Document {
77+
/// # type UIDType = usize;
78+
/// #
79+
/// # fn get_uid(&self) -> &Self::UIDType {
80+
/// # &self.id
81+
/// # }
82+
/// # }
83+
/// #
84+
/// # futures::executor::block_on(async move {
85+
/// let client = Client::new("http://localhost:7700", "masterKey");
86+
/// let movies = client.create_index("movies_wait_for_pending", None).await.unwrap();
87+
///
88+
/// let progress = movies.add_documents(&[
89+
/// Document { id: 0, kind: "title".into(), value: "The Social Network".to_string() },
90+
/// Document { id: 1, kind: "title".into(), value: "Harry Potter and the Sorcerer's Stone".to_string() },
91+
/// ], None).await.unwrap();
92+
///
93+
/// let status = progress.wait_for_pending_update(None, None).await.unwrap();
94+
///
95+
/// # client.delete_index("movies_wait_for_pending").await.unwrap();
96+
/// assert!(matches!(status.unwrap(), UpdateStatus::Processed { .. }));
97+
/// # });
98+
/// ```
99+
pub async fn wait_for_pending_update(
100+
&self,
101+
interval: Option<Duration>,
102+
timeout: Option<Duration>,
103+
) -> Option<Result<UpdateStatus, Error>> {
104+
let interval = interval.unwrap_or_else(|| Duration::from_millis(50));
105+
let timeout = timeout.unwrap_or_else(|| Duration::from_millis(5000));
106+
107+
let mut elapsed_time = Duration::new(0, 0);
108+
let mut status_result: Result<UpdateStatus, Error>;
109+
110+
while timeout > elapsed_time {
111+
status_result = self.get_status().await;
112+
113+
match status_result {
114+
Ok (status) => {
115+
match status {
116+
UpdateStatus::Failed { .. } | UpdateStatus::Processed { .. } => {
117+
return Some(self.get_status().await);
118+
},
119+
UpdateStatus::Enqueued { .. } => {
120+
elapsed_time += interval;
121+
async_sleep(interval).await;
122+
},
123+
}
124+
},
125+
Err (error) => return Some(Err(error)),
126+
};
127+
}
128+
129+
None
130+
}
131+
}
132+
133+
#[cfg(not(target_arch = "wasm32"))]
134+
pub(crate) async fn async_sleep(interval: Duration) {
135+
let (sender, receiver) = futures::channel::oneshot::channel::<()>();
136+
std::thread::spawn(move || {
137+
std::thread::sleep(interval);
138+
let _ = sender.send(());
139+
});
140+
let _ = receiver.await;
141+
}
142+
143+
#[cfg(target_arch = "wasm32")]
144+
pub(crate) async fn async_sleep(interval: Duration) {
145+
use wasm_bindgen_futures::JsFuture;
146+
147+
JsFuture::from(js_sys::Promise::new(&mut |yes, _| {
148+
web_sys::window()
149+
.unwrap()
150+
.set_timeout_with_callback_and_timeout_and_arguments_0(
151+
&yes,
152+
interval.as_millis().try_into().unwrap(),
153+
)
154+
.unwrap();
155+
})).await.unwrap();
53156
}
54157

55158
#[derive(Debug, Clone, Deserialize)]
@@ -133,3 +236,84 @@ pub enum UpdateStatus {
133236
content: ProcessedUpdateResult,
134237
},
135238
}
239+
240+
#[cfg(test)]
241+
mod test {
242+
use crate::{client::*, document, progress::*};
243+
use serde::{Serialize, Deserialize};
244+
use futures_await_test::async_test;
245+
use std::time;
246+
247+
#[derive(Debug, Serialize, Deserialize, PartialEq)]
248+
struct Document {
249+
id: usize,
250+
value: String,
251+
kind: String,
252+
}
253+
254+
impl document::Document for Document {
255+
type UIDType = usize;
256+
257+
fn get_uid(&self) -> &Self::UIDType {
258+
&self.id
259+
}
260+
}
261+
262+
#[async_test]
263+
async fn test_wait_for_pending_updates_with_args() {
264+
let client = Client::new("http://localhost:7700", "masterKey");
265+
let movies = client.create_index("movies_wait_for_pending_args", None).await.unwrap();
266+
let progress = movies.add_documents(&[
267+
Document {
268+
id: 0,
269+
kind: "title".into(),
270+
value: "The Social Network".to_string(),
271+
},
272+
Document {
273+
id: 1,
274+
kind: "title".into(),
275+
value: "Harry Potter and the Sorcerer's Stone".to_string(),
276+
},
277+
], None).await.unwrap();
278+
let status = progress.wait_for_pending_update(
279+
Some(Duration::from_millis(1)), Some(Duration::from_millis(6000))
280+
).await.unwrap();
281+
282+
client.delete_index("movies_wait_for_pending_args").await.unwrap();
283+
assert!(matches!(status.unwrap(), UpdateStatus::Processed { .. }));
284+
}
285+
286+
#[async_test]
287+
async fn test_wait_for_pending_updates_time_out() {
288+
let client = Client::new("http://localhost:7700", "masterKey");
289+
let movies = client.create_index("movies_wait_for_pending_timeout", None).await.unwrap();
290+
let progress = movies.add_documents(&[
291+
Document {
292+
id: 0,
293+
kind: "title".into(),
294+
value: "The Social Network".to_string(),
295+
},
296+
Document {
297+
id: 1,
298+
kind: "title".into(),
299+
value: "Harry Potter and the Sorcerer's Stone".to_string(),
300+
},
301+
], None).await.unwrap();
302+
let status = progress.wait_for_pending_update(
303+
Some(Duration::from_millis(1)), Some(Duration::from_nanos(1))
304+
).await;
305+
306+
client.delete_index("movies_wait_for_pending_timeout").await.unwrap();
307+
assert_eq!(status.is_none(), true);
308+
}
309+
310+
#[async_test]
311+
async fn test_async_sleep() {
312+
let sleep_duration = time::Duration::from_millis(10);
313+
let now = time::Instant::now();
314+
315+
async_sleep(sleep_duration).await;
316+
317+
assert!(now.elapsed() >= sleep_duration);
318+
}
319+
}

0 commit comments

Comments
 (0)