Skip to content

Commit a57550d

Browse files
authored
Cosmetic fixes (#64)
* Cosmetic fixes * fix test
1 parent d791f06 commit a57550d

File tree

4 files changed

+20
-57
lines changed

4 files changed

+20
-57
lines changed

src/client.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1689,7 +1689,7 @@ where
16891689
address: &Address,
16901690
) -> Result<(), Error> {
16911691
// We want to update this in the LRU to know this was recently used and add it if it isn't there already
1692-
// This could be the case if it was evicted or if doesn't exist (ie. we reloaded and it go removed)
1692+
// This could be the case if it was evicted or if doesn't exist (ie. we reloaded and it got removed)
16931693
pool.register_parse_to_cache(hash, parse);
16941694

16951695
if let Err(err) = server
@@ -1704,8 +1704,7 @@ where
17041704
}
17051705

17061706
/// Register and rewrite the parse statement to the clients statement cache
1707-
/// and also the pool's statement cache. Add it to extended protocol data
1708-
/// Returns True if the parse was registered, False otherwise
1707+
/// and also the pool's statement cache. Add it to extended protocol data.
17091708
fn buffer_parse(&mut self, message: BytesMut, pool: &ConnectionPool) -> Result<(), Error> {
17101709
// Avoid parsing if prepared statements not enabled
17111710
let client_given_name = match self.prepared_statements_enabled {
@@ -1755,7 +1754,6 @@ where
17551754

17561755
/// Rewrite the Bind (F) message to use the prepared statement name
17571756
/// saved in the client cache.
1758-
/// Returns True if the bind is a named prepared statement, False otherwise
17591757
async fn buffer_bind(&mut self, message: BytesMut) -> Result<(), Error> {
17601758
// Avoid parsing if prepared statements not enabled
17611759
let client_given_name = match self.prepared_statements_enabled {

src/messages.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,7 +1322,7 @@ impl Display for PgErrorMsg {
13221322
}
13231323

13241324
impl PgErrorMsg {
1325-
pub fn parse(error_msg: Vec<u8>) -> Result<PgErrorMsg, Error> {
1325+
pub fn parse(error_msg: &[u8]) -> Result<PgErrorMsg, Error> {
13261326
let mut out = PgErrorMsg {
13271327
severity_localized: "".to_string(),
13281328
severity: "".to_string(),
@@ -1470,7 +1470,7 @@ mod tests {
14701470

14711471
info!(
14721472
"full message: {}",
1473-
PgErrorMsg::parse(complete_msg.clone()).unwrap()
1473+
PgErrorMsg::parse(&complete_msg).unwrap()
14741474
);
14751475
assert_eq!(
14761476
PgErrorMsg {
@@ -1493,7 +1493,7 @@ mod tests {
14931493
line: Some(335),
14941494
routine: Some(routine_msg.to_string()),
14951495
},
1496-
PgErrorMsg::parse(complete_msg).unwrap()
1496+
PgErrorMsg::parse(&complete_msg).unwrap()
14971497
);
14981498

14991499
let mut only_mandatory_msg = vec![];
@@ -1503,7 +1503,7 @@ mod tests {
15031503
only_mandatory_msg.extend(field('M', message));
15041504
only_mandatory_msg.extend(field('D', detail_msg));
15051505

1506-
let err_fields = PgErrorMsg::parse(only_mandatory_msg.clone()).unwrap();
1506+
let err_fields = PgErrorMsg::parse(&only_mandatory_msg).unwrap();
15071507
info!("only mandatory fields: {}", &err_fields);
15081508
error!(
15091509
"server error: {}: {}",
@@ -1530,7 +1530,7 @@ mod tests {
15301530
line: None,
15311531
routine: None,
15321532
},
1533-
PgErrorMsg::parse(only_mandatory_msg).unwrap()
1533+
PgErrorMsg::parse(&only_mandatory_msg).unwrap()
15341534
);
15351535
}
15361536
}

src/server.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use once_cell::sync::Lazy;
88
use parking_lot::{Mutex, RwLock};
99
use postgres_protocol::message;
1010
use std::collections::{HashMap, HashSet};
11-
use std::io::Cursor;
1211
use std::mem;
1312
use std::net::IpAddr;
1413
use std::num::NonZeroUsize;
@@ -720,7 +719,7 @@ impl Server {
720719
}
721720
};
722721

723-
let fields = match PgErrorMsg::parse(error) {
722+
let fields = match PgErrorMsg::parse(&error) {
724723
Ok(f) => f,
725724
Err(err) => {
726725
return Err(err);
@@ -970,40 +969,18 @@ impl Server {
970969
if self.in_copy_mode {
971970
self.in_copy_mode = false;
972971
}
973-
// TODO: consider logging a warning here
974972

975973
if self.prepared_statement_enabled {
976-
// It's probably okay to parse this since we don't expect to get ErrorResponses too frequently
977-
let mut cursor = Cursor::new(&message);
978-
979-
loop {
980-
let code = cursor.get_u8();
981-
match cursor.read_string() {
982-
Ok(content) => {
983-
println!(
984-
"GOT CODE: {} with content: {}",
985-
code as char, content
986-
);
987-
// This is allowed to be what looks like an infinite loop
988-
// because the 'M' message is always present
989-
if code as char == 'M' {
990-
if content == "cached plan must not change result type" {
991-
// This will still result in an error to the client, but this server connection
992-
// will be dropped, and will not bleed into the pool.
993-
// TODO: Other ideas to solve issues with DDL changes when using prepared statements
994-
// - Recreate connection pool to force recreation of server connections
995-
// - Just close the prepared statement instead of dropping the connection
996-
// - Implement a retry so the client doesn't see an error
997-
self.mark_bad();
998-
}
999-
break;
1000-
}
1001-
}
1002-
Err(_) => {
1003-
warn!("Encountered an error while parsing ErrorResponse");
1004-
break;
1005-
}
1006-
}
974+
let error_message = crate::messages::PgErrorMsg::parse(&message)?;
975+
if error_message.message == "cached plan must not change result type" {
976+
warn!("Server {:?} changed schema, dropping connection to clean up prepared statements", self.address);
977+
// This will still result in an error to the client, but this server connection
978+
// will be dropped, and will not bleed into the pool.
979+
// TODO: Other ideas to solve issues with DDL changes when using prepared statements
980+
// - Recreate connection pool to force recreation of server connections
981+
// - Just close the prepared statement instead of dropping the connection
982+
// - Implement a retry so the client doesn't see an error
983+
self.mark_bad();
1007984
}
1008985
}
1009986
}

tests/pgbench/simple.sql

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,6 @@
1010

1111
\set shard random(0, 2)
1212

13-
SET SHARD TO :shard;
14-
15-
SET SERVER ROLE TO 'auto';
16-
1713
BEGIN;
1814

1915
UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
@@ -26,14 +22,6 @@ UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
2622

2723
INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
2824

29-
END;
25+
SELECT * FROM pgbench_accounts;
3026

31-
SET SHARDING KEY TO :aid;
32-
33-
-- Read load balancing
34-
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
35-
36-
SET SERVER ROLE TO 'replica';
37-
38-
-- Read load balancing
39-
SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
27+
END;

0 commit comments

Comments
 (0)