Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: Prevent login to mz_system externally #14093

Merged
merged 14 commits into from
Aug 15, 2022
3 changes: 2 additions & 1 deletion src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ pub mod builtin;
pub mod storage;

pub const SYSTEM_CONN_ID: ConnectionId = 0;
const SYSTEM_USER: &str = "mz_system";
pub const SYSTEM_USER: &str = "mz_system";
pub const HTTP_DEFAULT_USER: &str = "mz_http_default_user";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very slight preference for this to be mz_http_internal_user because default suggests that it's a fallback and is regularly used on the external http connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually only used on the external http connection as a fallback if there's no user authenticated. So maybe it should be mz_http_default_external_user?

const CREATE_SQL_TODO: &str = "TODO";

/// A `Catalog` keeps track of the SQL objects known to the planner.
Expand Down
11 changes: 9 additions & 2 deletions src/adapter/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, Timely
use mz_repr::{RelationDesc, ScalarType};
use mz_sql::catalog::{CatalogType, CatalogTypeDetails, NameReference, TypeReference};

use crate::catalog::{HTTP_DEFAULT_USER, SYSTEM_USER};

pub const MZ_TEMP_SCHEMA: &str = "mz_temp";
pub const MZ_CATALOG_SCHEMA: &str = "mz_catalog";
pub const PG_CATALOG_SCHEMA: &str = "pg_catalog";
Expand Down Expand Up @@ -2105,7 +2107,11 @@ AS SELECT
FROM mz_catalog.mz_roles r",
};

pub const MZ_SYSTEM: BuiltinRole = BuiltinRole { name: "mz_system" };
pub const MZ_SYSTEM: BuiltinRole = BuiltinRole { name: SYSTEM_USER };

pub const MZ_HTTP_DEFAULT_USER: BuiltinRole = BuiltinRole {
name: HTTP_DEFAULT_USER,
};

pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
let mut builtins = vec![
Expand Down Expand Up @@ -2289,7 +2295,8 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {

builtins
});
pub static BUILTIN_ROLES: Lazy<Vec<BuiltinRole>> = Lazy::new(|| vec![MZ_SYSTEM]);
pub static BUILTIN_ROLES: Lazy<Vec<BuiltinRole>> =
Lazy::new(|| vec![MZ_SYSTEM, MZ_HTTP_DEFAULT_USER]);

#[allow(non_snake_case)]
pub mod BUILTINS {
Expand Down
27 changes: 25 additions & 2 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use mz_ore::thread::JoinOnDropHandle;
use mz_repr::{Datum, GlobalId, Row, ScalarType};
use mz_sql::ast::{Raw, Statement};

use crate::catalog::SYSTEM_USER;
use crate::command::{
Canceled, Command, ExecuteResponse, Response, SimpleExecuteResponse, SimpleResult,
StartupResponse,
Expand Down Expand Up @@ -67,6 +68,15 @@ impl Handle {
}
}

/// Distinguishes between external and internal clients.
#[derive(Debug, Clone)]
pub enum ClientType {
/// Client that has connected over an external port.
External,
/// Client that has connected over an internal port.
Internal,
}

/// A coordinator client.
///
/// A coordinator client is a simple handle to a communication channel with the
Expand All @@ -78,13 +88,15 @@ impl Handle {
pub struct Client {
cmd_tx: mpsc::UnboundedSender<Command>,
id_alloc: Arc<IdAllocator<ConnectionId>>,
client_type: ClientType,
}

impl Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>) -> Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>, client_type: ClientType) -> Client {
Client {
cmd_tx,
id_alloc: Arc::new(IdAllocator::new(1, 1 << 16)),
client_type,
}
}

