From 2eae0b8252fbd4f66ad4817692eb61566a96ab58 Mon Sep 17 00:00:00 2001 From: Alessandro Andrioni Date: Tue, 15 Feb 2022 23:02:36 +0100 Subject: [PATCH] pgwire: add new send method which filters messages sent to clients Including the logic on how to map client-side severity levels to regular pgwire severity levels. This new `send` method is a wrapper inside StateMachine that always checks if the message should be sent to a client or not, and if so, dispatches to the old implementation in FramedConn. The next commit updates all the callsites to use the new implementation. --- src/pgwire/src/codec.rs | 3 + src/pgwire/src/message.rs | 111 +++++++++++++++++++++++++++++++++++++ src/pgwire/src/protocol.rs | 36 ++++++++++++ 3 files changed, 150 insertions(+) diff --git a/src/pgwire/src/codec.rs b/src/pgwire/src/codec.rs index 459e15c58096..7e7847c6eb3f 100644 --- a/src/pgwire/src/codec.rs +++ b/src/pgwire/src/codec.rs @@ -104,6 +104,9 @@ where /// Note that the connection is not flushed after calling this method. You /// must call [`FramedConn::flush`] explicitly. Returns an error if the /// underlying connection is broken. + /// + /// Please use `StateMachine::send` instead if calling from `StateMachine`, + /// as it applies session-based filters before calling this method. pub async fn send(&mut self, message: M) -> Result<(), io::Error> where M: Into, diff --git a/src/pgwire/src/message.rs b/src/pgwire/src/message.rs index e213c715625a..a8de02b78ab4 100644 --- a/src/pgwire/src/message.rs +++ b/src/pgwire/src/message.rs @@ -12,6 +12,7 @@ use std::collections::HashMap; use itertools::Itertools; use postgres::error::SqlState; +use mz_coord::session::ClientSeverity as CoordClientSeverity; use mz_coord::session::TransactionStatus as CoordTransactionStatus; use mz_coord::{CoordError, StartupMessage}; use mz_pgcopy::CopyErrorNotSupportedResponse; @@ -447,6 +448,74 @@ impl Severity { Severity::Log => "LOG", } } + + /// Checks if a message of a given severity level should be sent to a client. + /// + /// The ordering of severity levels used for client-level filtering differs from the + /// one used for server-side logging in two aspects: INFO messages are always sent, + /// and the LOG severity is considered as below NOTICE, while it is above ERROR for + /// server-side logs. + /// + /// Postgres only considers the session setting after the client authentication + /// handshake is completed. Since this function is only called after client authentication + /// is done, we are not treating this case right now, but be aware if refactoring it. + pub fn should_output_to_client(&self, minimum_client_severity: &CoordClientSeverity) -> bool { + match (minimum_client_severity, self) { + // INFO messages are always sent + (_, Severity::Info) => true, + (CoordClientSeverity::Error, Severity::Error | Severity::Fatal | Severity::Panic) => { + true + } + ( + CoordClientSeverity::Warning, + Severity::Error | Severity::Fatal | Severity::Panic | Severity::Warning, + ) => true, + ( + CoordClientSeverity::Notice, + Severity::Error + | Severity::Fatal + | Severity::Panic + | Severity::Warning + | Severity::Notice, + ) => true, + ( + CoordClientSeverity::Info, + Severity::Error + | Severity::Fatal + | Severity::Panic + | Severity::Warning + | Severity::Notice, + ) => true, + ( + CoordClientSeverity::Log, + Severity::Error + | Severity::Fatal + | Severity::Panic + | Severity::Warning + | Severity::Notice + | Severity::Log, + ) => true, + ( + CoordClientSeverity::Debug1 + | CoordClientSeverity::Debug2 + | CoordClientSeverity::Debug3 + | CoordClientSeverity::Debug4 + | CoordClientSeverity::Debug5, + _, + ) => true, + + ( + CoordClientSeverity::Error, + Severity::Warning | Severity::Notice | Severity::Log | Severity::Debug, + ) => false, + (CoordClientSeverity::Warning, Severity::Notice | Severity::Log | Severity::Debug) => { + false + } + (CoordClientSeverity::Notice, Severity::Log | Severity::Debug) => false, + (CoordClientSeverity::Info, Severity::Log | Severity::Debug) => false, + (CoordClientSeverity::Log, Severity::Debug) => false, + } + } } #[derive(Debug)] @@ -480,3 +549,45 @@ pub fn encode_row_description( }) .collect() } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_should_output_to_client() { + #[rustfmt::skip] + let test_cases = [ + (CoordClientSeverity::Debug1, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Debug2, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Debug3, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Debug4, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Debug5, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Log, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Log, vec![Severity::Debug], false), + (CoordClientSeverity::Info, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Info, vec![Severity::Debug, Severity::Log], false), + (CoordClientSeverity::Notice, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Notice, vec![Severity::Debug, Severity::Log], false), + (CoordClientSeverity::Warning, vec![Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Warning, vec![Severity::Debug, Severity::Log, Severity::Notice], false), + (CoordClientSeverity::Error, vec![Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true), + (CoordClientSeverity::Error, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning], false), + ]; + + for test_case in test_cases { + run_test(test_case) + } + + fn run_test(test_case: (CoordClientSeverity, Vec, bool)) { + let client_min_messages_setting = test_case.0; + let expected = test_case.2; + for message_severity in test_case.1 { + assert!( + message_severity.should_output_to_client(&client_min_messages_setting) + == expected + ) + } + } + } +} diff --git a/src/pgwire/src/protocol.rs b/src/pgwire/src/protocol.rs index 62b20e1271c8..a9a0120111a0 100644 --- a/src/pgwire/src/protocol.rs +++ b/src/pgwire/src/protocol.rs @@ -949,6 +949,42 @@ where Ok(State::Ready) } + /// Sends a backend message to the client, after applying a severity filter. + /// + /// The message is only sent if its severity is above the severity set + /// in the session, with the default value being NOTICE. + async fn send(&mut self, message: M) -> Result<(), io::Error> + where + M: Into, + { + let message: BackendMessage = message.into(); + match message { + BackendMessage::ErrorResponse(ref err) => { + let minimum_client_severity = + self.coord_client.session().vars().client_min_messages(); + if err + .severity + .should_output_to_client(minimum_client_severity) + { + self.conn.send(message).await + } else { + Ok(()) + } + } + _ => self.conn.send(message).await, + } + } + + pub async fn send_all( + &mut self, + messages: impl IntoIterator, + ) -> Result<(), io::Error> { + for m in messages { + self.send(m).await?; + } + Ok(()) + } + async fn sync(&mut self) -> Result { // Close the current transaction if we are in an implicit transaction. if self.coord_client.session().transaction().is_implicit() {