Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: ws ping #66

Merged
merged 7 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/client_api_check.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: ClientAPI Check

on:
push:
branches: [ main ]
pull_request:
types: [ opened, synchronize, reopened ]
branches: [ main ]

jobs:
test:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
workspaces: |
AppFlowy-Cloud

- name: Check
working-directory: ./libs/client-api
run: cargo check

14 changes: 9 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libs/client-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ parking_lot = "0.12.1"
tracing = { version = "0.1" }
thiserror = "1.0.39"
serde = { version = "1.0", features = ["derive"] }
tokio-tungstenite = { version = "0.18" }
tokio-tungstenite = { version = "0.20" }
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
tokio-retry = "0.3"
Expand Down
155 changes: 15 additions & 140 deletions libs/client-api/src/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use std::time::Duration;

use crate::ws::ping::ServerFixIntervalPing;
use crate::ws::retry::ConnectAction;
use crate::ws::state::{ConnectState, ConnectStateNotify};
use crate::ws::{BusinessID, ClientRealtimeMessage, WSError, WebSocketChannel};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio_retry::strategy::FixedInterval;
use tokio_retry::strategy::FibonacciBackoff;
use tokio_retry::{Condition, RetryIf};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::MaybeTlsStream;
Expand All @@ -29,7 +31,7 @@ impl Default for WSClientConfig {
Self {
buffer_capacity: 1000,
ping_per_secs: 8,
retry_connect_per_pings: 10,
retry_connect_per_pings: 20,
}
}
}
Expand All @@ -39,7 +41,7 @@ type ChannelByObjectId = HashMap<String, Weak<WebSocketChannel>>;
pub struct WSClient {
addr: Arc<parking_lot::Mutex<Option<String>>>,
config: WSClientConfig,
state: Arc<Mutex<ConnectStateNotify>>,
state_notify: Arc<Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
channels: Arc<RwLock<HashMap<BusinessID, ChannelByObjectId>>>,
ping: Arc<Mutex<Option<ServerFixIntervalPing>>>,
Expand All @@ -54,7 +56,7 @@ impl WSClient {
WSClient {
addr: Arc::new(parking_lot::Mutex::new(None)),
config,
state,
state_notify: state,
sender,
channels,
ping,
Expand All @@ -63,9 +65,12 @@ impl WSClient {

pub async fn connect(&self, addr: String) -> Result<Option<SocketAddr>, WSError> {
*self.addr.lock() = Some(addr.clone());

if let Some(old_ping) = self.ping.lock().await.as_ref() {
old_ping.stop().await;
}
self.set_state(ConnectState::Connecting).await;
let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(3);

let retry_strategy = FibonacciBackoff::from_millis(2000).max_delay(Duration::from_secs(5 * 60));
let action = ConnectAction::new(addr.clone());
let cond = RetryCondition {
connecting_addr: addr,
Expand All @@ -84,7 +89,7 @@ impl WSClient {

let mut ping = ServerFixIntervalPing::new(
Duration::from_secs(self.config.ping_per_secs),
self.state.clone(),
self.state_notify.clone(),
sender.clone(),
self.config.retry_connect_per_pings,
);
Expand Down Expand Up @@ -160,11 +165,11 @@ impl WSClient {
}

pub async fn subscribe_connect_state(&self) -> Receiver<ConnectState> {
self.state.lock().await.subscribe()
self.state_notify.lock().await.subscribe()
}

pub async fn is_connected(&self) -> bool {
self.state.lock().await.state.is_connected()
self.state_notify.lock().await.state.is_connected()
}

pub async fn disconnect(&self) {
Expand All @@ -174,137 +179,7 @@ impl WSClient {
}

async fn set_state(&self, state: ConnectState) {
self.state.lock().await.set_state(state);
}
}

struct ServerFixIntervalPing {
duration: Duration,
sender: Option<Sender<Message>>,
#[allow(dead_code)]
stop_tx: tokio::sync::mpsc::Sender<()>,
stop_rx: Option<tokio::sync::mpsc::Receiver<()>>,
state: Arc<Mutex<ConnectStateNotify>>,
ping_count: Arc<Mutex<u32>>,
retry_connect_per_pings: u32,
}

impl ServerFixIntervalPing {
fn new(
duration: Duration,
state: Arc<Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
retry_connect_per_pings: u32,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Self {
duration,
stop_tx: tx,
stop_rx: Some(rx),
state,
sender: Some(sender),
ping_count: Arc::new(Mutex::new(0)),
retry_connect_per_pings,
}
}

fn run(&mut self) {
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let mut interval = tokio::time::interval(self.duration);
let sender = self.sender.take().expect("Only take once");
let mut receiver = sender.subscribe();
let weak_ping_count = Arc::downgrade(&self.ping_count);
let weak_state = Arc::downgrade(&self.state);
let reconnect_per_ping = self.retry_connect_per_pings;
tokio::spawn(async move {
loop {
tokio::select! {
_ = interval.tick() => {
// Send the ping
tracing::trace!("🙂ping");
let _ = sender.send(Message::Ping(vec![]));
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
// After ten ping were sent, mark the connection as disconnected
if *lock >= reconnect_per_ping {
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Disconnected);
}
} else {
*lock +=1;
}
}
},
msg = receiver.recv() => {
if let Ok(Message::Pong(_)) = msg {
tracing::trace!("🟢Receive pong from server");
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
*lock = 0;

if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Connected);
}
}
}
},
_ = stop_rx.recv() => {
break;
}
}
}
});
}
}

pub struct ConnectStateNotify {
state: ConnectState,
sender: Sender<ConnectState>,
}

impl ConnectStateNotify {
fn new() -> Self {
let (sender, _) = channel(100);
Self {
state: ConnectState::Disconnected,
sender,
}
}

fn set_state(&mut self, state: ConnectState) {
if self.state != state {
tracing::trace!("[🙂Client]: connect state changed to {:?}", state);
self.state = state.clone();
let _ = self.sender.send(state);
}
}

fn subscribe(&self) -> Receiver<ConnectState> {
self.sender.subscribe()
}
}

#[derive(Clone, Eq, PartialEq, Debug)]
pub enum ConnectState {
Connecting,
Connected,
Disconnected,
}

impl ConnectState {
#[allow(dead_code)]
fn is_connecting(&self) -> bool {
matches!(self, ConnectState::Connecting)
}

#[allow(dead_code)]
fn is_connected(&self) -> bool {
matches!(self, ConnectState::Connected)
}

#[allow(dead_code)]
fn is_disconnected(&self) -> bool {
matches!(self, ConnectState::Disconnected)
self.state_notify.lock().await.set_state(state);
}
}

Expand Down
4 changes: 0 additions & 4 deletions libs/client-api/src/ws/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::ws::ClientRealtimeMessage;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

#[derive(Debug, thiserror::Error)]
pub enum WSError {
Expand All @@ -15,9 +14,6 @@ pub enum WSError {
#[error(transparent)]
SenderError(#[from] tokio::sync::broadcast::error::SendError<ClientRealtimeMessage>),

#[error(transparent)]
BroadcastStreamRecvError(#[from] BroadcastStreamRecvError),

#[error("Internal failure: {0}")]
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
}
3 changes: 3 additions & 0 deletions libs/client-api/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ mod client;
mod error;
mod handler;
mod msg;
pub(crate) mod ping;
mod retry;
mod state;

pub use client::*;
pub use error::*;
pub use handler::*;
pub use msg::*;
pub use state::*;
Loading