Skip to content

Commit

Permalink
pgwire: add new send method which filters messages sent to clients
Browse files Browse the repository at this point in the history
Including the logic on how to map client-side severity levels to regular
pgwire severity levels.

This new `send` method is a wrapper inside StateMachine that always
checks if the message should be sent to a client or not, and if so,
dispatches to the old implementation in FramedConn.

The next commit updates all the callsites to use the new implementation.
  • Loading branch information
andrioni committed Feb 16, 2022
1 parent 8e185c9 commit 2eae0b8
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/pgwire/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ where
/// Note that the connection is not flushed after calling this method. You
/// must call [`FramedConn::flush`] explicitly. Returns an error if the
/// underlying connection is broken.
///
/// Please use `StateMachine::send` instead if calling from `StateMachine`,
/// as it applies session-based filters before calling this method.
pub async fn send<M>(&mut self, message: M) -> Result<(), io::Error>
where
M: Into<BackendMessage>,
Expand Down
111 changes: 111 additions & 0 deletions src/pgwire/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::collections::HashMap;
use itertools::Itertools;
use postgres::error::SqlState;

use mz_coord::session::ClientSeverity as CoordClientSeverity;
use mz_coord::session::TransactionStatus as CoordTransactionStatus;
use mz_coord::{CoordError, StartupMessage};
use mz_pgcopy::CopyErrorNotSupportedResponse;
Expand Down Expand Up @@ -447,6 +448,74 @@ impl Severity {
Severity::Log => "LOG",
}
}

/// Checks if a message of a given severity level should be sent to a client.
///
/// The ordering of severity levels used for client-level filtering differs from the
/// one used for server-side logging in two aspects: INFO messages are always sent,
/// and the LOG severity is considered as below NOTICE, while it is above ERROR for
/// server-side logs.
///
/// Postgres only considers the session setting after the client authentication
/// handshake is completed. Since this function is only called after client authentication
/// is done, we are not treating this case right now, but be aware if refactoring it.
pub fn should_output_to_client(&self, minimum_client_severity: &CoordClientSeverity) -> bool {
match (minimum_client_severity, self) {
// INFO messages are always sent
(_, Severity::Info) => true,
(CoordClientSeverity::Error, Severity::Error | Severity::Fatal | Severity::Panic) => {
true
}
(
CoordClientSeverity::Warning,
Severity::Error | Severity::Fatal | Severity::Panic | Severity::Warning,
) => true,
(
CoordClientSeverity::Notice,
Severity::Error
| Severity::Fatal
| Severity::Panic
| Severity::Warning
| Severity::Notice,
) => true,
(
CoordClientSeverity::Info,
Severity::Error
| Severity::Fatal
| Severity::Panic
| Severity::Warning
| Severity::Notice,
) => true,
(
CoordClientSeverity::Log,
Severity::Error
| Severity::Fatal
| Severity::Panic
| Severity::Warning
| Severity::Notice
| Severity::Log,
) => true,
(
CoordClientSeverity::Debug1
| CoordClientSeverity::Debug2
| CoordClientSeverity::Debug3
| CoordClientSeverity::Debug4
| CoordClientSeverity::Debug5,
_,
) => true,

(
CoordClientSeverity::Error,
Severity::Warning | Severity::Notice | Severity::Log | Severity::Debug,
) => false,
(CoordClientSeverity::Warning, Severity::Notice | Severity::Log | Severity::Debug) => {
false
}
(CoordClientSeverity::Notice, Severity::Log | Severity::Debug) => false,
(CoordClientSeverity::Info, Severity::Log | Severity::Debug) => false,
(CoordClientSeverity::Log, Severity::Debug) => false,
}
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -480,3 +549,45 @@ pub fn encode_row_description(
})
.collect()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_should_output_to_client() {
#[rustfmt::skip]
let test_cases = [
(CoordClientSeverity::Debug1, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Debug2, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Debug3, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Debug4, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Debug5, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Log, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Log, vec![Severity::Debug], false),
(CoordClientSeverity::Info, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Info, vec![Severity::Debug, Severity::Log], false),
(CoordClientSeverity::Notice, vec![Severity::Notice, Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Notice, vec![Severity::Debug, Severity::Log], false),
(CoordClientSeverity::Warning, vec![Severity::Warning, Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Warning, vec![Severity::Debug, Severity::Log, Severity::Notice], false),
(CoordClientSeverity::Error, vec![Severity::Error, Severity::Fatal, Severity:: Panic, Severity::Info], true),
(CoordClientSeverity::Error, vec![Severity::Debug, Severity::Log, Severity::Notice, Severity::Warning], false),
];

for test_case in test_cases {
run_test(test_case)
}

fn run_test(test_case: (CoordClientSeverity, Vec<Severity>, bool)) {
let client_min_messages_setting = test_case.0;
let expected = test_case.2;
for message_severity in test_case.1 {
assert!(
message_severity.should_output_to_client(&client_min_messages_setting)
== expected
)
}
}
}
}
36 changes: 36 additions & 0 deletions src/pgwire/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,42 @@ where
Ok(State::Ready)
}

/// Sends a backend message to the client, after applying a severity filter.
///
/// The message is only sent if its severity is above the severity set
/// in the session, with the default value being NOTICE.
async fn send<M>(&mut self, message: M) -> Result<(), io::Error>
where
M: Into<BackendMessage>,
{
let message: BackendMessage = message.into();
match message {
BackendMessage::ErrorResponse(ref err) => {
let minimum_client_severity =
self.coord_client.session().vars().client_min_messages();
if err
.severity
.should_output_to_client(minimum_client_severity)
{
self.conn.send(message).await
} else {
Ok(())
}
}
_ => self.conn.send(message).await,
}
}

pub async fn send_all(
&mut self,
messages: impl IntoIterator<Item = BackendMessage>,
) -> Result<(), io::Error> {
for m in messages {
self.send(m).await?;
}
Ok(())
}

async fn sync(&mut self) -> Result<State, io::Error> {
// Close the current transaction if we are in an implicit transaction.
if self.coord_client.session().transaction().is_implicit() {
Expand Down

0 comments on commit 2eae0b8

Please sign in to comment.