Skip to content

Commit

Permalink
feat: stop recv messages when subscribe drop (#159)
Browse files Browse the repository at this point in the history
* feat: stop recv messages when subscribe drop

* chore: update collab rev

* chore: fmt
  • Loading branch information
appflowy authored Nov 11, 2023
1 parent 487bc25 commit c7e90eb
Show file tree
Hide file tree
Showing 20 changed files with 68 additions and 79 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ actix-session = { version = "0.8", features = ["redis-rs-tls-session"] }
openssl = "0.10.45"

# serde
serde_json = "1.0.108"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
serde.workspace = true
serde-aux = "4.1.2"

tokio = { version = "1.26.0", features = [
Expand Down Expand Up @@ -124,6 +124,8 @@ members = [
[workspace.dependencies]
realtime-entity = { path = "libs/realtime-entity" }
app-error = { path = "libs/app_error" }
serde_json = "1.0.108"
serde = { version = "1.0.108", features = ["derive"] }

[profile.release]
lto = true
Expand All @@ -140,8 +142,8 @@ lto = false
opt-level = 3

[patch.crates-io]
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "508dd160a" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "508dd160a" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2644fba6" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2644fba6" }

# Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder
#collab = { path = "libs/AppFlowy-Collab/collab" }
Expand Down
4 changes: 2 additions & 2 deletions admin_frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ axum = {version = "0.6.20", features = ["json"]}
tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros"] }
askama = "0.12.1"
axum-extra = { version = "0.8.0", features = ["cookie"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.108"
serde.workspace = true
serde_json.workspace = true
redis = { version = "0.23.3", features = [ "aio", "tokio-comp", "connection-manager"] }
uuid = { version = "1.4.1", features = ["v4"] }
dotenv = "0.15.0"
Expand Down
4 changes: 2 additions & 2 deletions libs/app_error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
thiserror = "1.0.47"
serde_repr = "0.1.16"
serde = { version = "1.0"}
serde.workspace = true
anyhow = "1.0.75"
uuid = { version = "1.4.1", features = ["v4"] }
sqlx = { version = "0.7", default-features = false, features = ["postgres", "json"], optional = true }
Expand All @@ -17,7 +17,7 @@ rust-s3 = { version = "0.33.0", optional = true }
url = { version = "2.4.1"}
actix-web = { version = "4.3.1", optional = true }
reqwest = { version = "0.11" }
serde_json = "1.0.108"
serde_json.workspace = true

[features]
default = []
Expand Down
4 changes: 2 additions & 2 deletions libs/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
reqwest = { version = "0.11.20", default-features = false, features = ["json","multipart"] }
anyhow = "1.0.75"
serde_json = "1.0.108"
serde_json.workspace = true
serde_repr = "0.1.16"
gotrue = { path = "../gotrue" }
gotrue-entity = { path = "../gotrue-entity" }
Expand All @@ -23,7 +23,7 @@ app-error = { workspace = true }
# ws
tracing = { version = "0.1" }
thiserror = "1.0.39"
serde = { version = "1.0", features = ["derive"] }
serde.workspace = true
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
Expand Down
66 changes: 32 additions & 34 deletions libs/client-api/src/collab_sync/sync.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use bytes::Bytes;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, Weak};

use crate::collab_sync::{
CollabSink, CollabSinkRunner, DefaultMsgIdCounter, SinkConfig, SinkState, SyncError, SyncObject,
};
use bytes::Bytes;
use collab::core::collab::MutexCollab;
use collab::core::collab_state::SyncState;
use collab::core::origin::CollabOrigin;
Expand All @@ -15,11 +11,13 @@ use collab::sync_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol};
use futures_util::{SinkExt, StreamExt};
use lib0::decoding::Cursor;
use realtime_entity::collab_msg::{ClientCollabInit, CollabMessage, ServerCollabInit, UpdateSync};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, Weak};
use tokio::spawn;
use tokio::sync::watch;

use tokio_stream::wrappers::WatchStream;
use tracing::{error, trace, warn};
use tracing::{error, trace, warn, Level};
use yrs::updates::decoder::DecoderV1;
use yrs::updates::encoder::{Encoder, EncoderV1};

Expand Down Expand Up @@ -238,33 +236,30 @@ where
) where
P: CollabSyncProtocol + Send + Sync + 'static,
{
loop {
while let Some(collab_message) = stream.next().await {
match collab_message {
Ok(msg) => match (weak_collab.upgrade(), weak_sink.upgrade()) {
(Some(awareness), Some(sink)) => {
if let Err(error) = SyncStream::<Sink, Stream>::process_message::<P>(
&origin, &object_id, &protocol, &awareness, &sink, msg,
)
.await
{
error!(
"Stop receive incoming changes. Failed to process message: {}",
error
);
break;
}
},
_ => {
warn!("Stop receive doc incoming changes.");
break;
},
while let Some(collab_message) = stream.next().await {
match collab_message {
Ok(msg) => match (weak_collab.upgrade(), weak_sink.upgrade()) {
(Some(collab), Some(sink)) => {
let span = tracing::span!(Level::TRACE, "doc_stream", object_id = %msg.object_id());
let _enter = span.enter();
if let Err(error) = SyncStream::<Sink, Stream>::process_message::<P>(
&origin, &object_id, &protocol, &collab, &sink, msg,
)
.await
{
error!("Error while processing message: {}", error);
}
},
Err(e) => {
warn!("Stream error: {},stop receive incoming changes", e.into());
_ => {
// The collab or sink is dropped, stop the stream.
warn!("Stop receive doc incoming changes.");
break;
},
}
},
Err(e) => {
warn!("Stream error: {},stop receive incoming changes", e.into());
break;
},
}
}
}
Expand All @@ -290,7 +285,7 @@ where
if should_process {
if let Some(payload) = msg.payload() {
if !payload.is_empty() {
trace!("start process message: {:?}", msg.msg_id());
trace!("start process message:{:?}", msg.msg_id());
SyncStream::<Sink, Stream>::process_payload(
origin, payload, object_id, protocol, collab, sink,
)
Expand Down Expand Up @@ -319,10 +314,13 @@ where
let msg = msg?;
trace!(" {}", msg);
let is_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_)));
if let Some(payload) = handle_msg(&Some(origin), protocol, collab, msg).await? {
if let Some(payload) = handle_msg(&Some(origin), protocol, collab, msg)? {
if is_sync_step_1 {
// flush
collab.lock().flush()
match collab.try_lock() {
None => warn!("Failed to acquire lock for flushing collab"),
Some(collab_guard) => collab_guard.flush(),
}
}

let object_id = object_id.to_string();
Expand Down
6 changes: 3 additions & 3 deletions libs/client-api/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl Client {
token
.as_ref()
.ok_or(AppResponseError::from(AppError::NotLoggedIn(
"fail to get expires_at".to_string(),
"token is empty".to_string(),
)))?
.expires_at,
),
Expand Down Expand Up @@ -996,8 +996,8 @@ impl Client {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;
if time_now_sec + 60 > expires_at {
// Add 60 seconds buffer
if time_now_sec + 10 > expires_at {
// Add 10 seconds buffer
self.refresh_token().await?;
}

Expand Down
1 change: 1 addition & 0 deletions libs/client-api/src/ws/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ where
while let Ok(msg) = recv.recv().await {
if let Err(err) = tx.send(Ok(msg)) {
trace!("Failed to send message to channel stream: {}", err);
break;
}
}
trace!("WebSocketChannel {} stream closed", object_id);
Expand Down
4 changes: 2 additions & 2 deletions libs/database-entity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.108"
serde.workspace = true
serde_json.workspace = true
sqlx = { version = "0.7", default-features = false, features = ["macros"] }
collab-entity = { version = "0.1.0" }
validator = { version = "0.16", features = ["validator_derive", "derive"] }
Expand Down
4 changes: 2 additions & 2 deletions libs/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ app-error = { workspace = true, features = ["sqlx_error", "validation_error", "s
tokio = { version = "1.26", features = ["sync"] }
async-trait = "0.1.73"
anyhow = "1.0.75"
serde = { version = "1.0.130", features = ["derive"] }
serde_json = "1.0.68"
serde.workspace = true
serde_json.workspace = true

sqlx = { version = "0.7", default-features = false, features = ["postgres", "chrono", "uuid", "macros", "runtime-tokio-rustls", "rust_decimal"] }
tracing = { version = "0.1.37" }
Expand Down
4 changes: 2 additions & 2 deletions libs/gotrue-entity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.105"
serde.workspace = true
serde_json.workspace = true
anyhow = "1.0.75"
reqwest = "0.11.20"
lazy_static = "1.4.0"
Expand Down
2 changes: 1 addition & 1 deletion libs/gotrue-entity/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct GotrueTokenResponse {
pub token_type: String,
/// the access_token will remain valid before it expires and needs to be refreshed.
pub expires_in: i64,
/// a timestamp indicating the exact time at which the access_token will expire.
/// a timestamp in seconds indicating the exact time at which the access_token will expire.
pub expires_at: i64,
/// The refresh token is used to obtain a new access_token once the current access_token expires.
/// Refresh tokens are usually long-lived and are stored securely by the client.
Expand Down
4 changes: 2 additions & 2 deletions libs/gotrue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.105"
serde.workspace = true
serde_json.workspace = true
futures-util = "0.3.8"
anyhow = "1.0.75"
reqwest = { version = "0.11.20", default-features = false, features = ["json", "rustls-tls", "cookies"] }
Expand Down
4 changes: 2 additions & 2 deletions libs/infra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ edition = "2021"
[dependencies]
reqwest = { version = "0.11.20", default-features = false }
anyhow = "1.0.75"
serde = { version = "1.0.130" }
serde_json = "1.0.105"
serde.workspace = true
serde_json ="1.0.105"
4 changes: 2 additions & 2 deletions libs/realtime-entity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ edition = "2021"
[dependencies]
collab = { version = "0.1.0" }
collab-entity = { version = "0.1.0" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde.workspace = true
serde_json.workspace = true
bytes = { version = "1.0", features = ["serde"] }
anyhow = "1.0.75"
actix = { version = "0.13", optional = true }
Expand Down
12 changes: 0 additions & 12 deletions libs/realtime-entity/src/collab_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,9 @@ impl Ord for CollabMessage {
}

impl CollabMessage {
/// Currently, only have one business id. So just return 1.
pub fn business_id(&self) -> u8 {
1
}

pub fn is_init(&self) -> bool {
matches!(self, CollabMessage::ClientInit(_))
}

pub fn msg_id(&self) -> Option<MsgId> {
match self {
CollabMessage::ClientInit(value) => Some(value.msg_id),
Expand All @@ -130,22 +124,18 @@ impl CollabMessage {
CollabMessage::CloseCollab(_) => None,
}
}

pub fn to_vec(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap_or_default()
}

pub fn from_vec(data: &[u8]) -> Result<Self, serde_json::Error> {
serde_json::from_slice(data)
}

pub fn len(&self) -> usize {
self
.payload()
.map(|payload| payload.len())
.unwrap_or_default()
}

pub fn payload(&self) -> Option<&Bytes> {
match self {
CollabMessage::ClientInit(value) => Some(&value.payload),
Expand All @@ -157,11 +147,9 @@ impl CollabMessage {
CollabMessage::CloseCollab(_) => None,
}
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn origin(&self) -> Option<&CollabOrigin> {
match self {
CollabMessage::ClientInit(value) => Some(&value.origin),
Expand Down
4 changes: 2 additions & 2 deletions libs/realtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ edition = "2021"
[dependencies]
actix = "0.13"
actix-web-actors = { version = "4.2.0" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.108"
serde.workspace = true
serde_json.workspace = true
thiserror = "1.0.30"
bytes = { version = "1.0", features = ["serde"] }
parking_lot = { version = "0.12.1", features = ["arc_lock"] }
Expand Down
2 changes: 1 addition & 1 deletion libs/realtime/src/collaborate/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl CollabBroadcast {
match msg {
Ok(msg) => {
if let Ok(payload) =
handle_msg(&collab_msg_origin, &ServerSyncProtocol, &collab, msg).await {
handle_msg(&collab_msg_origin, &ServerSyncProtocol, &collab, msg) {
// Send the response to the corresponding client
match collab_msg_origin {
None => warn!("Client message does not have a origin"),
Expand Down
2 changes: 1 addition & 1 deletion libs/shared-entity/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.75"
serde = "1.0.188"
serde_json = "1.0.105"
serde_json.workspace = true
serde_repr = "0.1.16"
thiserror = "1.0.47"
reqwest = "0.11.18"
Expand Down
2 changes: 1 addition & 1 deletion libs/token/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde.workspace = true
chrono = { version = "0.4.23", features = ["serde", "clock"], default-features = false }
jwt = "0.16.0"
thiserror = "1.0.30"
Expand Down

0 comments on commit c7e90eb

Please sign in to comment.