Skip to content

Commit

Permalink
refactor: ws ping (#66)
Browse files Browse the repository at this point in the history
* chore: ws client

* chore: update ws ping

* chore: fix test

* chore: public funcs

* chore: fix test

* chore: check clent-api compile
  • Loading branch information
appflowy authored Sep 20, 2023
1 parent df920ef commit 21c221b
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 162 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/client_api_check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: ClientAPI Check

on:
push:
branches: [ main ]
pull_request:
types: [ opened, synchronize, reopened ]
branches: [ main ]

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
workspaces: |
AppFlowy-Cloud
- name: Check
working-directory: ./libs/client-api
run: cargo check

14 changes: 9 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libs/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ parking_lot = "0.12.1"
tracing = { version = "0.1" }
thiserror = "1.0.39"
serde = { version = "1.0", features = ["derive"] }
tokio-tungstenite = { version = "0.18" }
tokio-tungstenite = { version = "0.20" }
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
tokio-retry = "0.3"
Expand Down
155 changes: 15 additions & 140 deletions libs/client-api/src/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use std::time::Duration;

use crate::ws::ping::ServerFixIntervalPing;
use crate::ws::retry::ConnectAction;
use crate::ws::state::{ConnectState, ConnectStateNotify};
use crate::ws::{BusinessID, ClientRealtimeMessage, WSError, WebSocketChannel};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio_retry::strategy::FixedInterval;
use tokio_retry::strategy::FibonacciBackoff;
use tokio_retry::{Condition, RetryIf};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::MaybeTlsStream;
Expand All @@ -29,7 +31,7 @@ impl Default for WSClientConfig {
Self {
buffer_capacity: 1000,
ping_per_secs: 8,
retry_connect_per_pings: 10,
retry_connect_per_pings: 20,
}
}
}
Expand All @@ -39,7 +41,7 @@ type ChannelByObjectId = HashMap<String, Weak<WebSocketChannel>>;
pub struct WSClient {
addr: Arc<parking_lot::Mutex<Option<String>>>,
config: WSClientConfig,
state: Arc<Mutex<ConnectStateNotify>>,
state_notify: Arc<Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
channels: Arc<RwLock<HashMap<BusinessID, ChannelByObjectId>>>,
ping: Arc<Mutex<Option<ServerFixIntervalPing>>>,
Expand All @@ -54,7 +56,7 @@ impl WSClient {
WSClient {
addr: Arc::new(parking_lot::Mutex::new(None)),
config,
state,
state_notify: state,
sender,
channels,
ping,
Expand All @@ -63,9 +65,12 @@ impl WSClient {

pub async fn connect(&self, addr: String) -> Result<Option<SocketAddr>, WSError> {
*self.addr.lock() = Some(addr.clone());

if let Some(old_ping) = self.ping.lock().await.as_ref() {
old_ping.stop().await;
}
self.set_state(ConnectState::Connecting).await;
let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(3);

let retry_strategy = FibonacciBackoff::from_millis(2000).max_delay(Duration::from_secs(5 * 60));
let action = ConnectAction::new(addr.clone());
let cond = RetryCondition {
connecting_addr: addr,
Expand All @@ -84,7 +89,7 @@ impl WSClient {

let mut ping = ServerFixIntervalPing::new(
Duration::from_secs(self.config.ping_per_secs),
self.state.clone(),
self.state_notify.clone(),
sender.clone(),
self.config.retry_connect_per_pings,
);
Expand Down Expand Up @@ -160,11 +165,11 @@ impl WSClient {
}

pub async fn subscribe_connect_state(&self) -> Receiver<ConnectState> {
self.state.lock().await.subscribe()
self.state_notify.lock().await.subscribe()
}

pub async fn is_connected(&self) -> bool {
self.state.lock().await.state.is_connected()
self.state_notify.lock().await.state.is_connected()
}

pub async fn disconnect(&self) {
Expand All @@ -174,137 +179,7 @@ impl WSClient {
}

async fn set_state(&self, state: ConnectState) {
self.state.lock().await.set_state(state);
}
}

struct ServerFixIntervalPing {
duration: Duration,
sender: Option<Sender<Message>>,
#[allow(dead_code)]
stop_tx: tokio::sync::mpsc::Sender<()>,
stop_rx: Option<tokio::sync::mpsc::Receiver<()>>,
state: Arc<Mutex<ConnectStateNotify>>,
ping_count: Arc<Mutex<u32>>,
retry_connect_per_pings: u32,
}

impl ServerFixIntervalPing {
fn new(
duration: Duration,
state: Arc<Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
retry_connect_per_pings: u32,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Self {
duration,
stop_tx: tx,
stop_rx: Some(rx),
state,
sender: Some(sender),
ping_count: Arc::new(Mutex::new(0)),
retry_connect_per_pings,
}
}

fn run(&mut self) {
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let mut interval = tokio::time::interval(self.duration);
let sender = self.sender.take().expect("Only take once");
let mut receiver = sender.subscribe();
let weak_ping_count = Arc::downgrade(&self.ping_count);
let weak_state = Arc::downgrade(&self.state);
let reconnect_per_ping = self.retry_connect_per_pings;
tokio::spawn(async move {
loop {
tokio::select! {
_ = interval.tick() => {
// Send the ping
tracing::trace!("🙂ping");
let _ = sender.send(Message::Ping(vec![]));
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
// After ten ping were sent, mark the connection as disconnected
if *lock >= reconnect_per_ping {
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Disconnected);
}
} else {
*lock +=1;
}
}
},
msg = receiver.recv() => {
if let Ok(Message::Pong(_)) = msg {
tracing::trace!("🟢Receive pong from server");
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
*lock = 0;

if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Connected);
}
}
}
},
_ = stop_rx.recv() => {
break;
}
}
}
});
}
}

pub struct ConnectStateNotify {
state: ConnectState,
sender: Sender<ConnectState>,
}

impl ConnectStateNotify {
fn new() -> Self {
let (sender, _) = channel(100);
Self {
state: ConnectState::Disconnected,
sender,
}
}

fn set_state(&mut self, state: ConnectState) {
if self.state != state {
tracing::trace!("[🙂Client]: connect state changed to {:?}", state);
self.state = state.clone();
let _ = self.sender.send(state);
}
}

fn subscribe(&self) -> Receiver<ConnectState> {
self.sender.subscribe()
}
}

#[derive(Clone, Eq, PartialEq, Debug)]
pub enum ConnectState {
Connecting,
Connected,
Disconnected,
}

impl ConnectState {
#[allow(dead_code)]
fn is_connecting(&self) -> bool {
matches!(self, ConnectState::Connecting)
}

#[allow(dead_code)]
fn is_connected(&self) -> bool {
matches!(self, ConnectState::Connected)
}

#[allow(dead_code)]
fn is_disconnected(&self) -> bool {
matches!(self, ConnectState::Disconnected)
self.state_notify.lock().await.set_state(state);
}
}

Expand Down
4 changes: 0 additions & 4 deletions libs/client-api/src/ws/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::ws::ClientRealtimeMessage;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

#[derive(Debug, thiserror::Error)]
pub enum WSError {
Expand All @@ -15,9 +14,6 @@ pub enum WSError {
#[error(transparent)]
SenderError(#[from] tokio::sync::broadcast::error::SendError<ClientRealtimeMessage>),

#[error(transparent)]
BroadcastStreamRecvError(#[from] BroadcastStreamRecvError),

#[error("Internal failure: {0}")]
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
}
3 changes: 3 additions & 0 deletions libs/client-api/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ mod client;
mod error;
mod handler;
mod msg;
pub(crate) mod ping;
mod retry;
mod state;

pub use client::*;
pub use error::*;
pub use handler::*;
pub use msg::*;
pub use state::*;
Loading

0 comments on commit 21c221b

Please sign in to comment.