Skip to content

Commit

Permalink
bruh
Browse files Browse the repository at this point in the history
  • Loading branch information
quetz committed Mar 6, 2024
1 parent 59ed77a commit 3674dad
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 20 deletions.
2 changes: 1 addition & 1 deletion lib/grammers-client/examples/reconnection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn async_main() -> Result {
api_id: 1, // not actually logging in, but has to look real
api_hash: "".to_string(),
params: InitParams {
retry_policy: &MyPolicy,
reconnection_policy: &MyPolicy,
..Default::default()
},
})
Expand Down
10 changes: 5 additions & 5 deletions lib/grammers-client/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ pub struct InitParams {
///
/// for more details refer to [`examples`](lib/grammers-client/examples/reconnection.rs)
///
/// [`NoReconnect`]: grammers_mtsender::NoReconnect
/// [`FixedReconnect`]: grammers_mtsender::FixedReconnect
/// [`ReconnectionPolicy`]: grammers_mtsender::ReconnectionPolicy
pub retry_policy: &'static dyn retry::RetryPolicy,
/// [`NoReconnect`]: grammers_mtsender::retry::NoReconnect
/// [`FixedReconnect`]: grammers_mtsender::retry::FixedReconnect
/// [`ReconnectionPolicy`]: grammers_mtsender::retry::RetryPolicy
pub reconnection_policy: &'static dyn retry::RetryPolicy,
}

pub(crate) struct ClientInner {
Expand Down Expand Up @@ -190,7 +190,7 @@ impl Default for InitParams {
update_queue_limit: Some(100),
#[cfg(feature = "proxy")]
proxy_url: None,
retry_policy: &grammers_mtsender::retry::NoRetry,
reconnection_policy: &grammers_mtsender::retry::NoRetry,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/grammers-client/src/client/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl Client {
file.set_len(size as u64).await?;
file.seek(SeekFrom::Start(0)).await?;

let policy = self.0.config.params.retry_policy;
let policy = self.0.config.params.reconnection_policy;

// Start workers
let (tx, mut rx) = unbounded_channel();
Expand Down
13 changes: 8 additions & 5 deletions lib/grammers-client/src/client/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::{HashMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock};

Expand Down Expand Up @@ -72,7 +73,8 @@ pub(crate) async fn connect_sender(
}

#[cfg(not(feature = "proxy"))]
sender::connect_with_auth(transport, addr, auth_key, config.params.retry_policy).await?
sender::connect_with_auth(transport, addr, auth_key, config.params.reconnection_policy)
.await?
} else {
info!(
"creating a new sender and auth key in dc {} {:?}",
Expand All @@ -88,7 +90,8 @@ pub(crate) async fn connect_sender(
};

#[cfg(not(feature = "proxy"))]
let (sender, tx) = sender::connect(transport, addr, config.params.retry_policy).await?;
let (sender, tx) =
sender::connect(transport, addr, config.params.reconnection_policy).await?;

config.session.insert_dc(dc_id, addr, sender.auth_key());
(sender, tx)
Expand Down Expand Up @@ -375,7 +378,7 @@ impl Connection {
flood_sleep_threshold: u32,
on_updates: F,
) -> Result<R::Return, InvocationError> {
const GENERIC_ERROR_TIMEOUT: Duration = Duration::from_secs(5);
const GENERIC_ERROR_TIMEOUT: u64 = 5;

let mut exp_backoff = 0;

Expand All @@ -392,7 +395,7 @@ impl Connection {
name, code, value, ..
})) if code == 500 || code == -503 || code == 420 => {
let delay = if code == 420 {
value.unwrap_or(GENERIC_ERROR_TIMEOUT)
value.map(|v| v as u64).unwrap_or(GENERIC_ERROR_TIMEOUT)
} else {
GENERIC_ERROR_TIMEOUT
} * (1 << exp_backoff);
Expand All @@ -402,7 +405,7 @@ impl Connection {
delay,
std::any::type_name::<R>()
);
tokio::time::sleep(delay).await;
tokio::time::sleep(Duration::from_secs(delay)).await;
rx = self.request_tx.read().unwrap().enqueue(request);
exp_backoff += 1;

Expand Down
18 changes: 10 additions & 8 deletions lib/grammers-mtsender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub struct Sender<T: Transport, M: Mtp> {
containers: HashMap<MsgId, HashSet<MsgId>>,

next_ping: Instant,
retry_policy: &'static dyn retry::RetryPolicy,
reconnection_policy: &'static dyn retry::RetryPolicy,

// Transport-level buffers and positions
read_buffer: RingBuffer<u8>,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
containers: HashMap::new(),

next_ping: Instant::now() + PING_DELAY,
retry_policy: reconnection_policy,
reconnection_policy,

read_buffer,
read_index: 0,
Expand All @@ -216,7 +216,7 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
mtp: M,
addr: SocketAddr,
proxy_url: &str,
reconnection_policy: &'static dyn RetryPolicy,
reconnection_policy: &'static dyn retry::RetryPolicy,
) -> Result<(Self, Enqueuer), io::Error> {
info!("connecting...");

Expand All @@ -236,6 +236,8 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
next_ping: Instant::now() + PING_DELAY,
reconnection_policy,

containers: HashMap::new(),

read_buffer,
read_index: 0,
write_buffer: RingBuffer::with_capacity(MAXIMUM_DATA, LEADING_BUFFER_SPACE),
Expand Down Expand Up @@ -427,7 +429,7 @@ impl<T: Transport, M: Mtp> Sender<T, M> {

attempts += 1;

match self.retry_policy.should_retry(attempts) {
match self.reconnection_policy.should_retry(attempts) {
ControlFlow::Break(_) => {
log::error!(
"attempted more than {} times for reconnection and failed",
Expand Down Expand Up @@ -738,7 +740,7 @@ impl<T: Transport, M: Mtp> Sender<T, M> {
}

pub fn retry_policy(&self) -> &'static dyn retry::RetryPolicy {
self.retry_policy
self.reconnection_policy
}
}

Expand All @@ -762,7 +764,7 @@ pub async fn connect_via_proxy<'a, T: Transport>(
transport: T,
addr: std::net::SocketAddr,
proxy_url: &str,
rc_policy: &'static dyn RetryPolicy,
rc_policy: &'static dyn retry::RetryPolicy,
) -> Result<(Sender<T, mtp::Encrypted>, Enqueuer), AuthorizationError> {
let (sender, enqueuer) =
Sender::connect_via_proxy(transport, mtp::Plain::new(), addr, proxy_url, rc_policy).await?;
Expand Down Expand Up @@ -874,7 +876,7 @@ pub async fn generate_auth_key<T: Transport>(
addr: sender.addr,
#[cfg(feature = "proxy")]
proxy_url: sender.proxy_url,
retry_policy: sender.retry_policy,
reconnection_policy: sender.reconnection_policy,
},
enqueuer,
))
Expand All @@ -901,7 +903,7 @@ pub async fn connect_via_proxy_with_auth<'a, T: Transport>(
addr: std::net::SocketAddr,
auth_key: [u8; 256],
proxy_url: &str,
rc_policy: &'static dyn RetryPolicy,
rc_policy: &'static dyn retry::RetryPolicy,
) -> Result<(Sender<T, mtp::Encrypted>, Enqueuer), io::Error> {
Sender::connect_via_proxy(
transport,
Expand Down

0 comments on commit 3674dad

Please sign in to comment.