diff --git a/Cargo.lock b/Cargo.lock index 372042a76b52..a4ad54aea672 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4509,6 +4509,19 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "mz-mysql-util" +version = "0.1.0" +dependencies = [ + "anyhow", + "mysql_async", + "mz-ore", + "mz-ssh-util", + "thiserror", + "tracing", + "workspace-hack", +] + [[package]] name = "mz-npm" version = "0.0.0" @@ -5248,7 +5261,7 @@ dependencies = [ "futures", "mz-ore", "openssl", - "socket2 0.4.9", + "socket2 0.5.3", "tokio", "tokio-stream", "tracing", @@ -9675,6 +9688,7 @@ dependencies = [ "kube-client", "kube-core", "libc", + "libz-sys", "log", "lru", "memchr", @@ -9718,7 +9732,7 @@ dependencies = [ "sha2", "similar", "smallvec", - "socket2 0.4.9", + "socket2 0.5.3", "subtle", "syn 1.0.107", "syn 2.0.39", diff --git a/Cargo.toml b/Cargo.toml index b8c991cb18f0..768030ecf33f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "src/lowertest", "src/lowertest-derive", "src/metabase", + "src/mysql-util", "src/mz", "src/lsp-server", "src/metrics", diff --git a/src/mysql-util/Cargo.toml b/src/mysql-util/Cargo.toml new file mode 100644 index 000000000000..d6560ca1342b --- /dev/null +++ b/src/mysql-util/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "mz-mysql-util" +description = "MySQL utility library." +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +publish = false + +[dependencies] +anyhow = "1.0.66" +mysql_async = { version = "0.33.0", default-features = false, features = ["minimal"] } +mz-ssh-util = { path = "../ssh-util" } +mz-ore = { path = "../ore", features = ["async"] } +thiserror = "1.0.37" +workspace-hack = { version = "0.0.0", path = "../workspace-hack" } +tracing = "0.1.37" + +[package.metadata.cargo-udeps.ignore] +normal = ["workspace-hack"] diff --git a/src/mysql-util/src/desc.rs b/src/mysql-util/src/desc.rs new file mode 100644 index 000000000000..ab6f17df3a52 --- /dev/null +++ b/src/mysql-util/src/desc.rs @@ -0,0 +1,40 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct MySqlTableDesc { + /// In MySQL the schema and database of a table are synonymous. + pub schema_name: String, + /// The name of the table. + pub name: String, + /// Columns for the table + /// + /// The index of each column is based on its `ordinal_position` + /// reported by the information_schema.columns table, which defines + /// the order of column values when received in a row. + pub columns: Vec, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct MySqlColumnDesc { + /// The name of the column. + pub name: String, + /// The MySQL datatype of the column. + pub data_type: MySqlDataType, + /// Whether the column is nullable. + pub nullable: bool, + // TODO: add more column properties +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum MySqlDataType { + Int, + Varchar(usize), + // TODO: add more data types +} diff --git a/src/mysql-util/src/lib.rs b/src/mysql-util/src/lib.rs new file mode 100644 index 000000000000..80c4f7f4c581 --- /dev/null +++ b/src/mysql-util/src/lib.rs @@ -0,0 +1,108 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// BEGIN LINT CONFIG +// DO NOT EDIT. Automatically generated by bin/gen-lints. +// Have complaints about the noise? See the note in misc/python/materialize/cli/gen-lints.py first. +#![allow(unknown_lints)] +#![allow(clippy::style)] +#![allow(clippy::complexity)] +#![allow(clippy::large_enum_variant)] +#![allow(clippy::mutable_key_type)] +#![allow(clippy::stable_sort_primitive)] +#![allow(clippy::map_entry)] +#![allow(clippy::box_default)] +#![allow(clippy::drain_collect)] +#![warn(clippy::bool_comparison)] +#![warn(clippy::clone_on_ref_ptr)] +#![warn(clippy::no_effect)] +#![warn(clippy::unnecessary_unwrap)] +#![warn(clippy::dbg_macro)] +#![warn(clippy::todo)] +#![warn(clippy::wildcard_dependencies)] +#![warn(clippy::zero_prefixed_literal)] +#![warn(clippy::borrowed_box)] +#![warn(clippy::deref_addrof)] +#![warn(clippy::double_must_use)] +#![warn(clippy::double_parens)] +#![warn(clippy::extra_unused_lifetimes)] +#![warn(clippy::needless_borrow)] +#![warn(clippy::needless_question_mark)] +#![warn(clippy::needless_return)] +#![warn(clippy::redundant_pattern)] +#![warn(clippy::redundant_slicing)] +#![warn(clippy::redundant_static_lifetimes)] +#![warn(clippy::single_component_path_imports)] +#![warn(clippy::unnecessary_cast)] +#![warn(clippy::useless_asref)] +#![warn(clippy::useless_conversion)] +#![warn(clippy::builtin_type_shadow)] +#![warn(clippy::duplicate_underscore_argument)] +#![warn(clippy::double_neg)] +#![warn(clippy::unnecessary_mut_passed)] +#![warn(clippy::wildcard_in_or_patterns)] +#![warn(clippy::crosspointer_transmute)] +#![warn(clippy::excessive_precision)] +#![warn(clippy::overflow_check_conditional)] +#![warn(clippy::as_conversions)] +#![warn(clippy::match_overlapping_arm)] +#![warn(clippy::zero_divided_by_zero)] +#![warn(clippy::must_use_unit)] +#![warn(clippy::suspicious_assignment_formatting)] +#![warn(clippy::suspicious_else_formatting)] +#![warn(clippy::suspicious_unary_op_formatting)] +#![warn(clippy::mut_mutex_lock)] +#![warn(clippy::print_literal)] +#![warn(clippy::same_item_push)] +#![warn(clippy::useless_format)] +#![warn(clippy::write_literal)] +#![warn(clippy::redundant_closure)] +#![warn(clippy::redundant_closure_call)] +#![warn(clippy::unnecessary_lazy_evaluations)] +#![warn(clippy::partialeq_ne_impl)] +#![warn(clippy::redundant_field_names)] +#![warn(clippy::transmutes_expressible_as_ptr_casts)] +#![warn(clippy::unused_async)] +#![warn(clippy::disallowed_methods)] +#![warn(clippy::disallowed_macros)] +#![warn(clippy::disallowed_types)] +#![warn(clippy::from_over_into)] +// END LINT CONFIG + +//! MySQL utility library. + +mod tunnel; +pub use tunnel::{Config, TunnelConfig}; + +mod desc; +pub use desc::{MySqlColumnDesc, MySqlDataType, MySqlTableDesc}; + +mod replication; +pub use replication::{ensure_full_row_binlog_format, ensure_gtid_consistency, query_sys_var}; + +pub mod schemas; +pub use schemas::schema_info; + +#[derive(Debug, thiserror::Error)] +pub enum MySqlError { + #[error("unsupported data type: {0}")] + UnsupportedDataType(String), + #[error("invalid mysql system setting '{setting}'. Expected '{expected}'. Got '{actual}'.")] + InvalidSystemSetting { + setting: String, + expected: String, + actual: String, + }, + /// Any other error we bail on. + #[error(transparent)] + Generic(#[from] anyhow::Error), + /// A mysql_async error. + #[error(transparent)] + MySql(#[from] mysql_async::Error), +} diff --git a/src/mysql-util/src/replication.rs b/src/mysql-util/src/replication.rs new file mode 100644 index 000000000000..1ee6e4fc3ee4 --- /dev/null +++ b/src/mysql-util/src/replication.rs @@ -0,0 +1,51 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use mysql_async::prelude::Queryable; +use mysql_async::Conn; + +use crate::MySqlError; + +/// Query a MySQL System Variable +pub async fn query_sys_var(conn: &mut Conn, name: &str) -> Result { + let value: String = conn + .query_first(format!("SELECT @@{}", name)) + .await? + .unwrap(); + Ok(value) +} + +/// Verify a MySQL System Variable matches the expected value +async fn verify_sys_setting( + conn: &mut Conn, + setting: &str, + expected: &str, +) -> Result<(), MySqlError> { + match query_sys_var(conn, setting).await?.as_str() { + actual if actual == expected => Ok(()), + actual => Err(MySqlError::InvalidSystemSetting { + setting: setting.to_string(), + expected: expected.to_string(), + actual: actual.to_string(), + }), + } +} + +pub async fn ensure_full_row_binlog_format(conn: &mut Conn) -> Result<(), MySqlError> { + verify_sys_setting(conn, "log_bin", "1").await?; + verify_sys_setting(conn, "binlog_format", "ROW").await?; + verify_sys_setting(conn, "binlog_row_image", "FULL").await?; + Ok(()) +} + +pub async fn ensure_gtid_consistency(conn: &mut Conn) -> Result<(), MySqlError> { + verify_sys_setting(conn, "gtid_mode", "ON").await?; + verify_sys_setting(conn, "enforce_gtid_consistency", "ON").await?; + Ok(()) +} diff --git a/src/mysql-util/src/schemas.rs b/src/mysql-util/src/schemas.rs new file mode 100644 index 000000000000..ad45dd3d56f0 --- /dev/null +++ b/src/mysql-util/src/schemas.rs @@ -0,0 +1,64 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use mysql_async::prelude::Queryable; +use mysql_async::Conn; + +use crate::desc::{MySqlColumnDesc, MySqlDataType, MySqlTableDesc}; +use crate::MySqlError; + +/// Retrieve the tables and column descriptions for tables in the given schemas. +pub async fn schema_info( + conn: &mut Conn, + schemas: Vec<&str>, +) -> Result, MySqlError> { + // Get all tables of type 'Base Table' in schema + let table_q = "SELECT table_name, table_schema + FROM information_schema.tables + WHERE table_type = 'BASE TABLE' + AND table_schema IN (?)"; + let table_rows: Vec<(String, String)> = conn.exec(table_q, schemas).await?; + + let mut tables = vec![]; + for (table_name, schema_name) in table_rows { + let column_q = "SELECT column_name, data_type, is_nullable, ordinal_position + FROM information_schema.columns + WHERE table_name = ? AND table_schema = ?"; + let column_rows = conn + .exec::<(String, String, String, u16), _, _>(column_q, (&table_name, &schema_name)) + .await?; + + let mut columns = Vec::with_capacity(column_rows.len()); + for (name, data_type, is_nullable, ordinal_position) in column_rows { + let data_type = match data_type.as_str() { + "int" => MySqlDataType::Int, + "varchar" => MySqlDataType::Varchar(usize::MAX), + "char" => MySqlDataType::Varchar(usize::MAX), + _ => return Err(MySqlError::UnsupportedDataType(data_type)), + }; + columns.push(( + ordinal_position, + MySqlColumnDesc { + name, + data_type, + nullable: is_nullable == "YES", + }, + )); + } + // Sort columns in-place by their ordinal_position + columns.sort_by(|a, b| a.0.cmp(&b.0)); + + tables.push(MySqlTableDesc { + schema_name, + name: table_name, + columns: columns.into_iter().map(|(_, c)| c).collect(), + }); + } + Ok(tables) +} diff --git a/src/mysql-util/src/tunnel.rs b/src/mysql-util/src/tunnel.rs new file mode 100644 index 000000000000..1306c4224d7b --- /dev/null +++ b/src/mysql-util/src/tunnel.rs @@ -0,0 +1,75 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use mz_ore::option::OptionExt; +use mz_ssh_util::tunnel_manager::SshTunnelManager; + +use mysql_async::{Conn, Opts}; +use tracing::{info, warn}; + +use crate::MySqlError; + +/// Configures an optional tunnel for use when connecting to a MySQL +/// database. +#[derive(Debug, PartialEq, Clone)] +pub enum TunnelConfig { + /// Establish a direct TCP connection to the database host. + Direct, + // TODO: Implement SSH tunneling for MySQL connections + // TODO: Implement AWS PrivateLink tunneling for MySQL connections +} + +/// Configuration for MySQL connections. +/// +/// This wraps [`mysql_async::Opts`] to allow the configuration of a +/// tunnel via a [`TunnelConfig`]. +#[derive(Clone, Debug)] +pub struct Config { + inner: Opts, + tunnel: TunnelConfig, +} + +impl Config { + pub fn new(inner: Opts, tunnel: TunnelConfig) -> Self { + Self { inner, tunnel } + } + + pub async fn connect( + &self, + task_name: &str, + _ssh_tunnel_manager: &SshTunnelManager, + ) -> Result { + let address = format!( + "mysql:://{}@{}:{}/{}", + self.inner.user().display_or(""), + self.inner.ip_or_hostname(), + self.inner.tcp_port(), + self.inner.db_name().display_or(""), + ); + info!(%task_name, %address, "connecting"); + match self.connect_internal().await { + Ok(t) => { + info!(%task_name, %address, "connected"); + Ok(t) + } + Err(e) => { + warn!(%task_name, %address, "connection failed: {e:#}"); + Err(e) + } + } + } + + async fn connect_internal(&self) -> Result { + match &self.tunnel { + TunnelConfig::Direct => Conn::new(self.inner.clone()) + .await + .map_err(MySqlError::from), + } + } +} diff --git a/src/server-core/Cargo.toml b/src/server-core/Cargo.toml index 28de883a1bfc..2b7b106b9b8f 100644 --- a/src/server-core/Cargo.toml +++ b/src/server-core/Cargo.toml @@ -10,7 +10,7 @@ publish = false anyhow = "1.0.66" clap = { version = "3.2.24", features = ["derive", "env"] } openssl = { version = "0.10.48", features = ["vendored"] } -socket2 = "0.4.7" +socket2 = "0.5.3" tokio-stream = "0.1.11" tracing = "0.1.37" futures = "0.3.25" diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 2cd1ea01f917..f11024f3ee4a 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -55,6 +55,7 @@ kube = { version = "0.87.1", default-features = false, features = ["client", "de kube-client = { version = "0.87.1", default-features = false, features = ["jsonpatch", "openssl-tls", "ws"] } kube-core = { version = "0.87.1", default-features = false, features = ["jsonpatch", "schema", "ws"] } libc = { version = "0.2.148", features = ["extra_traits", "use_std"] } +libz-sys = { version = "1.1.8", features = ["static"] } log = { version = "0.4.17", default-features = false, features = ["std"] } lru = { version = "0.12.0" } memchr = { version = "2.5.0", features = ["use_std"] } @@ -94,7 +95,7 @@ serde_json = { version = "1.0.99", features = ["alloc", "arbitrary_precision", " sha2 = { version = "0.10.6" } similar = { version = "2.2.1", features = ["inline", "unicode"] } smallvec = { version = "1.10.0", default-features = false, features = ["const_generics", "serde", "union", "write"] } -socket2 = { version = "0.4.9", default-features = false, features = ["all"] } +socket2 = { version = "0.5.3", default-features = false, features = ["all"] } subtle = { version = "2.4.1" } syn-dff4ba8e3ae991db = { package = "syn", version = "1.0.107", features = ["extra-traits", "full", "visit", "visit-mut"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2.0.39", features = ["extra-traits", "full", "visit-mut"] } @@ -163,6 +164,7 @@ kube = { version = "0.87.1", default-features = false, features = ["client", "de kube-client = { version = "0.87.1", default-features = false, features = ["jsonpatch", "openssl-tls", "ws"] } kube-core = { version = "0.87.1", default-features = false, features = ["jsonpatch", "schema", "ws"] } libc = { version = "0.2.148", features = ["extra_traits", "use_std"] } +libz-sys = { version = "1.1.8", features = ["static"] } log = { version = "0.4.17", default-features = false, features = ["std"] } lru = { version = "0.12.0" } memchr = { version = "2.5.0", features = ["use_std"] } @@ -202,7 +204,7 @@ serde_json = { version = "1.0.99", features = ["alloc", "arbitrary_precision", " sha2 = { version = "0.10.6" } similar = { version = "2.2.1", features = ["inline", "unicode"] } smallvec = { version = "1.10.0", default-features = false, features = ["const_generics", "serde", "union", "write"] } -socket2 = { version = "0.4.9", default-features = false, features = ["all"] } +socket2 = { version = "0.5.3", default-features = false, features = ["all"] } subtle = { version = "2.4.1" } syn-dff4ba8e3ae991db = { package = "syn", version = "1.0.107", features = ["extra-traits", "full", "visit", "visit-mut"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2.0.39", features = ["extra-traits", "full", "visit-mut"] }