adapter: Prevent login to mz_system externally #14093

Merged on Aug 15, 2022 (14 commits)
3 changes: 2 additions & 1 deletion src/adapter/src/catalog.rs
@@ -93,7 +93,8 @@ pub mod builtin;
pub mod storage;

pub const SYSTEM_CONN_ID: ConnectionId = 0;
const SYSTEM_USER: &str = "mz_system";
pub const SYSTEM_USER: &str = "mz_system";
pub const HTTP_DEFAULT_USER: &str = "anonymous_http_user";
Contributor

I think this should start with mz_ because that's guaranteed to be a protected username prefix.

Contributor Author

I don't think this is possible. The way this works is that anonymous_http_user doesn't exist until someone uses the HTTP API without a username; at that point we create anonymous_http_user on the spot as a user role. However, we don't allow user roles to be created with the prefix mz_, because it's a reserved prefix.
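
As a rough illustration of the rule described above, here is a minimal sketch. It is not the actual catalog code: `Roles`, `create_user_role`, and `resolve_http_user` are hypothetical names made up for this example, and only `HTTP_DEFAULT_USER` and the `mz_` prefix come from this PR.

```rust
use std::collections::BTreeSet;

/// Prefix reserved for system roles, per the discussion above.
const RESERVED_PREFIX: &str = "mz_";
/// Default HTTP user name, as introduced in this PR.
const HTTP_DEFAULT_USER: &str = "anonymous_http_user";

/// Hypothetical stand-in for the catalog's set of user roles.
#[derive(Default)]
struct Roles {
    names: BTreeSet<String>,
}

impl Roles {
    /// Creates a user role, rejecting any name that uses the reserved prefix.
    fn create_user_role(&mut self, name: &str) -> Result<(), String> {
        if name.starts_with(RESERVED_PREFIX) {
            return Err(format!(
                "role name '{}' uses reserved prefix '{}'",
                name, RESERVED_PREFIX
            ));
        }
        self.names.insert(name.to_string());
        Ok(())
    }

    /// Picks the role for an HTTP request, falling back to the default user
    /// when no username is supplied, and creating the role on first use.
    fn resolve_http_user(&mut self, requested: Option<&str>) -> Result<String, String> {
        let name = requested.unwrap_or(HTTP_DEFAULT_USER);
        if !self.names.contains(name) {
            self.create_user_role(name)?;
        }
        Ok(name.to_string())
    }
}
```

Under these assumptions, a default user named with the `mz_` prefix could never be created on the fly, because `create_user_role` would reject it.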

const CREATE_SQL_TODO: &str = "TODO";

/// A `Catalog` keeps track of the SQL objects known to the planner.
4 changes: 3 additions & 1 deletion src/adapter/src/catalog/builtin.rs
@@ -32,6 +32,8 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely
use mz_repr::{RelationDesc, ScalarType};
use mz_sql::catalog::{CatalogType, CatalogTypeDetails, NameReference, TypeReference};

use crate::catalog::SYSTEM_USER;

pub const MZ_TEMP_SCHEMA: &str = "mz_temp";
pub const MZ_CATALOG_SCHEMA: &str = "mz_catalog";
pub const PG_CATALOG_SCHEMA: &str = "pg_catalog";
@@ -2104,7 +2106,7 @@ AS SELECT
FROM mz_catalog.mz_roles r",
};

pub const MZ_SYSTEM: BuiltinRole = BuiltinRole { name: "mz_system" };
pub const MZ_SYSTEM: BuiltinRole = BuiltinRole { name: SYSTEM_USER };

pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
let mut builtins = vec![
26 changes: 24 additions & 2 deletions src/adapter/src/client.rs
@@ -20,6 +20,7 @@ use mz_ore::thread::JoinOnDropHandle;
use mz_repr::{Datum, GlobalId, Row, ScalarType};
use mz_sql::ast::{Raw, Statement};

use crate::catalog::SYSTEM_USER;
use crate::command::{
Canceled, Command, ExecuteResponse, Response, SimpleExecuteResponse, SimpleResult,
StartupResponse,
@@ -67,6 +68,15 @@ impl Handle {
}
}

/// Distinguishes between external and internal clients.
#[derive(Debug, Clone)]
pub enum ClientType {
/// Client that has connected over an external port.
External,
/// Client that has connected over an internal port.
Internal,
}

/// A coordinator client.
///
/// A coordinator client is a simple handle to a communication channel with the
@@ -78,13 +88,15 @@ impl Handle {
pub struct Client {
cmd_tx: mpsc::UnboundedSender<Command>,
id_alloc: Arc<IdAllocator<ConnectionId>>,
client_type: ClientType,
}

impl Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>) -> Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>, client_type: ClientType) -> Client {
Client {
cmd_tx,
id_alloc: Arc::new(IdAllocator::new(1, 1 << 16)),
client_type,
}
}

@@ -103,7 +115,7 @@ impl Client {
/// a system user.
pub async fn system_execute(&self, stmts: &str) -> Result<SimpleExecuteResponse, AdapterError> {
let conn_client = self.new_conn()?;
let session = Session::new(conn_client.conn_id(), "mz_system".into());
let session = Session::new(conn_client.conn_id(), SYSTEM_USER.into());
let (mut session_client, _) = conn_client.startup(session, false).await?;
session_client.simple_execute(stmts).await
}
@@ -118,6 +130,11 @@ impl Client {
let response = self.system_execute(stmt).await?;
Ok(response.results.into_element())
}

/// Returns the `ClientType` associated with this client.
pub fn client_type(&self) -> &ClientType {
&self.client_type
}
}

/// A coordinator client that is bound to a connection.
@@ -138,6 +155,11 @@ impl ConnClient {
self.conn_id
}

/// Returns the `ClientType` associated with this client.
pub fn client_type(&self) -> &ClientType {
self.inner.client_type()
}

/// Upgrades this connection client to a session client.
///
/// A session is a connection that has successfully negotiated parameters,
11 changes: 6 additions & 5 deletions src/adapter/src/coord.rs
@@ -78,7 +78,6 @@ use chrono::{DateTime, Utc};
use derivative::Derivative;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::tracing::OpenTelemetryContext;
use rand::seq::SliceRandom;
use timely::progress::Timestamp as _;
use tokio::runtime::Handle as TokioHandle;
@@ -95,6 +94,7 @@ use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::stack;
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::usage::StorageUsageClient;
use mz_persist_client::ShardId;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
@@ -114,7 +114,7 @@ use crate::catalog::{
self, storage, BuiltinMigrationMetadata, BuiltinTableUpdate, Catalog, CatalogItem,
ClusterReplicaSizeMap, StorageHostSizeMap, StorageSinkConnectionState,
};
use crate::client::{Client, ConnectionId, Handle};
use crate::client::{Client, ClientType, ConnectionId, Handle};
use crate::command::{Canceled, Command, ExecuteResponse};
use crate::coord::appends::{BuiltinTableUpdateSource, Deferred, PendingWriteTxn};
use crate::coord::id_bundle::CollectionIdBundle;
@@ -815,7 +815,7 @@ pub async fn serve<S: Append + 'static>(
connection_context,
storage_usage_client,
}: Config<S>,
) -> Result<(Handle, Client), AdapterError> {
) -> Result<(Handle, Client, Client), AdapterError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
let (strict_serializable_reads_tx, strict_serializable_reads_rx) = mpsc::unbounded_channel();
@@ -933,8 +933,9 @@ pub async fn serve<S: Append + 'static>(
start_instant,
_thread: thread.join_on_drop(),
};
let client = Client::new(cmd_tx);
Ok((handle, client))
let external_client = Client::new(cmd_tx.clone(), ClientType::External);
let internal_client = Client::new(cmd_tx, ClientType::Internal);
Ok((handle, external_client, internal_client))
}
Err(e) => Err(e),
}
2 changes: 1 addition & 1 deletion src/adapter/src/lib.rs
@@ -32,7 +32,6 @@ macro_rules! coord_bail {
}
}

mod client;
mod command;
mod coord;
mod error;
@@ -42,6 +41,7 @@ mod tail;
mod util;

pub mod catalog;
pub mod client;
pub mod session;

pub use crate::client::{Client, ConnClient, Handle, SessionClient};
7 changes: 4 additions & 3 deletions src/adapter/src/session.rs
@@ -27,19 +27,20 @@ use mz_sql::ast::{Raw, Statement, TransactionAccessMode};
use mz_sql::plan::{Params, PlanContext, StatementDesc};
use mz_sql_parser::ast::TransactionIsolationLevel;

use crate::catalog::SYSTEM_USER;
use crate::client::ConnectionId;
use crate::coord::peek::PeekResponseUnary;
use crate::coord::CoordTimestamp;
use crate::error::AdapterError;
use crate::session::vars::IsolationLevel;

pub(crate) mod vars;

pub use self::vars::{
ClientSeverity, SessionVars, Var, DEFAULT_DATABASE_NAME, SERVER_MAJOR_VERSION,
SERVER_MINOR_VERSION, SERVER_PATCH_VERSION,
};

pub(crate) mod vars;

const DUMMY_CONNECTION_ID: ConnectionId = 0;

/// A session holds per-connection state.
@@ -67,7 +68,7 @@ impl<T: CoordTimestamp> Session<T> {
/// Dummy sessions are intended for use when executing queries on behalf of
/// the system itself, rather than on behalf of a user.
pub fn dummy() -> Session<T> {
Self::new_internal(DUMMY_CONNECTION_ID, "mz_system".into())
Self::new_internal(DUMMY_CONNECTION_ID, SYSTEM_USER.into())
}

fn new_internal(conn_id: ConnectionId, user: String) -> Session<T> {
29 changes: 20 additions & 9 deletions src/environmentd/src/http.rs
@@ -34,9 +34,6 @@ use http::header::{AUTHORIZATION, CONTENT_TYPE};
use http::{Request, StatusCode};
use hyper::server::conn::AddrIncoming;
use hyper_openssl::MaybeHttpsStream;
use mz_adapter::SessionClient;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::OpenTelemetryEnableCallback;
use openssl::nid::Nid;
use openssl::ssl::{Ssl, SslContext};
use openssl::x509::X509;
@@ -47,8 +44,12 @@ use tokio_openssl::SslStream;
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tracing::{error, warn};

use mz_adapter::catalog::HTTP_DEFAULT_USER;
use mz_adapter::session::Session;
use mz_adapter::SessionClient;
use mz_frontegg_auth::{FronteggAuthentication, FronteggError};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::OpenTelemetryEnableCallback;

use crate::BUILD_INFO;

@@ -57,8 +58,6 @@ mod memory;
mod root;
mod sql;

const SYSTEM_USER: &str = "mz_system";

#[derive(Debug, Clone)]
pub struct Config {
pub tls: Option<TlsConfig>,
@@ -113,11 +112,11 @@ impl Server {
)
.nest("/prof/", mz_prof::http::router(&BUILD_INFO))
.route("/static/*path", routing::get(root::handle_static))
.layer(Extension(adapter_client))
.layer(middleware::from_fn(move |req, next| {
let frontegg = Arc::clone(&frontegg);
async move { auth(req, next, tls_mode, &frontegg).await }
}))
.layer(Extension(adapter_client))
.layer(
CorsLayer::new()
.allow_credentials(false)
@@ -221,6 +220,8 @@ enum AuthError {
HttpsRequired,
#[error("invalid username in client certificate")]
InvalidCertUserName,
#[error("unauthorized login to user '{0}'")]
InvalidLogin(String),
#[error("{0}")]
Frontegg(#[from] FronteggError),
#[error("missing authorization header")]
@@ -277,8 +278,8 @@ async fn auth<B>(
// Then, handle Frontegg authentication if required.
let user = match frontegg {
// If no Frontegg authentication, we can use the cert's username if
// present, otherwise the system user.
None => user.unwrap_or_else(|| SYSTEM_USER.to_string()),
// present, otherwise the default HTTP user.
None => user.unwrap_or_else(|| HTTP_DEFAULT_USER.to_string()),
// If we require Frontegg auth, fetch credentials from the HTTP auth
// header. Basic auth comes with a username/password, where the password
// is the client+secret pair. Bearer auth is an existing JWT that must
@@ -308,11 +309,21 @@ async fn auth<B>(
}
};

// Validate that mz_system only logs in via an internal port.
let adapter_client = req.extensions().get::<mz_adapter::Client>().unwrap();
match (adapter_client.client_type(), user.as_str()) {
(mz_adapter::client::ClientType::External, mz_adapter::catalog::SYSTEM_USER) => {
return Err(AuthError::InvalidLogin(user))
}
(mz_adapter::client::ClientType::Internal, _)
| (mz_adapter::client::ClientType::External, _) => {}
};

// Add the authenticated user as an extension so downstream handlers can
// inspect it if necessary.
req.extensions_mut().insert(AuthedUser {
user,
create_if_not_exists: frontegg.is_some(),
create_if_not_exists: frontegg.is_some() || !matches!(tls_mode, Some(TlsMode::AssumeUser)),
});

// Run the request.
41 changes: 21 additions & 20 deletions src/environmentd/src/lib.rs
@@ -26,6 +26,7 @@ use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::cors::AllowOrigin;
use tracing::error;

use mz_adapter::catalog::storage::BootstrapArgs;
use mz_adapter::catalog::{ClusterReplicaSizeMap, StorageHostSizeMap};
@@ -39,7 +40,6 @@ use mz_ore::tracing::OpenTelemetryEnableCallback;
use mz_persist_client::usage::StorageUsageClient;
use mz_secrets::SecretsController;
use mz_storage::types::connections::ConnectionContext;
use tracing::error;

use crate::tcp_connection::ConnectionHandler;

@@ -250,22 +250,23 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
let controller = mz_controller::Controller::new(config.controller).await;

// Initialize adapter.
let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
dataflow_client: controller,
storage: adapter_storage,
unsafe_mode: config.unsafe_mode,
build_info: &BUILD_INFO,
metrics_registry: config.metrics_registry.clone(),
now: config.now,
secrets_controller: config.secrets_controller,
cluster_replica_sizes: config.cluster_replica_sizes,
storage_host_sizes: config.storage_host_sizes,
default_storage_host_size: config.default_storage_host_size,
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
})
.await?;
let (adapter_handle, external_adapter_client, internal_adapter_client) =
mz_adapter::serve(mz_adapter::Config {
dataflow_client: controller,
storage: adapter_storage,
unsafe_mode: config.unsafe_mode,
build_info: &BUILD_INFO,
metrics_registry: config.metrics_registry.clone(),
now: config.now,
secrets_controller: config.secrets_controller,
cluster_replica_sizes: config.cluster_replica_sizes,
storage_host_sizes: config.storage_host_sizes,
default_storage_host_size: config.default_storage_host_size,
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
})
.await?;

// TODO(benesch): replace both `TCPListenerStream`s below with
// `<type>_listener.incoming()` if that is
@@ -282,7 +283,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
task::spawn(|| "pgwire_server", {
let pgwire_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: pgwire_tls,
adapter_client: adapter_client.clone(),
adapter_client: external_adapter_client.clone(),
frontegg: config.frontegg.clone(),
});

@@ -302,7 +303,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
task::spawn(|| "internal_pgwire_server", {
let internal_pgwire_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: None,
adapter_client: adapter_client.clone(),
adapter_client: internal_adapter_client,
frontegg: None,
});
let mut incoming = TcpListenerStream::new(internal_sql_listener);
@@ -321,7 +322,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
let http_server = http::Server::new(http::Config {
tls: http_tls,
frontegg: config.frontegg,
adapter_client,
adapter_client: external_adapter_client,
Contributor

Where does the internal http server get its internal adapter client?

Contributor Author

This is a bit perplexing to me. The internal http port uses the http::InternalServer struct (initialized above all of this on line 204) which doesn't use an adapter client and has the following endpoints:

  • /metrics
  • /api/livez
  • /api/opentelemetry/config

The external http port uses the http::Server struct which uses an adapter client and has the following endpoints:

  • /
  • /api/internal/catalog (This one is really weird since it contains "internal" in its path but is only accessible from the external port).
  • /api/sql
  • /memory
  • /hierarchical-memory
  • /prof/
  • /static/*path
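
The split listed above can be summarized as a small lookup table. This is only an illustrative sketch: the `Listener` enum and `listener_for` function are hypothetical and do not exist in the codebase, and the paths are copied verbatim from the two lists (route patterns such as `/static/*path` are matched literally here, not as wildcards).

```rust
/// Which HTTP listener serves a route (hypothetical type for illustration).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Listener {
    Internal,
    External,
}

/// Routes served by `http::InternalServer`, per the list above.
const INTERNAL_ENDPOINTS: &[&str] = &["/metrics", "/api/livez", "/api/opentelemetry/config"];

/// Routes served by `http::Server`, per the list above.
const EXTERNAL_ENDPOINTS: &[&str] = &[
    "/",
    "/api/internal/catalog",
    "/api/sql",
    "/memory",
    "/hierarchical-memory",
    "/prof/",
    "/static/*path",
];

/// Returns the listener that serves the given route pattern, if any.
fn listener_for(route: &str) -> Option<Listener> {
    if INTERNAL_ENDPOINTS.contains(&route) {
        Some(Listener::Internal)
    } else if EXTERNAL_ENDPOINTS.contains(&route) {
        Some(Listener::External)
    } else {
        None
    }
}
```

With this table, `listener_for("/api/internal/catalog")` yields `Some(Listener::External)`, which makes the naming mismatch easy to spot.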

Member

Oh, good point! We should move that internal catalog API to the internal HTTP server.

allowed_origin: config.cors_allowed_origin,
});
let mut incoming = TcpListenerStream::new(http_listener);