Skip to content

Commit

Permalink
mz_system is the system user that is responsible for executing commands
Browse files Browse the repository at this point in the history
that only an administrator should be doing. This commit prevents
logging into the mz_system user from an external port. You can only
login to the mz_system user via internal ports.

Works towards resolving: MaterializeInc/cloud#3292
  • Loading branch information
jkosh44 committed Aug 10, 2022
1 parent d370474 commit 1c8a30c
Show file tree
Hide file tree
Showing 16 changed files with 139 additions and 57 deletions.
3 changes: 2 additions & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ pub mod builtin;
pub mod storage;

pub const SYSTEM_CONN_ID: ConnectionId = 0;
const SYSTEM_USER: &str = "mz_system";
pub const SYSTEM_USER: &str = "mz_system";
pub const HTTP_DEFAULT_USER: &str = "mz_http_default_user";
const CREATE_SQL_TODO: &str = "TODO";

/// A `Catalog` keeps track of the SQL objects known to the planner.
Expand Down
11 changes: 9 additions & 2 deletions src/adapter/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely
use mz_repr::{RelationDesc, ScalarType};
use mz_sql::catalog::{CatalogType, CatalogTypeDetails, NameReference, TypeReference};

use crate::catalog::{HTTP_DEFAULT_USER, SYSTEM_USER};

pub const MZ_TEMP_SCHEMA: &str = "mz_temp";
pub const MZ_CATALOG_SCHEMA: &str = "mz_catalog";
pub const PG_CATALOG_SCHEMA: &str = "pg_catalog";
Expand Down Expand Up @@ -2105,7 +2107,11 @@ AS SELECT
FROM mz_catalog.mz_roles r",
};

pub const MZ_SYSTEM: BuiltinRole = BuiltinRole { name: "mz_system" };
pub const MZ_SYSTEM: BuiltinRole = BuiltinRole { name: SYSTEM_USER };

pub const MZ_HTTP_DEFAULT_USER: BuiltinRole = BuiltinRole {
name: HTTP_DEFAULT_USER,
};

pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
let mut builtins = vec![
Expand Down Expand Up @@ -2289,7 +2295,8 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {

builtins
});
pub static BUILTIN_ROLES: Lazy<Vec<BuiltinRole>> = Lazy::new(|| vec![MZ_SYSTEM]);
pub static BUILTIN_ROLES: Lazy<Vec<BuiltinRole>> =
Lazy::new(|| vec![MZ_SYSTEM, MZ_HTTP_DEFAULT_USER]);

#[allow(non_snake_case)]
pub mod BUILTINS {
Expand Down
27 changes: 25 additions & 2 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use mz_ore::thread::JoinOnDropHandle;
use mz_repr::{Datum, GlobalId, Row, ScalarType};
use mz_sql::ast::{Raw, Statement};

use crate::catalog::SYSTEM_USER;
use crate::command::{
Canceled, Command, ExecuteResponse, Response, SimpleExecuteResponse, SimpleResult,
StartupResponse,
Expand Down Expand Up @@ -67,6 +68,15 @@ impl Handle {
}
}

/// Distinguishes between external and internal clients.
#[derive(Debug, Clone)]
pub enum ClientType {
/// Client that has connected over an external port.
External,
/// Client that has connected over an internal port.
Internal,
}

/// A coordinator client.
///
/// A coordinator client is a simple handle to a communication channel with the
Expand All @@ -78,13 +88,15 @@ impl Handle {
pub struct Client {
cmd_tx: mpsc::UnboundedSender<Command>,
id_alloc: Arc<IdAllocator<ConnectionId>>,
client_type: ClientType,
}

impl Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>) -> Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>, client_type: ClientType) -> Client {
Client {
cmd_tx,
id_alloc: Arc::new(IdAllocator::new(1, 1 << 16)),
client_type,
}
}

