Skip to content

Commit

Permalink
fix: fix some bugs (#145)
Browse files Browse the repository at this point in the history
* chore: update

* chore: pub error code

* chore: update

* chore: update

* chore: update

* chore: update

* chore: update

* chore: update max frame size

* chore: update max frame size

* chore: ws buffer size

* chore: update
  • Loading branch information
appflowy authored Nov 3, 2023
1 parent 3745d93 commit 517275a
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 33 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ scraper = "0.17.1"
client-api = { path = "libs/client-api", features = ["collab-sync"] }
opener = "0.6.1"
image = "0.23.14"
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }

[[bin]]
name = "appflowy_cloud"
Expand Down
10 changes: 5 additions & 5 deletions libs/app_error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,16 @@ impl ErrorCode {
}

#[derive(Serialize)]
struct AFErrorSerde {
struct AppErrorSerde {
code: ErrorCode,
msg: String,
message: String,
}

impl From<&AppError> for AFErrorSerde {
impl From<&AppError> for AppErrorSerde {
fn from(value: &AppError) -> Self {
Self {
code: value.code(),
msg: value.to_string(),
message: value.to_string(),
}
}
}
Expand All @@ -239,6 +239,6 @@ impl actix_web::error::ResponseError for AppError {
}

fn error_response(&self) -> actix_web::HttpResponse {
actix_web::HttpResponse::Ok().json(AFErrorSerde::from(self))
actix_web::HttpResponse::Ok().json(AppErrorSerde::from(self))
}
}
8 changes: 7 additions & 1 deletion libs/client-api/src/collab_sync/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use realtime_entity::collab_msg::{CollabSinkMessage, MsgId};
use tokio::spawn;
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use tokio::time::{interval, Instant, Interval};
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, event, trace, warn};

#[derive(Clone, Debug)]
pub enum SinkState {
Expand Down Expand Up @@ -251,6 +251,12 @@ where
while let Some(pending_msg) = pending_msg_queue.pop() {
if !sending_msg.merge(pending_msg, &self.config.maximum_payload_size) {
break;
} else {
event!(
tracing::Level::TRACE,
"Did merge message: {}",
sending_msg.get_msg()
);
}
}
}
Expand Down
29 changes: 15 additions & 14 deletions libs/client-api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ impl Client {
}

