Skip to content

Commit

Permalink
Remove custom AsyncMutex logic
Browse files Browse the repository at this point in the history
Pausing with a debugger and inspecting the backtrace is the way to go.
  • Loading branch information
Lonami committed Oct 28, 2023
1 parent 2759194 commit 17d23e1
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 109 deletions.
4 changes: 2 additions & 2 deletions lib/grammers-client/src/client/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Client {
Err(InvocationError::Rpc(err)) if err.code == 303 => {
let dc_id = err.value.unwrap() as i32;
let (sender, request_tx) = connect_sender(dc_id, &self.0.config).await?;
*self.0.conn.sender.lock("client.bot_sign_in").await = sender;
*self.0.conn.sender.lock().await = sender;
*self.0.conn.request_tx.write().unwrap() = request_tx;
{
let mut state = self.0.state.write().unwrap();
Expand Down Expand Up @@ -233,7 +233,7 @@ impl Client {
// before trying again.
let dc_id = err.value.unwrap() as i32;
let (sender, request_tx) = connect_sender(dc_id, &self.0.config).await?;
*self.0.conn.sender.lock("client.request_login_code").await = sender;
*self.0.conn.sender.lock().await = sender;
*self.0.conn.request_tx.write().unwrap() = request_tx;
{
let mut state = self.0.state.write().unwrap();
Expand Down
16 changes: 7 additions & 9 deletions lib/grammers-client/src/client/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// except according to those terms.

use crate::types::{Downloadable, Media, Uploaded};
use crate::utils::{generate_random_id, AsyncMutex};
use crate::utils::generate_random_id;
use crate::Client;
use futures_util::stream::{FuturesUnordered, StreamExt as _};
use grammers_mtsender::InvocationError;
Expand All @@ -17,6 +17,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::{
fs,
io::{self, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
sync::Mutex as AsyncMutex,
};

pub const MIN_CHUNK_SIZE: i32 = 4 * 1024;
Expand Down Expand Up @@ -482,13 +483,10 @@ impl<'a, S: AsyncRead + Unpin> PartStream<'a, S> {
fn new(stream: &'a mut S, size: usize) -> Self {
let total_parts = ((size + MAX_CHUNK_SIZE as usize - 1) / MAX_CHUNK_SIZE as usize) as i32;
Self {
inner: AsyncMutex::new(
"upload_stream",
PartStreamInner {
stream,
current_part: 0,
},
),
inner: AsyncMutex::new(PartStreamInner {
stream,
current_part: 0,
}),
total_parts,
}
}
Expand All @@ -498,7 +496,7 @@ impl<'a, S: AsyncRead + Unpin> PartStream<'a, S> {
}

async fn next_part(&self) -> Result<Option<(i32, Vec<u8>)>, io::Error> {
let mut lock = self.inner.lock("read part").await;
let mut lock = self.inner.lock().await;
if lock.current_part >= self.total_parts {
return Ok(None);
}
Expand Down
8 changes: 4 additions & 4 deletions lib/grammers-client/src/client/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::client::ClientState;
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use super::{Client, ClientInner, Config};
use crate::utils::{self, AsyncMutex};
use crate::utils;
use grammers_mtproto::mtp::{self, RpcError};
use grammers_mtproto::transport;
use grammers_mtsender::{self as sender, AuthorizationError, InvocationError, Sender};
Expand All @@ -19,7 +19,7 @@ use std::collections::{HashMap, VecDeque};
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::{Arc, RwLock};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{Notify, RwLock as AsyncRwLock};
use tokio::sync::{Mutex as AsyncMutex, Notify, RwLock as AsyncRwLock};

/// Socket addresses to Telegram datacenters, where the index into this array
/// represents the data center ID.
Expand Down Expand Up @@ -366,7 +366,7 @@ impl Client {
impl Connection {
fn new(sender: Sender<transport::Full, mtp::Encrypted>, request_tx: Enqueuer) -> Self {
Self {
sender: AsyncMutex::new("client.sender", sender),
sender: AsyncMutex::new(sender),
request_tx: RwLock::new(request_tx),
stepping_done: Notify::new(),
}
Expand Down Expand Up @@ -415,7 +415,7 @@ impl Connection {
}

async fn step(&self) -> Result<Vec<tl::enums::Updates>, sender::ReadError> {
match self.sender.try_lock("client.step") {
match self.sender.try_lock() {
Ok(mut sender) => {
// Sender was unlocked, we're the ones that will perform the network step.
let updates = sender.step().await?;
Expand Down
94 changes: 0 additions & 94 deletions lib/grammers-client/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::types;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use grammers_session::{PackedChat, PackedType};
use grammers_tl_types as tl;
use log::trace;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::SystemTime;

Expand Down Expand Up @@ -104,96 +103,3 @@ pub(crate) fn always_find_entity(
None => types::Chat::unpack(get_packed()),
}
}

pub(crate) struct MutexGuard<'a, T: ?Sized> {
name: &'static str,
reason: &'static str,
guard: std::sync::MutexGuard<'a, T>,
}

impl<T: ?Sized> std::ops::Deref for MutexGuard<'_, T> {
type Target = T;

fn deref(&self) -> &T {
self.guard.deref()
}
}

impl<T: ?Sized> std::ops::DerefMut for MutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
self.guard.deref_mut()
}
}

impl<'a, T: ?Sized> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
trace!("unlocking {} for {}", self.name, self.reason);
}
}

pub(crate) struct AsyncMutex<T: ?Sized> {
name: &'static str,
mutex: tokio::sync::Mutex<T>,
}

pub(crate) struct AsyncMutexGuard<'a, T: ?Sized> {
name: &'static str,
reason: &'static str,
guard: tokio::sync::MutexGuard<'a, T>,
}

impl<T> AsyncMutex<T> {
pub fn new(name: &'static str, value: T) -> Self {
Self {
name,
mutex: tokio::sync::Mutex::new(value),
}
}

pub fn try_lock<'a>(
&'a self,
reason: &'static str,
) -> Result<AsyncMutexGuard<'a, T>, tokio::sync::TryLockError> {
let guard = self.mutex.try_lock();
trace!(
"try-async-locking {} for {} ({})",
self.name,
reason,
if guard.is_ok() { "success" } else { "failure" }
);
guard.map(|guard| AsyncMutexGuard {
name: self.name,
reason,
guard,
})
}

pub async fn lock<'a>(&'a self, reason: &'static str) -> AsyncMutexGuard<'a, T> {
trace!("async-locking {} for {}", self.name, reason);
AsyncMutexGuard {
name: self.name,
reason,
guard: self.mutex.lock().await,
}
}
}

impl<T: ?Sized> std::ops::Deref for AsyncMutexGuard<'_, T> {
type Target = T;

fn deref(&self) -> &T {
self.guard.deref()
}
}

impl<T: ?Sized> std::ops::DerefMut for AsyncMutexGuard<'_, T> {
fn deref_mut(&mut self) -> &mut T {
self.guard.deref_mut()
}
}

impl<'a, T: ?Sized> Drop for AsyncMutexGuard<'a, T> {
fn drop(&mut self) {
trace!("async-unlocking {} for {}", self.name, self.reason);
}
}

0 comments on commit 17d23e1

Please sign in to comment.