Skip to content

Commit

Permalink
Wrapper util around mysql-async, including helpers for describing tab…
Browse files Browse the repository at this point in the history
…les and validating replication settings
  • Loading branch information
rjobanp committed Dec 15, 2023
1 parent a86b504 commit 22a4836
Show file tree
Hide file tree
Showing 10 changed files with 379 additions and 5 deletions.
18 changes: 16 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ members = [
"src/lowertest",
"src/lowertest-derive",
"src/metabase",
"src/mysql-util",
"src/mz",
"src/lsp-server",
"src/metrics",
Expand Down
19 changes: 19 additions & 0 deletions src/mysql-util/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "mz-mysql-util"
description = "MySQL utility library."
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true
publish = false

[dependencies]
anyhow = "1.0.66"
mysql_async = { version = "0.33.0", default-features = false, features = ["minimal"] }
mz-ssh-util = { path = "../ssh-util" }
mz-ore = { path = "../ore", features = ["async"] }
thiserror = "1.0.37"
workspace-hack = { version = "0.0.0", path = "../workspace-hack" }
tracing = "0.1.37"

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]
40 changes: 40 additions & 0 deletions src/mysql-util/src/desc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct MySqlTableDesc {
/// In MySQL the schema and database of a table are synonymous.
pub schema_name: String,
/// The name of the table.
pub name: String,
/// Columns for the table
///
/// The index of each column is based on its `ordinal_position`
/// reported by the information_schema.columns table, which defines
/// the order of column values when received in a row.
pub columns: Vec<MySqlColumnDesc>,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct MySqlColumnDesc {
/// The name of the column.
pub name: String,
/// The MySQL datatype of the column.
pub data_type: MySqlDataType,
/// Whether the column is nullable.
pub nullable: bool,
// TODO: add more column properties
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MySqlDataType {
Int,
Varchar(usize),
// TODO: add more data types
}
108 changes: 108 additions & 0 deletions src/mysql-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// BEGIN LINT CONFIG
// DO NOT EDIT. Automatically generated by bin/gen-lints.
// Have complaints about the noise? See the note in misc/python/materialize/cli/gen-lints.py first.
#![allow(unknown_lints)]
#![allow(clippy::style)]
#![allow(clippy::complexity)]
#![allow(clippy::large_enum_variant)]
#![allow(clippy::mutable_key_type)]
#![allow(clippy::stable_sort_primitive)]
#![allow(clippy::map_entry)]
#![allow(clippy::box_default)]
#![allow(clippy::drain_collect)]
#![warn(clippy::bool_comparison)]
#![warn(clippy::clone_on_ref_ptr)]
#![warn(clippy::no_effect)]
#![warn(clippy::unnecessary_unwrap)]
#![warn(clippy::dbg_macro)]
#![warn(clippy::todo)]
#![warn(clippy::wildcard_dependencies)]
#![warn(clippy::zero_prefixed_literal)]
#![warn(clippy::borrowed_box)]
#![warn(clippy::deref_addrof)]
#![warn(clippy::double_must_use)]
#![warn(clippy::double_parens)]
#![warn(clippy::extra_unused_lifetimes)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_question_mark)]
#![warn(clippy::needless_return)]
#![warn(clippy::redundant_pattern)]
#![warn(clippy::redundant_slicing)]
#![warn(clippy::redundant_static_lifetimes)]
#![warn(clippy::single_component_path_imports)]
#![warn(clippy::unnecessary_cast)]
#![warn(clippy::useless_asref)]
#![warn(clippy::useless_conversion)]
#![warn(clippy::builtin_type_shadow)]
#![warn(clippy::duplicate_underscore_argument)]
#![warn(clippy::double_neg)]
#![warn(clippy::unnecessary_mut_passed)]
#![warn(clippy::wildcard_in_or_patterns)]
#![warn(clippy::crosspointer_transmute)]
#![warn(clippy::excessive_precision)]
#![warn(clippy::overflow_check_conditional)]
#![warn(clippy::as_conversions)]
#![warn(clippy::match_overlapping_arm)]
#![warn(clippy::zero_divided_by_zero)]
#![warn(clippy::must_use_unit)]
#![warn(clippy::suspicious_assignment_formatting)]
#![warn(clippy::suspicious_else_formatting)]
#![warn(clippy::suspicious_unary_op_formatting)]
#![warn(clippy::mut_mutex_lock)]
#![warn(clippy::print_literal)]
#![warn(clippy::same_item_push)]
#![warn(clippy::useless_format)]
#![warn(clippy::write_literal)]
#![warn(clippy::redundant_closure)]
#![warn(clippy::redundant_closure_call)]
#![warn(clippy::unnecessary_lazy_evaluations)]
#![warn(clippy::partialeq_ne_impl)]
#![warn(clippy::redundant_field_names)]
#![warn(clippy::transmutes_expressible_as_ptr_casts)]
#![warn(clippy::unused_async)]
#![warn(clippy::disallowed_methods)]
#![warn(clippy::disallowed_macros)]
#![warn(clippy::disallowed_types)]
#![warn(clippy::from_over_into)]
// END LINT CONFIG

//! MySQL utility library.

mod tunnel;
pub use tunnel::{Config, TunnelConfig};

mod desc;
pub use desc::{MySqlColumnDesc, MySqlDataType, MySqlTableDesc};

mod replication;
pub use replication::{ensure_full_row_binlog_format, ensure_gtid_consistency, query_sys_var};

pub mod schemas;
pub use schemas::schema_info;

#[derive(Debug, thiserror::Error)]
pub enum MySqlError {
#[error("unsupported data type: {0}")]
UnsupportedDataType(String),
#[error("invalid mysql system setting '{setting}'. Expected '{expected}'. Got '{actual}'.")]
InvalidSystemSetting {
setting: String,
expected: String,
actual: String,
},
/// Any other error we bail on.
#[error(transparent)]
Generic(#[from] anyhow::Error),
/// A mysql_async error.
#[error(transparent)]
MySql(#[from] mysql_async::Error),
}
51 changes: 51 additions & 0 deletions src/mysql-util/src/replication.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use mysql_async::prelude::Queryable;
use mysql_async::Conn;

use crate::MySqlError;

/// Query a MySQL System Variable
pub async fn query_sys_var(conn: &mut Conn, name: &str) -> Result<String, MySqlError> {
let value: String = conn
.query_first(format!("SELECT @@{}", name))
.await?
.unwrap();
Ok(value)
}

/// Verify a MySQL System Variable matches the expected value
async fn verify_sys_setting(
conn: &mut Conn,
setting: &str,
expected: &str,
) -> Result<(), MySqlError> {
match query_sys_var(conn, setting).await?.as_str() {
actual if actual == expected => Ok(()),
actual => Err(MySqlError::InvalidSystemSetting {
setting: setting.to_string(),
expected: expected.to_string(),
actual: actual.to_string(),
}),
}
}

pub async fn ensure_full_row_binlog_format(conn: &mut Conn) -> Result<(), MySqlError> {
verify_sys_setting(conn, "log_bin", "1").await?;
verify_sys_setting(conn, "binlog_format", "ROW").await?;
verify_sys_setting(conn, "binlog_row_image", "FULL").await?;
Ok(())
}

pub async fn ensure_gtid_consistency(conn: &mut Conn) -> Result<(), MySqlError> {
verify_sys_setting(conn, "gtid_mode", "ON").await?;
verify_sys_setting(conn, "enforce_gtid_consistency", "ON").await?;
Ok(())
}
64 changes: 64 additions & 0 deletions src/mysql-util/src/schemas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use mysql_async::prelude::Queryable;
use mysql_async::Conn;

use crate::desc::{MySqlColumnDesc, MySqlDataType, MySqlTableDesc};
use crate::MySqlError;

/// Retrieve the tables and column descriptions for tables in the given schemas.
pub async fn schema_info(
conn: &mut Conn,
schemas: Vec<&str>,
) -> Result<Vec<MySqlTableDesc>, MySqlError> {
// Get all tables of type 'Base Table' in schema
let table_q = "SELECT table_name, table_schema
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
AND table_schema IN (?)";
let table_rows: Vec<(String, String)> = conn.exec(table_q, schemas).await?;

let mut tables = vec![];
for (table_name, schema_name) in table_rows {
let column_q = "SELECT column_name, data_type, is_nullable, ordinal_position
FROM information_schema.columns
WHERE table_name = ? AND table_schema = ?";
let column_rows = conn
.exec::<(String, String, String, u16), _, _>(column_q, (&table_name, &schema_name))
.await?;

let mut columns = Vec::with_capacity(column_rows.len());
for (name, data_type, is_nullable, ordinal_position) in column_rows {
let data_type = match data_type.as_str() {
"int" => MySqlDataType::Int,
"varchar" => MySqlDataType::Varchar(usize::MAX),
"char" => MySqlDataType::Varchar(usize::MAX),
_ => return Err(MySqlError::UnsupportedDataType(data_type)),
};
columns.push((
ordinal_position,
MySqlColumnDesc {
name,
data_type,
nullable: is_nullable == "YES",
},
));
}
// Sort columns in-place by their ordinal_position
columns.sort_by(|a, b| a.0.cmp(&b.0));

tables.push(MySqlTableDesc {
schema_name,
name: table_name,
columns: columns.into_iter().map(|(_, c)| c).collect(),
});
}
Ok(tables)
}
Loading

0 comments on commit 22a4836

Please sign in to comment.