Expand All @@ -103,7 +115,7 @@ impl Client {
/// a system user.
pub async fn system_execute(&self, stmts: &str) -> Result<SimpleExecuteResponse, AdapterError> {
let conn_client = self.new_conn()?;
let session = Session::new(conn_client.conn_id(), "mz_system".into());
let session = Session::new(conn_client.conn_id(), SYSTEM_USER.into());
let (mut session_client, _) = conn_client.startup(session, false).await?;
session_client.simple_execute(stmts).await
}
Expand Down Expand Up @@ -152,6 +164,8 @@ impl ConnClient {
session: Session,
create_user_if_not_exists: bool,
) -> Result<(SessionClient, StartupResponse), AdapterError> {
self.validate_user(session.user())?;

// Cancellation works by creating a watch channel (which remembers only
// the last value sent to it) and sharing it between the coordinator and
// connection. The coordinator will send a canceled message on it if a
Expand Down Expand Up @@ -186,6 +200,15 @@ impl ConnClient {
}
}

/// Validate that the user is allowed to login.
fn validate_user(&self, user: &str) -> Result<(), AdapterError> {
if user == SYSTEM_USER && !matches!(self.inner.client_type, ClientType::Internal) {
return Err(AdapterError::UnauthorizedLogin(user.to_string()));
}

Ok(())
}

/// Cancels the query currently running on another connection.
pub async fn cancel_request(&mut self, conn_id: ConnectionId, secret_key: u32) {
self.inner
Expand Down
11 changes: 6 additions & 5 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ use chrono::{DateTime, Utc};
use derivative::Derivative;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::tracing::OpenTelemetryContext;
use rand::seq::SliceRandom;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tokio::runtime::Handle as TokioHandle;
Expand All @@ -94,6 +93,7 @@ use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::stack;
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::usage::StorageUsageClient;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_secrets::SecretsController;
Expand All @@ -112,7 +112,7 @@ use crate::catalog::{
self, storage, BuiltinMigrationMetadata, BuiltinTableUpdate, Catalog, CatalogItem,
ClusterReplicaSizeMap, Sink, SinkConnectionState, StorageHostSizeMap,
};
use crate::client::{Client, ConnectionId, Handle};
use crate::client::{Client, ClientType, ConnectionId, Handle};
use crate::command::{Canceled, Command, ExecuteResponse};
use crate::coord::appends::{AdvanceLocalInput, Deferred, PendingWriteTxn};
use crate::coord::id_bundle::CollectionIdBundle;
Expand Down Expand Up @@ -840,7 +840,7 @@ pub async fn serve<S: Append + 'static>(
connection_context,
storage_usage_client,
}: Config<S>,
) -> Result<(Handle, Client), AdapterError> {
) -> Result<(Handle, Client, Client), AdapterError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
let (strict_serializable_reads_tx, strict_serializable_reads_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -957,8 +957,9 @@ pub async fn serve<S: Append + 'static>(
start_instant,
_thread: thread.join_on_drop(),
};
let client = Client::new(cmd_tx);
Ok((handle, client))
let external_client = Client::new(cmd_tx.clone(), ClientType::External);
let internal_client = Client::new(cmd_tx, ClientType::Internal);
Ok((handle, external_client, internal_client))
}
Err(e) => Err(e),
}
Expand Down
5 changes: 5 additions & 0 deletions src/adapter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub enum AdapterError {
TailOnlyTransaction,
/// An error occurred in the MIR stage of the optimizer.
Transform(TransformError),
/// Unauthorized to login using the provided user.
UnauthorizedLogin(String),
/// The specified function cannot be called
UncallableFunction {
func: UnmaterializableFunc,
Expand Down Expand Up @@ -364,6 +366,9 @@ impl fmt::Display for AdapterError {
f.write_str("TAIL in transactions must be the only read statement")
}
AdapterError::Transform(e) => e.fmt(f),
AdapterError::UnauthorizedLogin(user) => {
write!(f, "unauthorized login to user '{user}'")
}
AdapterError::UncallableFunction { func, context } => {
write!(f, "cannot call {} in {}", func, context)
}
Expand Down
7 changes: 4 additions & 3 deletions src/adapter/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@ use mz_sql::ast::{Raw, Statement, TransactionAccessMode};
use mz_sql::plan::{Params, PlanContext, StatementDesc};
use mz_sql_parser::ast::TransactionIsolationLevel;

use crate::catalog::SYSTEM_USER;
use crate::client::ConnectionId;
use crate::coord::peek::PeekResponseUnary;
use crate::coord::CoordTimestamp;
use crate::error::AdapterError;
use crate::session::vars::IsolationLevel;

pub(crate) mod vars;

pub use self::vars::{
ClientSeverity, SessionVars, Var, DEFAULT_DATABASE_NAME, SERVER_MAJOR_VERSION,
SERVER_MINOR_VERSION, SERVER_PATCH_VERSION,
};

pub(crate) mod vars;

const DUMMY_CONNECTION_ID: ConnectionId = 0;

/// A session holds per-connection state.
Expand Down Expand Up @@ -67,7 +68,7 @@ impl<T: CoordTimestamp> Session<T> {
/// Dummy sessions are intended for use when executing queries on behalf of
/// the system itself, rather than on behalf of a user.
pub fn dummy() -> Session<T> {
Self::new_internal(DUMMY_CONNECTION_ID, "mz_system".into())
Self::new_internal(DUMMY_CONNECTION_ID, SYSTEM_USER.into())
}

fn new_internal(conn_id: ConnectionId, user: String) -> Session<T> {
Expand Down
11 changes: 5 additions & 6 deletions src/environmentd/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ use http::header::{AUTHORIZATION, CONTENT_TYPE};
use http::{Request, StatusCode};
use hyper::server::conn::AddrIncoming;
use hyper_openssl::MaybeHttpsStream;
use mz_adapter::SessionClient;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::OpenTelemetryEnableCallback;
use mz_adapter::catalog::HTTP_DEFAULT_USER;
use openssl::nid::Nid;
use openssl::ssl::{Ssl, SslContext};
use openssl::x509::X509;
Expand All @@ -48,7 +46,10 @@ use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tracing::{error, warn};

use mz_adapter::session::Session;
use mz_adapter::SessionClient;
use mz_frontegg_auth::{FronteggAuthentication, FronteggError};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::OpenTelemetryEnableCallback;

use crate::BUILD_INFO;

Expand All @@ -57,8 +58,6 @@ mod memory;
mod root;
mod sql;

const SYSTEM_USER: &str = "mz_system";

#[derive(Debug, Clone)]
pub struct Config {
pub tls: Option<TlsConfig>,
Expand Down Expand Up @@ -278,7 +277,7 @@ async fn auth<B>(
let user = match frontegg {
// If no Frontegg authentication, we can use the cert's username if
// present, otherwise the system user.
None => user.unwrap_or_else(|| SYSTEM_USER.to_string()),
None => user.unwrap_or_else(|| HTTP_DEFAULT_USER.to_string()),
// If we require Frontegg auth, fetch credentials from the HTTP auth
// header. Basic auth comes with a username/password, where the password
// is the client+secret pair. Bearer auth is an existing JWT that must
Expand Down
41 changes: 21 additions & 20 deletions src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::cors::AllowOrigin;
use tracing::{error, info};

use mz_adapter::catalog::storage::BootstrapArgs;
use mz_adapter::catalog::{ClusterReplicaSizeMap, StorageHostSizeMap};
Expand All @@ -39,7 +40,6 @@ use mz_ore::tracing::OpenTelemetryEnableCallback;
use mz_persist_client::usage::StorageUsageClient;
use mz_secrets::SecretsController;
use mz_storage::types::connections::ConnectionContext;
use tracing::{error, info};

use crate::tcp_connection::ConnectionHandler;

Expand Down Expand Up @@ -259,22 +259,23 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
// Initialize controller.
let controller = mz_controller::Controller::new(config.controller).await;
// Initialize adapter.
let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
dataflow_client: controller,
storage: adapter_storage,
unsafe_mode: config.unsafe_mode,
build_info: &BUILD_INFO,
metrics_registry: config.metrics_registry.clone(),
now: config.now,
secrets_controller: config.secrets_controller,
cluster_replica_sizes: config.cluster_replica_sizes,
storage_host_sizes: config.storage_host_sizes,
default_storage_host_size: config.default_storage_host_size,
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
})
.await?;
let (adapter_handle, external_adapter_client, internal_adapter_client) =
mz_adapter::serve(mz_adapter::Config {
dataflow_client: controller,
storage: adapter_storage,
unsafe_mode: config.unsafe_mode,
build_info: &BUILD_INFO,
metrics_registry: config.metrics_registry.clone(),
now: config.now,
secrets_controller: config.secrets_controller,
cluster_replica_sizes: config.cluster_replica_sizes,
storage_host_sizes: config.storage_host_sizes,
default_storage_host_size: config.default_storage_host_size,
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
})
.await?;

// TODO(benesch): replace both `TCPListenerStream`s below with
// `<type>_listener.incoming()` if that is
Expand All @@ -291,7 +292,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
task::spawn(|| "pgwire_server", {
let pgwire_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: pgwire_tls,
adapter_client: adapter_client.clone(),
adapter_client: external_adapter_client.clone(),
frontegg: config.frontegg.clone(),
});

Expand All @@ -311,7 +312,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
task::spawn(|| "internal_pgwire_server", {
let internal_pgwire_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: None,
adapter_client: adapter_client.clone(),
adapter_client: internal_adapter_client.clone(),
frontegg: None,
});
let mut incoming = TcpListenerStream::new(internal_sql_listener);
Expand All @@ -330,7 +331,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
let http_server = http::Server::new(http::Config {
tls: http_tls,
frontegg: config.frontegg,
adapter_client,
adapter_client: external_adapter_client,
allowed_origin: config.cors_allowed_origin,
});
let mut incoming = TcpListenerStream::new(http_listener);
Expand Down
Loading

0 comments on commit 1c8a30c

Please sign in to comment.