#[instrument(level = "debug", skip_all, err)]
pub fn restore_token(&self, token: &str) -> Result<(), AppError> {
pub fn restore_token(&self, token: &str) -> Result<(), AppResponseError> {
if token.is_empty() {
return Err(AppError::OAuthError("Empty token".to_string()));
return Err(AppError::OAuthError("Empty token".to_string()).into());
}
let token = serde_json::from_str::<AccessTokenResponse>(token)?;
self.token.write().set(token);
Expand All @@ -95,12 +95,12 @@ impl Client {
/// string representation of the access token. If the lock cannot be acquired or
/// the token is not present, an error is returned.
#[instrument(level = "debug", skip_all, err)]
pub fn get_token(&self) -> Result<String, AppError> {
pub fn get_token(&self) -> Result<String, AppResponseError> {
let token_str = self
.token
.read()
.try_get()
.map_err(|err| AppError::OAuthError(err.to_string()))?;
.map_err(|err| AppResponseError::from(AppError::OAuthError(err.to_string())))?;
Ok(token_str)
}

Expand Down Expand Up @@ -174,7 +174,7 @@ impl Client {
///
/// # Returns
/// - `Ok(String)`: A `String` containing the constructed authorization URL if the specified provider is available.
/// - `Err(AppError)`: An `AppError` indicating either the OAuth provider is invalid or other issues occurred while fetching settings.
/// - `Err(AppResponseError)`: An `AppResponseError` indicating either the OAuth provider is invalid or other issues occurred while fetching settings.
///
#[instrument(level = "debug", skip_all, err)]
pub async fn generate_oauth_url_with_provider(
Expand Down Expand Up @@ -340,13 +340,15 @@ impl Client {
/// - `Err(AppError)`: An `AppError` indicating either an inability to read the token or that the user is not logged in.
///
#[inline]
pub fn token_expires_at(&self) -> Result<i64, AppError> {
pub fn token_expires_at(&self) -> Result<i64, AppResponseError> {
match &self.token.try_read() {
None => Err(AppError::Unhandled("Failed to read token".to_string())),
None => Err(AppError::Unhandled("Failed to read token".to_string()).into()),
Some(token) => Ok(
token
.as_ref()
.ok_or(AppError::NotLoggedIn("fail to get expires_at".to_string()))?
.ok_or(AppResponseError::from(AppError::NotLoggedIn(
"fail to get expires_at".to_string(),
)))?
.expires_at,
),
}
Expand All @@ -358,17 +360,17 @@ impl Client {
///
/// # Returns
/// - `Ok(String)`: A `String` containing the access token.
/// - `Err(AppError)`: An `AppError` indicating either an inability to read the token or that the user is not logged in.
/// - `Err(AppResponseError)`: An `AppResponseError` indicating either an inability to read the token or that the user is not logged in.
///
pub fn access_token(&self) -> Result<String, AppError> {
pub fn access_token(&self) -> Result<String, AppResponseError> {
match &self.token.try_read_for(Duration::from_secs(2)) {
None => Err(AppError::Unhandled("Failed to read token".to_string())),
None => Err(AppError::Unhandled("Failed to read token".to_string()).into()),
Some(token) => Ok(
token
.as_ref()
.ok_or(AppError::NotLoggedIn(
.ok_or(AppResponseError::from(AppError::NotLoggedIn(
"fail to get access token. Token is empty".to_string(),
))?
)))?
.access_token
.clone(),
),
Expand Down Expand Up @@ -562,7 +564,6 @@ impl Client {
},
Err(err) => {
event!(tracing::Level::ERROR, "refresh token failed: {}", err);
self.token.write().unset();
Err(AppResponseError::from(err))
},
}
Expand Down
1 change: 1 addition & 0 deletions libs/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use http::*;

pub mod error {
pub use shared_entity::response::AppResponseError;
pub use shared_entity::response::ErrorCode;
}

// Export all dto entities that will be used in the frontend application
Expand Down
11 changes: 10 additions & 1 deletion libs/client-api/src/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,15 @@ impl WSClient {
}
}

pub fn send<M: Into<Message>>(&self, msg: M) -> Result<(), WSError> {
self.sender.send(msg.into()).unwrap();
Ok(())
}

pub fn sender(&self) -> Sender<Message> {
self.sender.clone()
}

async fn set_state(&self, state: ConnectState) {
self.state_notify.lock().set_state(state);
}
Expand All @@ -246,7 +255,7 @@ struct RetryCondition {
impl Condition<WSError> for RetryCondition {
fn should_retry(&mut self, error: &WSError) -> bool {
if let WSError::AuthError(err) = error {
debug!("WSClient auth error: {}, stop retry connn", err);
debug!("WSClient auth error: {}, stop retry connect", err);
return false;
}

Expand Down
15 changes: 13 additions & 2 deletions libs/client-api/src/ws/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::pin::Pin;
use crate::ws::WSError;
use tokio::net::TcpStream;
use tokio_retry::Action;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
use tokio_tungstenite::{connect_async_with_config, MaybeTlsStream, WebSocketStream};
use tracing::{error, info};

pub(crate) struct ConnectAction {
Expand All @@ -26,7 +27,17 @@ impl Action for ConnectAction {
let cloned_addr = self.addr.clone();
Box::pin(async move {
info!("🔵websocket start connecting: {}", cloned_addr);
match connect_async(&cloned_addr).await {
match connect_async_with_config(
&cloned_addr,
Some(WebSocketConfig {
max_message_size: Some(65_536), // 64KB
max_frame_size: Some(65_536), // 64KB
..WebSocketConfig::default()
}),
false,
)
.await
{
Ok((stream, _response)) => {
info!("🟢websocket connect success");
Ok(stream)
Expand Down
3 changes: 2 additions & 1 deletion libs/shared-entity/src/response.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

use app_error::{AppError, ErrorCode};
use app_error::AppError;
pub use app_error::ErrorCode;
use std::fmt::{Debug, Display};

#[cfg(feature = "cloud")]
Expand Down
2 changes: 1 addition & 1 deletion src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub async fn establish_ws_connection(
);

match ws::WsResponseBuilder::new(client, &request, payload)
.frame_size(MAX_FRAME_SIZE)
.frame_size(MAX_FRAME_SIZE * 10)
.start()
{
Ok(response) => Ok(response),
Expand Down
2 changes: 1 addition & 1 deletion src/biz/collab/access_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ where
oid: &str,
user_uuid: &Uuid,
method: Method,
_path: Path<Url>,
_path: &Path<Url>,
) -> Result<(), AppError> {
let can_access = self
.0
Expand Down
18 changes: 13 additions & 5 deletions src/middleware/access_control_mw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub trait HttpAccessControlService: Send + Sync {
oid: &str,
user_uuid: &Uuid,
method: Method,
path: Path<Url>,
path: &Path<Url>,
) -> Result<(), AppError> {
Ok(())
}
Expand Down Expand Up @@ -82,7 +82,7 @@ where
oid: &str,
user_uuid: &Uuid,
method: Method,
path: Path<Url>,
path: &Path<Url>,
) -> Result<(), AppError> {
self
.as_ref()
Expand Down Expand Up @@ -211,7 +211,11 @@ where
.check_workspace_permission(&workspace_id, &user_uuid, method.clone())
.await
{
error!("workspace access control: {:?}", err);
error!(
"workspace access control: {}, with path:{}",
err,
path.as_str()
);
return Err(Error::from(err));
}
};
Expand All @@ -221,10 +225,14 @@ where
if let Some(collab_object_id) = collab_object_id {
if let Some(acs) = services.get(&AccessResource::Collab) {
if let Err(err) = acs
.check_collab_permission(&collab_object_id, &user_uuid, method, path)
.check_collab_permission(&collab_object_id, &user_uuid, method, &path)
.await
{
error!("collab access control: {:?}", err);
error!(
"collab access control: {:?}, with path:{}",
err,
path.as_str()
);
return Err(Error::from(err));
}
};
Expand Down
3 changes: 1 addition & 2 deletions tests/websocket/connect.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use client_api::ws::{ConnectState, WSClient, WSClientConfig};

use crate::user::utils::generate_unique_registered_user_client;
use client_api::ws::{ConnectState, WSClient, WSClientConfig};

#[tokio::test]
async fn realtime_connect_test() {
Expand Down

0 comments on commit 517275a

Please sign in to comment.