Expand All @@ -103,7 +115,7 @@ impl Client {
/// a system user.
pub async fn system_execute(&self, stmts: &str) -> Result<SimpleExecuteResponse, AdapterError> {
let conn_client = self.new_conn()?;
let session = Session::new(conn_client.conn_id(), "mz_system".into());
let session = Session::new(conn_client.conn_id(), SYSTEM_USER.into());
let (mut session_client, _) = conn_client.startup(session, false).await?;
session_client.simple_execute(stmts).await
}
Expand Down Expand Up @@ -152,6 +164,8 @@ impl ConnClient {
session: Session,
create_user_if_not_exists: bool,
) -> Result<(SessionClient, StartupResponse), AdapterError> {
self.is_user_disallowed_by_client_type(session.user())?;

// Cancellation works by creating a watch channel (which remembers only
// the last value sent to it) and sharing it between the coordinator and
// connection. The coordinator will send a canceled message on it if a
Expand Down Expand Up @@ -186,6 +200,15 @@ impl ConnClient {
}
}

/// Returns an error if the user is not permitted to log in to the ClientType.
fn is_user_disallowed_by_client_type(&self, user: &str) -> Result<(), AdapterError> {
match (&self.inner.client_type, user) {
(ClientType::Internal, SYSTEM_USER) => Ok(()),
(ClientType::Internal, _) => Err(AdapterError::UnauthorizedLogin(user.to_string())),
(ClientType::External, _) => Ok(()),
}
}

/// Cancels the query currently running on another connection.
pub async fn cancel_request(&mut self, conn_id: ConnectionId, secret_key: u32) {
self.inner
Expand Down
11 changes: 6 additions & 5 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ use chrono::{DateTime, Utc};
use derivative::Derivative;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::tracing::OpenTelemetryContext;
use rand::seq::SliceRandom;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tokio::runtime::Handle as TokioHandle;
Expand All @@ -94,6 +93,7 @@ use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::NowFn;
use mz_ore::stack;
use mz_ore::thread::JoinHandleExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::usage::StorageUsageClient;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_secrets::SecretsController;
Expand All @@ -112,7 +112,7 @@ use crate::catalog::{
self, storage, BuiltinMigrationMetadata, BuiltinTableUpdate, Catalog, CatalogItem,
ClusterReplicaSizeMap, Sink, SinkConnectionState, StorageHostSizeMap,
};
use crate::client::{Client, ConnectionId, Handle};
use crate::client::{Client, ClientType, ConnectionId, Handle};
use crate::command::{Canceled, Command, ExecuteResponse};
use crate::coord::appends::{AdvanceLocalInput, Deferred, PendingWriteTxn};
use crate::coord::id_bundle::CollectionIdBundle;
Expand Down Expand Up @@ -840,7 +840,7 @@ pub async fn serve<S: Append + 'static>(
connection_context,
storage_usage_client,
}: Config<S>,
) -> Result<(Handle, Client), AdapterError> {
) -> Result<(Handle, Client, Client), AdapterError> {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (internal_cmd_tx, internal_cmd_rx) = mpsc::unbounded_channel();
let (strict_serializable_reads_tx, strict_serializable_reads_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -957,8 +957,9 @@ pub async fn serve<S: Append + 'static>(
start_instant,
_thread: thread.join_on_drop(),
};
let client = Client::new(cmd_tx);
Ok((handle, client))
let external_client = Client::new(cmd_tx.clone(), ClientType::External);
let internal_client = Client::new(cmd_tx, ClientType::Internal);
Ok((handle, external_client, internal_client))
}
Err(e) => Err(e),
}
Expand Down
5 changes: 5 additions & 0 deletions src/adapter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub enum AdapterError {
TailOnlyTransaction,
/// An error occurred in the MIR stage of the optimizer.
Transform(TransformError),
/// Unauthorized to login using the provided user.
UnauthorizedLogin(String),
/// The specified function cannot be called
UncallableFunction {
func: UnmaterializableFunc,
Expand Down Expand Up @@ -364,6 +366,9 @@ impl fmt::Display for AdapterError {
f.write_str("TAIL in transactions must be the only read statement")
}
AdapterError::Transform(e) => e.fmt(f),
AdapterError::UnauthorizedLogin(user) => {
write!(f, "unauthorized login to user '{user}'")
}
AdapterError::UncallableFunction { func, context } => {
write!(f, "cannot call {} in {}", func, context)
}
Expand Down
7 changes: 4 additions & 3 deletions src/adapter/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@ use mz_sql::ast::{Raw, Statement, TransactionAccessMode};
use mz_sql::plan::{Params, PlanContext, StatementDesc};
use mz_sql_parser::ast::TransactionIsolationLevel;

use crate::catalog::SYSTEM_USER;
use crate::client::ConnectionId;
use crate::coord::peek::PeekResponseUnary;
use crate::coord::CoordTimestamp;
use crate::error::AdapterError;
use crate::session::vars::IsolationLevel;

pub(crate) mod vars;

pub use self::vars::{
ClientSeverity, SessionVars, Var, DEFAULT_DATABASE_NAME, SERVER_MAJOR_VERSION,
SERVER_MINOR_VERSION, SERVER_PATCH_VERSION,
};

pub(crate) mod vars;

const DUMMY_CONNECTION_ID: ConnectionId = 0;

/// A session holds per-connection state.
Expand Down Expand Up @@ -67,7 +68,7 @@ impl<T: CoordTimestamp> Session<T> {
/// Dummy sessions are intended for use when executing queries on behalf of
/// the system itself, rather than on behalf of a user.
pub fn dummy() -> Session<T> {
Self::new_internal(DUMMY_CONNECTION_ID, "mz_system".into())
Self::new_internal(DUMMY_CONNECTION_ID, SYSTEM_USER.into())
}

fn new_internal(conn_id: ConnectionId, user: String) -> Session<T> {
Expand Down
11 changes: 5 additions & 6 deletions src/environmentd/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ use http::header::{AUTHORIZATION, CONTENT_TYPE};
use http::{Request, StatusCode};
use hyper::server::conn::AddrIncoming;
use hyper_openssl::MaybeHttpsStream;
use mz_adapter::SessionClient;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::OpenTelemetryEnableCallback;
use mz_adapter::catalog::HTTP_DEFAULT_USER;
use openssl::nid::Nid;
use openssl::ssl::{Ssl, SslContext};
use openssl::x509::X509;
Expand All @@ -48,7 +46,10 @@ use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tracing::{error, warn};

use mz_adapter::session::Session;
use mz_adapter::SessionClient;
use mz_frontegg_auth::{FronteggAuthentication, FronteggError};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::tracing::OpenTelemetryEnableCallback;

use crate::BUILD_INFO;

Expand All @@ -57,8 +58,6 @@ mod memory;
mod root;
mod sql;

const SYSTEM_USER: &str = "mz_system";

#[derive(Debug, Clone)]
pub struct Config {
pub tls: Option<TlsConfig>,
Expand Down Expand Up @@ -278,7 +277,7 @@ async fn auth<B>(
let user = match frontegg {
// If no Frontegg authentication, we can use the cert's username if
// present, otherwise the system user.
None => user.unwrap_or_else(|| SYSTEM_USER.to_string()),
None => user.unwrap_or_else(|| HTTP_DEFAULT_USER.to_string()),
// If we require Frontegg auth, fetch credentials from the HTTP auth
// header. Basic auth comes with a username/password, where the password
// is the client+secret pair. Bearer auth is an existing JWT that must
Expand Down
41 changes: 21 additions & 20 deletions src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::cors::AllowOrigin;
use tracing::{error, info};

use mz_adapter::catalog::storage::BootstrapArgs;
use mz_adapter::catalog::{ClusterReplicaSizeMap, StorageHostSizeMap};
Expand All @@ -39,7 +40,6 @@ use mz_ore::tracing::OpenTelemetryEnableCallback;
use mz_persist_client::usage::StorageUsageClient;
use mz_secrets::SecretsController;
use mz_storage::types::connections::ConnectionContext;
use tracing::{error, info};

use crate::tcp_connection::ConnectionHandler;

Expand Down Expand Up @@ -259,22 +259,23 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
// Initialize controller.
let controller = mz_controller::Controller::new(config.controller).await;
// Initialize adapter.
let (adapter_handle, adapter_client) = mz_adapter::serve(mz_adapter::Config {
dataflow_client: controller,
storage: adapter_storage,
unsafe_mode: config.unsafe_mode,
build_info: &BUILD_INFO,
metrics_registry: config.metrics_registry.clone(),
now: config.now,
secrets_controller: config.secrets_controller,
cluster_replica_sizes: config.cluster_replica_sizes,
storage_host_sizes: config.storage_host_sizes,
default_storage_host_size: config.default_storage_host_size,
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
})
.await?;
let (adapter_handle, external_adapter_client, internal_adapter_client) =
mz_adapter::serve(mz_adapter::Config {
dataflow_client: controller,
storage: adapter_storage,
unsafe_mode: config.unsafe_mode,
build_info: &BUILD_INFO,
metrics_registry: config.metrics_registry.clone(),
now: config.now,
secrets_controller: config.secrets_controller,
cluster_replica_sizes: config.cluster_replica_sizes,
storage_host_sizes: config.storage_host_sizes,
default_storage_host_size: config.default_storage_host_size,
availability_zones: config.availability_zones,
connection_context: config.connection_context,
storage_usage_client,
})
.await?;

// TODO(benesch): replace both `TCPListenerStream`s below with
// `<type>_listener.incoming()` if that is
Expand All @@ -291,7 +292,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
task::spawn(|| "pgwire_server", {
let pgwire_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: pgwire_tls,
adapter_client: adapter_client.clone(),
adapter_client: external_adapter_client.clone(),
frontegg: config.frontegg.clone(),
});

Expand All @@ -311,7 +312,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
task::spawn(|| "internal_pgwire_server", {
let internal_pgwire_server = mz_pgwire::Server::new(mz_pgwire::Config {
tls: None,
adapter_client: adapter_client.clone(),
adapter_client: internal_adapter_client,
frontegg: None,
});
let mut incoming = TcpListenerStream::new(internal_sql_listener);
Expand All @@ -330,7 +331,7 @@ pub async fn serve(config: Config) -> Result<Server, anyhow::Error> {
let http_server = http::Server::new(http::Config {
tls: http_tls,
frontegg: config.frontegg,
adapter_client,
adapter_client: external_adapter_client,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the internal http server get its internal adapter client?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit perplexing to me. The internal http port uses the http::InternalServer struct (initialized above all of this on line 204) which doesn't use an adapter client and has the following endpoints:

  • /metrics
  • /api/livez
  • /api/opentelemetry/config

The external http port uses the http::Server struct which uses an adapter client and has the following endpoints:

  • /
  • /api/internal/catalog (This one is really weird since it contains "internal" in it's path but is only accessible from the external port).
  • /api/sql
  • /memory
  • /hierarchical-memory
  • /prof/
  • /static/*path

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good point! We should move that internal catalog API to the internal HTTP server.

allowed_origin: config.cors_allowed_origin,
});
let mut incoming = TcpListenerStream::new(http_listener);
Expand Down
Loading