From f0bf576adad8ac5ccacc542c04b79bfc78c02ea7 Mon Sep 17 00:00:00 2001 From: nathan Date: Sun, 16 Jun 2024 15:53:50 +0800 Subject: [PATCH] chore: remove workspace test --- .github/workflows/client_api_check.yml | 4 - libs/client-api/src/lib.rs | 2 +- libs/client-api/src/{ => native}/http_blob.rs | 2 +- libs/client-api/src/native/mod.rs | 2 + libs/client-api/src/wasm/http_wasm.rs | 156 ------------------ libs/client-api/src/wasm/mod.rs | 7 - libs/client-api/src/wasm/ping.rs | 49 ------ libs/client-api/src/wasm/retry.rs | 24 --- libs/database/src/file/file_storage.rs | 3 +- libs/database/src/file/s3_client_impl.rs | 25 ++- tests/file_test/delete_dir_test.rs | 99 +++++++++++ tests/file_test/mod.rs | 1 + tests/file_test/multiple_part_test.rs | 2 +- xtask/src/main.rs | 22 +-- 14 files changed, 141 insertions(+), 257 deletions(-) rename libs/client-api/src/{ => native}/http_blob.rs (99%) delete mode 100644 libs/client-api/src/wasm/http_wasm.rs delete mode 100644 libs/client-api/src/wasm/mod.rs delete mode 100644 libs/client-api/src/wasm/ping.rs delete mode 100644 libs/client-api/src/wasm/retry.rs create mode 100644 tests/file_test/delete_dir_test.rs diff --git a/.github/workflows/client_api_check.yml b/.github/workflows/client_api_check.yml index 0e82244a8..8ba9f4fca 100644 --- a/.github/workflows/client_api_check.yml +++ b/.github/workflows/client_api_check.yml @@ -29,10 +29,6 @@ jobs: working-directory: ./libs/client-api run: cargo build --features "enable_brotli" - - name: Build ClientAPI WASM - working-directory: ./libs/client-api - run: wasm-pack build - - name: Check ClientAPI Dependencies working-directory: ./libs/client-api run: bash ../../script/client_api_deps_check.sh diff --git a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index 64fbbce6b..d20dfc7f5 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -1,7 +1,7 @@ mod http; mod http_ai; mod http_billing; -pub mod http_blob; + mod http_collab; mod http_history; mod http_member; diff --git a/libs/client-api/src/http_blob.rs b/libs/client-api/src/native/http_blob.rs similarity index 99% rename from libs/client-api/src/http_blob.rs rename to libs/client-api/src/native/http_blob.rs index c0ad3bd41..831d46e47 100644 --- a/libs/client-api/src/http_blob.rs +++ b/libs/client-api/src/native/http_blob.rs @@ -403,7 +403,7 @@ impl FileDir for FileDirImpl<'_> { #[cfg(test)] mod tests { - use crate::http_blob::ChunkedBytes; + use crate::ChunkedBytes; use bytes::Bytes; use std::env::temp_dir; use tokio::io::AsyncWriteExt; diff --git a/libs/client-api/src/native/mod.rs b/libs/client-api/src/native/mod.rs index a12859fa1..38337cc46 100644 --- a/libs/client-api/src/native/mod.rs +++ b/libs/client-api/src/native/mod.rs @@ -1,7 +1,9 @@ +mod http_blob; mod http_native; mod ping; mod retry; +pub use http_blob::*; #[allow(unused_imports)] pub use http_native::*; pub(crate) use ping::*; diff --git a/libs/client-api/src/wasm/http_wasm.rs b/libs/client-api/src/wasm/http_wasm.rs deleted file mode 100644 index c0719db2e..000000000 --- a/libs/client-api/src/wasm/http_wasm.rs +++ /dev/null @@ -1,156 +0,0 @@ -use crate::http::log_request_id; -use crate::ws::{ConnectInfo, WSClientConnectURLProvider, WSClientHttpSender, WSError}; -use crate::Client; -use app_error::gotrue::GoTrueError; -use app_error::ErrorCode; -use async_trait::async_trait; -use database_entity::dto::{CollabParams, QueryCollabParams}; -use gotrue::grant::{Grant, RefreshTokenGrant}; -use reqwest::Method; -use shared_entity::dto::workspace_dto::{CollabResponse, CollabTypeParam}; -use shared_entity::response::{AppResponse, AppResponseError}; -use std::future::Future; -use std::sync::atomic::Ordering; -use tracing::{info, instrument}; - -impl Client { - pub async fn create_collab_list( - &self, - workspace_id: &str, - _params_list: Vec, - ) -> Result<(), AppResponseError> { - let _url = self.batch_create_collab_url(workspace_id); - Err(AppResponseError::new( - ErrorCode::Unhandled, - "not implemented", - )) - } - - #[instrument(level = "debug", skip_all)] - pub async fn get_collab( - &self, - params: QueryCollabParams, - ) -> Result { - let url = format!( - "{}/api/workspace/v1/{}/collab/{}", - self.base_url, ¶ms.workspace_id, ¶ms.object_id - ); - let collab_type = params.collab_type.clone(); - let resp = self - .http_client_with_auth(Method::GET, &url) - .await? - .query(&CollabTypeParam { collab_type }) - .send() - .await?; - log_request_id(&resp); - let resp = AppResponse::::from_response(resp).await?; - resp.into_data() - } - - #[instrument(level = "debug", skip_all, err)] - pub async fn refresh_token(&self, reason: &str) -> Result<(), AppResponseError> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.refresh_ret_txs.write().push(tx); - - if !self.is_refreshing_token.load(Ordering::SeqCst) { - self.is_refreshing_token.store(true, Ordering::SeqCst); - - info!("refresh token reason:{}", reason); - let txs = std::mem::take(&mut *self.refresh_ret_txs.write()); - let result = self.inner_refresh_token().await; - for tx in txs { - let _ = tx.send(result.clone()); - } - self.is_refreshing_token.store(false, Ordering::SeqCst); - } - - rx.await - .map_err(|err| AppResponseError::new(ErrorCode::Internal, err.to_string()))??; - Ok(()) - } - - async fn inner_refresh_token(&self) -> Result<(), AppResponseError> { - // let policy = RetryPolicy::fixed(Duration::from_secs(2)).with_max_retries(4).with_jitter(false); - // let refresh_token = self - // .token - // .read() - // .as_ref() - // .ok_or(GoTrueError::NotLoggedIn( - // "fail to refresh user token".to_owned(), - // ))? - // .refresh_token - // .as_str() - // .to_owned(); - // match policy.retry_if(move || { - // let grant = Grant::RefreshToken(RefreshTokenGrant { refresh_token: refresh_token.clone() }); - // async move { - // self - // .gotrue_client - // .token(&grant).await - // } - // - // }, RefreshTokenRetryCondition).await { - // Ok(new_token) => { - // event!(tracing::Level::INFO, "refresh token success"); - // self.token.write().set(new_token); - // Ok(()) - // }, - // Err(err) => { - // let err = AppError::from(err); - // event!(tracing::Level::ERROR, "refresh token failed: {}", err); - // - // // If the error is an OAuth error, unset the token. - // if err.is_unauthorized() { - // self.token.write().unset(); - // } - // Err(err.into()) - // }, - // } - let refresh_token = self - .token - .read() - .as_ref() - .ok_or(GoTrueError::NotLoggedIn( - "fail to refresh user token".to_owned(), - ))? - .refresh_token - .as_str() - .to_owned(); - let new_token = self - .gotrue_client - .token(&Grant::RefreshToken(RefreshTokenGrant { refresh_token })) - .await?; - self.token.write().set(new_token); - Ok(()) - } -} - -pub fn af_spawn(future: T) -> tokio::task::JoinHandle -where - T: Future + 'static, - T::Output: Send + 'static, -{ - tokio::task::spawn_local(future) -} - -#[async_trait] -impl WSClientHttpSender for Client { - async fn send_ws_msg( - &self, - _device_id: &str, - _message: client_websocket::Message, - ) -> Result<(), WSError> { - Err(WSError::Internal(anyhow::Error::msg("not supported"))) - } -} - -#[async_trait] -impl WSClientConnectURLProvider for Client { - fn connect_ws_url(&self) -> String { - self.ws_addr.clone() - } - - async fn connect_info(&self) -> Result { - Err(WSError::Internal(anyhow::Error::msg("not supported"))) - } -} diff --git a/libs/client-api/src/wasm/mod.rs b/libs/client-api/src/wasm/mod.rs deleted file mode 100644 index b3550a7e3..000000000 --- a/libs/client-api/src/wasm/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -mod http_wasm; -mod ping; -mod retry; - -pub use http_wasm::*; -pub(crate) use ping::*; -pub(crate) use retry::*; diff --git a/libs/client-api/src/wasm/ping.rs b/libs/client-api/src/wasm/ping.rs deleted file mode 100644 index e9ce4f779..000000000 --- a/libs/client-api/src/wasm/ping.rs +++ /dev/null @@ -1,49 +0,0 @@ -use crate::ws::ConnectStateNotify; -use client_websocket::Message; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::broadcast::Sender; -use tokio::sync::mpsc::Receiver; -use tokio::sync::Mutex; -#[allow(dead_code)] -pub(crate) struct ServerFixIntervalPing { - duration: Duration, - ping_sender: Option>, - pong_recv: Option>, - #[allow(dead_code)] - stop_tx: tokio::sync::mpsc::Sender<()>, - stop_rx: Option>, - state: Arc>, - ping_count: Arc>, - maximum_ping_count: u32, -} - -impl ServerFixIntervalPing { - pub(crate) fn new( - duration: Duration, - state: Arc>, - ping_sender: Sender, - pong_recv: Receiver<()>, - maximum_ping_count: u32, - ) -> Self { - let (tx, rx) = tokio::sync::mpsc::channel(1000); - Self { - duration, - stop_tx: tx, - stop_rx: Some(rx), - state, - ping_sender: Some(ping_sender), - pong_recv: Some(pong_recv), - ping_count: Arc::new(Mutex::new(0)), - maximum_ping_count, - } - } - - pub(crate) async fn stop(&self) { - let _ = self.stop_tx.send(()).await; - } - - pub(crate) fn run(&mut self) { - // TODO(nathan): implement the ping for wasm - } -} diff --git a/libs/client-api/src/wasm/retry.rs b/libs/client-api/src/wasm/retry.rs deleted file mode 100644 index 5dc35f6be..000000000 --- a/libs/client-api/src/wasm/retry.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::ws::{StateNotify, WSClientConnectURLProvider, WSError}; -use again::Condition; -use app_error::gotrue::GoTrueError; -use client_websocket::{connect_async, WebSocketStream}; -use reqwest::header::HeaderMap; -use std::sync::{Arc, Weak}; - -pub(crate) struct RefreshTokenRetryCondition; - -impl Condition for RefreshTokenRetryCondition { - fn is_retryable(&mut self, error: &GoTrueError) -> bool { - error.is_network_error() - } -} -pub async fn retry_connect( - connect_provider: Arc, - _state_notify: Weak, -) -> Result { - let url = connect_provider.connect_ws_url(); - let connect_info = connect_provider.connect_info().await?; - let headers: HeaderMap = connect_info.into(); - let stream = connect_async(url, headers).await?; - Ok(stream) -} diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs index 90d8c251a..c90ac372b 100644 --- a/libs/database/src/file/file_storage.rs +++ b/libs/database/src/file/file_storage.rs @@ -10,7 +10,7 @@ use database_entity::file_dto::{ CompleteUploadRequest, CreateUploadRequest, CreateUploadResponse, FileDir, UploadPartRequest, UploadPartResponse, }; -use tracing::{instrument, warn}; +use tracing::{info, instrument, warn}; use uuid::Uuid; pub trait ResponseBlob { @@ -58,6 +58,7 @@ where } pub async fn remove_dir(&self, dir: &str) -> Result<(), AppError> { + info!("removing dir: {}", dir); self.client.remove_dir(dir).await?; Ok(()) } diff --git a/libs/database/src/file/s3_client_impl.rs b/libs/database/src/file/s3_client_impl.rs index 03c069635..f87b542c1 100644 --- a/libs/database/src/file/s3_client_impl.rs +++ b/libs/database/src/file/s3_client_impl.rs @@ -279,6 +279,12 @@ impl BucketClient for AwsS3BucketClientImpl { }) .collect(); + trace!( + "objects_to_delete: {:?} at directory: {}", + objects_to_delete.len(), + dir + ); + // Step 2: Delete the listed objects in batches of 1000 while !objects_to_delete.is_empty() { let batch = if objects_to_delete.len() > 1000 { @@ -287,6 +293,15 @@ impl BucketClient for AwsS3BucketClientImpl { Vec::new() }; + trace!( + "Deleting {} objects: {:?}", + dir, + objects_to_delete + .iter() + .map(|object| &object.key) + .collect::>() + ); + let delete = Delete::builder() .set_objects(Some(objects_to_delete)) .build() @@ -314,9 +329,15 @@ impl BucketClient for AwsS3BucketClientImpl { objects_to_delete = batch; } - if list_objects.is_truncated.is_none() { - break; + match list_objects.is_truncated { + None => break, + Some(is_truncated) => { + if !is_truncated { + break; + } + }, } + continuation_token = list_objects.next_continuation_token; } diff --git a/tests/file_test/delete_dir_test.rs b/tests/file_test/delete_dir_test.rs new file mode 100644 index 000000000..c82cc3e11 --- /dev/null +++ b/tests/file_test/delete_dir_test.rs @@ -0,0 +1,99 @@ +use crate::collab::util::generate_random_string; +use app_error::ErrorCode; +use bytes::Bytes; +use client_api::ChunkedBytes; +use client_api_test::{generate_unique_registered_user_client, workspace_id_from_client}; +use database_entity::file_dto::{ + CompleteUploadRequest, CompletedPartRequest, CreateUploadRequest, UploadPartRequest, +}; +use uuid::Uuid; + +#[tokio::test] +async fn delete_dir_test() { + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let mime = mime::TEXT_PLAIN_UTF_8; + let data = "hello world"; + let file_id = uuid::Uuid::new_v4().to_string(); + let url = c1.get_blob_url(&workspace_id, &file_id); + c1.put_blob(&url, data, &mime).await.unwrap(); + c1.delete_workspace(&workspace_id).await.unwrap(); + + let error = c1.get_blob(&url).await.unwrap_err(); + assert_eq!(error.code, ErrorCode::RecordNotFound); +} + +#[tokio::test] +async fn delete_dir_test2() { + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let dir = workspace_id.clone(); + let mime = mime::TEXT_PLAIN_UTF_8; + let mut file_ids = vec![]; + + for i in 1..5 { + let text = generate_random_string(i * 2 * 1024 * 1024); + let file_id = Uuid::new_v4().to_string(); + file_ids.push(file_id.clone()); + let upload = c1 + .create_upload( + &workspace_id, + CreateUploadRequest { + file_id: file_id.clone(), + directory: dir.clone(), + content_type: mime.to_string(), + }, + ) + .await + .unwrap(); + + let chunked_bytes = ChunkedBytes::from_bytes(Bytes::from(text.clone())).unwrap(); + let mut completed_parts = Vec::new(); + let iter = chunked_bytes.iter().enumerate(); + for (index, next) in iter { + let resp = c1 + .upload_part( + &workspace_id, + UploadPartRequest { + file_id: file_id.clone(), + directory: dir.clone(), + upload_id: upload.upload_id.clone(), + part_number: index as i32 + 1, + body: next.to_vec(), + }, + ) + .await + .unwrap(); + + completed_parts.push(CompletedPartRequest { + e_tag: resp.e_tag, + part_number: resp.part_num, + }); + } + + let req = CompleteUploadRequest { + file_id: file_id.clone(), + directory: dir.clone(), + upload_id: upload.upload_id, + parts: completed_parts, + }; + c1.complete_upload(&workspace_id, req).await.unwrap(); + + let blob = c1 + .get_blob_v1(&workspace_id, &dir, &file_id) + .await + .unwrap() + .1; + let blob_text = String::from_utf8(blob.to_vec()).unwrap(); + assert_eq!(blob_text, text); + } + c1.delete_workspace(&workspace_id).await.unwrap(); + + for file_id in file_ids { + let error = c1 + .get_blob_v1(&workspace_id, &dir, &file_id) + .await + .unwrap_err(); + assert_eq!(error.code, ErrorCode::RecordNotFound); + } +} diff --git a/tests/file_test/mod.rs b/tests/file_test/mod.rs index 8a98cf15f..5e18da757 100644 --- a/tests/file_test/mod.rs +++ b/tests/file_test/mod.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::ops::Deref; +mod delete_dir_test; mod multiple_part_test; mod put_and_get; mod usage; diff --git a/tests/file_test/multiple_part_test.rs b/tests/file_test/multiple_part_test.rs index 548ef0aa6..a8694b328 100644 --- a/tests/file_test/multiple_part_test.rs +++ b/tests/file_test/multiple_part_test.rs @@ -3,7 +3,7 @@ use crate::collab::util::{generate_random_bytes, generate_random_string}; use app_error::ErrorCode; use aws_sdk_s3::types::CompletedPart; use bytes::Bytes; -use client_api::http_blob::ChunkedBytes; +use client_api::ChunkedBytes; use client_api_test::{generate_unique_registered_user_client, workspace_id_from_client}; use database::file::{BucketClient, ResponseBlob}; use database_entity::file_dto::{ diff --git a/xtask/src/main.rs b/xtask/src/main.rs index 5d5f468d2..10961e797 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -33,14 +33,14 @@ async fn main() -> Result<()> { .spawn() .context("Failed to start AppFlowy-History process")?; - // let mut appflowy_indexer_cmd = Command::new("cargo") - // .args([ - // "run", - // "--manifest-path", - // "./services/appflowy-indexer/Cargo.toml", - // ]) - // .spawn() - // .context("Failed to start AppFlowy-Indexer process")?; + let mut appflowy_indexer_cmd = Command::new("cargo") + .args([ + "run", + "--manifest-path", + "./services/appflowy-indexer/Cargo.toml", + ]) + .spawn() + .context("Failed to start AppFlowy-Indexer process")?; select! { status = appflowy_cloud_cmd.wait() => { @@ -49,9 +49,9 @@ async fn main() -> Result<()> { status = appflowy_history_cmd.wait() => { handle_process_exit(status?, appflowy_history_bin_name)?; }, - // status = appflowy_indexer_cmd.wait() => { - // handle_process_exit(status?, appflowy_indexer_bin_name)?; - // }, + status = appflowy_indexer_cmd.wait() => { + handle_process_exit(status?, appflowy_indexer_bin_name)?; + }, } Ok(())