Skip to content

Commit

Permalink
Clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
blackbeam committed Nov 8, 2023
1 parent 9d08837 commit f8e0bd5
Show file tree
Hide file tree
Showing 20 changed files with 113 additions and 132 deletions.
4 changes: 2 additions & 2 deletions src/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// modified, or distributed except according to those terms.

use crossbeam::queue::ArrayQueue;
use std::{mem::replace, ops::Deref, sync::Arc};
use std::{mem::take, ops::Deref, sync::Arc};

#[derive(Debug)]
pub struct BufferPool {
Expand Down Expand Up @@ -93,6 +93,6 @@ impl Deref for PooledBuf {

impl Drop for PooledBuf {
fn drop(&mut self) {
self.1.put(replace(&mut self.0, vec![]))
self.1.put(take(&mut self.0))
}
}
46 changes: 20 additions & 26 deletions src/conn/binlog_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,25 @@ impl futures_core::stream::Stream for BinlogStream {
Err(err) => return Poll::Ready(Some(Err(err.into()))),
};

let first_byte = packet.get(0).copied();
let first_byte = packet.first().copied();

if first_byte == Some(255) {
if let Ok(ErrPacket::Error(err)) =
ParseBuf(&*packet).parse(self.read_packet.conn_ref().capabilities())
ParseBuf(&packet).parse(self.read_packet.conn_ref().capabilities())
{
return Poll::Ready(Some(Err(From::from(err))));
}
}

if first_byte == Some(254) && packet.len() < 8 {
if ParseBuf(&*packet)
if first_byte == Some(254)
&& packet.len() < 8
&& ParseBuf(&packet)
.parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
self.read_packet.conn_ref().capabilities(),
)
.is_ok()
{
return Poll::Ready(None);
}
{
return Poll::Ready(None);
}

if first_byte == Some(0) {
Expand All @@ -171,16 +171,16 @@ impl futures_core::stream::Stream for BinlogStream {
Err(_) => (/* TODO: Log the error */),
}
}
return Poll::Ready(Some(Ok(event)));
Poll::Ready(Some(Ok(event)))
}
Ok(None) => return Poll::Ready(None),
Err(err) => return Poll::Ready(Some(Err(err.into()))),
Ok(None) => Poll::Ready(None),
Err(err) => Poll::Ready(Some(Err(err.into()))),
}
} else {
return Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
payload: packet.to_vec(),
}
.into())));
.into())))
}
}
}
Expand Down Expand Up @@ -294,14 +294,11 @@ mod tests {
event.header().event_type().unwrap();

// iterate over rows of an event
match event.read_data()?.unwrap() {
EventData::RowsEvent(re) => {
let tme = binlog_stream.get_tme(re.table_id());
for row in re.rows(tme.unwrap()) {
row.unwrap();
}
if let EventData::RowsEvent(re) = event.read_data()?.unwrap() {
let tme = binlog_stream.get_tme(re.table_id());
for row in re.rows(tme.unwrap()) {
row.unwrap();
}
_ => (),
}
}
assert!(events_num > 0);
Expand Down Expand Up @@ -334,14 +331,11 @@ mod tests {
event.header().event_type().unwrap();

// iterate over rows of an event
match event.read_data()?.unwrap() {
EventData::RowsEvent(re) => {
let tme = binlog_stream.get_tme(re.table_id());
for row in re.rows(tme.unwrap()) {
row.unwrap();
}
if let EventData::RowsEvent(re) = event.read_data()?.unwrap() {
let tme = binlog_stream.get_tme(re.table_id());
for row in re.rows(tme.unwrap()) {
row.unwrap();
}
_ => (),
}
}
assert!(events_num > 0);
Expand Down
61 changes: 27 additions & 34 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl Conn {
if let Err(ref e) = self.inner.pending_result {
let e = e.clone();
self.inner.pending_result = Ok(None);
return Err(e);
Err(e)
} else {
Ok(self.inner.pending_result.as_ref().unwrap().as_ref())
}
Expand All @@ -322,8 +322,7 @@ impl Conn {
}

pub(crate) fn has_pending_result(&self) -> bool {
matches!(self.inner.pending_result, Err(_))
|| matches!(self.inner.pending_result, Ok(Some(_)))
self.inner.pending_result.is_err() || matches!(self.inner.pending_result, Ok(Some(_)))
}

/// Sets the given pening result metadata for this connection. Returns the previous value.
Expand Down Expand Up @@ -487,7 +486,7 @@ impl Conn {

async fn handle_handshake(&mut self) -> Result<()> {
let packet = self.read_packet().await?;
let handshake = ParseBuf(&*packet).parse::<HandshakePacket>(())?;
let handshake = ParseBuf(&packet).parse::<HandshakePacket>(())?;

// Handshake scramble is always 21 bytes length (20 + zero terminator)
self.inner.nonce = {
Expand Down Expand Up @@ -563,7 +562,7 @@ impl Conn {
let auth_data = self
.inner
.auth_plugin
.gen_data(self.inner.opts.pass(), &*self.inner.nonce);
.gen_data(self.inner.opts.pass(), &self.inner.nonce);

let handshake_response = HandshakeResponse::new(
auth_data.as_deref(),
Expand Down Expand Up @@ -594,10 +593,9 @@ impl Conn {
if matches!(
auth_switch_request.auth_plugin(),
AuthPlugin::MysqlOldPassword
) {
if self.inner.opts.secure_auth() {
return Err(DriverError::MysqlOldPasswordDisabled.into());
}
) && self.inner.opts.secure_auth()
{
return Err(DriverError::MysqlOldPasswordDisabled.into());
}

self.inner.auth_plugin = auth_switch_request.auth_plugin().clone().into_owned();
Expand Down Expand Up @@ -685,7 +683,7 @@ impl Conn {

async fn continue_caching_sha2_password_auth(&mut self) -> Result<()> {
let packet = self.read_packet().await?;
match packet.get(0) {
match packet.first() {
Some(0x00) => {
// ok packet for empty password
Ok(())
Expand All @@ -712,10 +710,10 @@ impl Conn {
*byte ^= self.inner.nonce[i % self.inner.nonce.len()];
}
let encrypted_pass = crypto::encrypt(
&*pass,
&pass,
self.inner.server_key.as_deref().expect("unreachable"),
);
self.write_bytes(&*encrypted_pass).await?;
self.write_bytes(&encrypted_pass).await?;
};
self.drop_packet().await?;
Ok(())
Expand All @@ -726,7 +724,7 @@ impl Conn {
.into()),
},
Some(0xfe) if !self.inner.auth_switched => {
let auth_switch_request = ParseBuf(&*packet).parse::<AuthSwitchRequest>(())?;
let auth_switch_request = ParseBuf(&packet).parse::<AuthSwitchRequest>(())?;
self.perform_auth_switch(auth_switch_request).await?;
Ok(())
}
Expand All @@ -739,13 +737,13 @@ impl Conn {

async fn continue_mysql_native_password_auth(&mut self) -> Result<()> {
let packet = self.read_packet().await?;
match packet.get(0) {
match packet.first() {
Some(0x00) => Ok(()),
Some(0xfe) if !self.inner.auth_switched => {
let auth_switch = if packet.len() > 1 {
ParseBuf(&*packet).parse(())?
ParseBuf(&packet).parse(())?
} else {
let _ = ParseBuf(&*packet).parse::<OldAuthSwitchRequest>(())?;
let _ = ParseBuf(&packet).parse::<OldAuthSwitchRequest>(())?;
// map OldAuthSwitch to AuthSwitch with mysql_old_password plugin
AuthSwitchRequest::new(
"mysql_old_password".as_bytes(),
Expand All @@ -768,24 +766,24 @@ impl Conn {
.capabilities()
.contains(CapabilityFlags::CLIENT_DEPRECATE_EOF)
{
ParseBuf(&*packet)
ParseBuf(packet)
.parse::<OkPacketDeserializer<ResultSetTerminator>>(self.capabilities())
.map(|x| x.into_inner())
} else {
ParseBuf(&*packet)
ParseBuf(packet)
.parse::<OkPacketDeserializer<OldEofPacket>>(self.capabilities())
.map(|x| x.into_inner())
}
} else {
ParseBuf(&*packet)
ParseBuf(packet)
.parse::<OkPacketDeserializer<CommonOkPacket>>(self.capabilities())
.map(|x| x.into_inner())
};

if let Ok(ok_packet) = ok_packet {
self.handle_ok(ok_packet.into_owned());
} else {
let err_packet = ParseBuf(&*packet).parse::<ErrPacket>(self.capabilities());
let err_packet = ParseBuf(packet).parse::<ErrPacket>(self.capabilities());
if let Ok(err_packet) = err_packet {
self.handle_err(err_packet)?;
return Ok(true);
Expand Down Expand Up @@ -1011,23 +1009,21 @@ impl Conn {
fn apply(&self, conn: &mut Conn, value: Option<crate::Value>) {
match self {
Cfg::Socket => {
conn.inner.socket = value.map(crate::from_value).flatten();
conn.inner.socket = value.and_then(crate::from_value);
}
Cfg::MaxAllowedPacket => {
if let Some(stream) = conn.inner.stream.as_mut() {
stream.set_max_allowed_packet(
value
.map(crate::from_value)
.flatten()
.and_then(crate::from_value)
.unwrap_or(DEFAULT_MAX_ALLOWED_PACKET),
);
}
}
Cfg::WaitTimeout => {
conn.inner.wait_timeout = Duration::from_secs(
value
.map(crate::from_value)
.flatten()
.and_then(crate::from_value)
.unwrap_or(DEFAULT_WAIT_TIMEOUT) as u64,
);
}
Expand Down Expand Up @@ -1329,6 +1325,7 @@ mod test {
#[test]
fn opts_should_satisfy_send_and_sync() {
struct A<T: Sync + Send>(T);
#[allow(clippy::unnecessary_operation)]
A(get_opts());
}

Expand Down Expand Up @@ -1672,7 +1669,7 @@ mod test {
async fn should_perform_queries() -> super::Result<()> {
let mut conn = Conn::new(get_opts()).await?;
for x in (MAX_PAYLOAD_LEN - 2)..=(MAX_PAYLOAD_LEN + 2) {
let long_string = ::std::iter::repeat('A').take(x).collect::<String>();
let long_string = "A".repeat(x);
let result: Vec<(String, u8)> = conn
.query(format!(r"SELECT '{}', 231", long_string))
.await?;
Expand Down Expand Up @@ -1724,15 +1721,11 @@ mod test {

#[tokio::test]
async fn should_execute_statement() -> super::Result<()> {
let long_string = ::std::iter::repeat('A')
.take(18 * 1024 * 1024)
.collect::<String>();
let long_string = "A".repeat(18 * 1024 * 1024);
let mut conn = Conn::new(get_opts()).await?;
let stmt = conn.prep(r"SELECT ?").await?;
let result = conn.exec_iter(&stmt, (&long_string,)).await?;
let mut mapped = result
.map_and_drop(|row| from_row::<(String,)>(row))
.await?;
let mut mapped = result.map_and_drop(from_row::<(String,)>).await?;
assert_eq!(mapped.len(), 1);
assert_eq!(mapped.pop(), Some((long_string,)));
let result = conn.exec_iter(&stmt, (42_u8,)).await?;
Expand All @@ -1755,7 +1748,7 @@ mod test {
.exec_iter(&stmt, params! { "foo" => "quux", "bar" => "baz" })
.await?;
let mut mapped = result
.map_and_drop(|row| from_row::<(String, String, String, u8)>(row))
.map_and_drop(from_row::<(String, String, String, u8)>)
.await?;
assert_eq!(mapped.len(), 1);
assert_eq!(
Expand Down Expand Up @@ -1847,7 +1840,7 @@ mod test {
let result = conn.query_iter(q).await?;

let loaded_structs = result
.map_and_drop(|row| crate::from_row::<(Vec<u8>, Vec<u8>, u64, Vec<u8>)>(row))
.map_and_drop(crate::from_row::<(Vec<u8>, Vec<u8>, u64, Vec<u8>)>)
.await?;

conn.disconnect().await?;
Expand Down
2 changes: 1 addition & 1 deletion src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ mod test {
}
drop(tx);
// see that all the tx's eventually complete
while let Some(_) = rx.recv().await {}
while (rx.recv().await).is_some() {}
}
drop(pool);
}
Expand Down
7 changes: 3 additions & 4 deletions src/conn/pool/ttl_check_inerval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ impl TtlCheckInterval {
VecDeque::<_>::with_capacity(self.pool_opts.constraints().max());

while let Some(conn) = exchange.available.pop_front() {
if conn.expired() {
to_be_dropped.push(conn);
} else if to_be_dropped.len() < num_to_drop
&& conn.elapsed() > self.pool_opts.inactive_connection_ttl()
if conn.expired()
|| (to_be_dropped.len() < num_to_drop
&& conn.elapsed() > self.pool_opts.inactive_connection_ttl())
{
to_be_dropped.push(conn);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/conn/routines/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl Routine<()> for ExecRoutine<'_> {
}

let (body, as_long_data) =
ComStmtExecuteRequestBuilder::new(self.stmt.id()).build(&*params);
ComStmtExecuteRequestBuilder::new(self.stmt.id()).build(params);

if as_long_data {
conn.send_long_data(self.stmt.id(), params.iter()).await?;
Expand Down
8 changes: 4 additions & 4 deletions src/conn/routines/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ impl Conn {
}
};

match packet.get(0) {
match packet.first() {
Some(0x00) => {
self.set_pending_result(Some(P::result_set_meta(Arc::from(
Vec::new().into_boxed_slice(),
))))?;
}
Some(0xFB) => self.handle_local_infile::<P>(&*packet).await?,
_ => self.handle_result_set::<P>(&*packet).await?,
Some(0xFB) => self.handle_local_infile::<P>(&packet).await?,
_ => self.handle_result_set::<P>(&packet).await?,
}

Ok(())
Expand All @@ -98,7 +98,7 @@ impl Conn {
match bytes {
Ok(bytes) => {
// We'll skip empty chunks to stay compliant with the protocol.
if bytes.len() > 0 {
if !bytes.is_empty() {
self.write_bytes(&bytes).await?;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/conn/routines/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl Routine<Arc<StmtInner>> for PrepareRoutine {
.await?;

let packet = conn.read_packet().await?;
let mut inner_stmt = StmtInner::from_payload(&*packet, conn.id(), self.query.clone())?;
let mut inner_stmt = StmtInner::from_payload(&packet, conn.id(), self.query.clone())?;

#[cfg(feature = "tracing")]
Span::current().record("mysql_async.statement.id", inner_stmt.id());
Expand Down
Loading

0 comments on commit f8e0bd5

Please sign in to comment.