Skip to content

Commit

Permalink
Add more substantial tests for compute migrations
Browse files Browse the repository at this point in the history
The previous tests really didn't do much. This set should be quite a bit
more encompassing.

Signed-off-by: Tristan Partin <tristan@neon.tech>
  • Loading branch information
tristan957 committed Dec 23, 2024
1 parent 9c53b41 commit b34260a
Show file tree
Hide file tree
Showing 26 changed files with 327 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion compute_tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
testing = []
testing = ["fail/failpoints"]

[dependencies]
base64.workspace = true
Expand All @@ -19,6 +19,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
fail.workspace = true
flate2.workspace = true
futures.workspace = true
hyper0 = { workspace = true, features = ["full"] }
Expand Down
5 changes: 5 additions & 0 deletions compute_tools/src/bin/compute_ctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,15 @@ use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;

// this is an arbitrary build tag. Fine as a default / for testing purposes
// in-case of not-set environment var
const BUILD_TAG_DEFAULT: &str = "latest";

fn main() -> Result<()> {
let scenario = failpoint_support::init();

let (build_tag, clap_args) = init()?;

// enable core dumping for all child processes
Expand Down Expand Up @@ -100,6 +103,8 @@ fn main() -> Result<()> {

maybe_delay_exit(delay_exit);

scenario.teardown();

deinit_and_exit(wait_pg_result);
}

Expand Down
15 changes: 13 additions & 2 deletions compute_tools/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1181,8 +1181,19 @@ impl ComputeNode {
let mut conf = postgres::config::Config::from(conf);
conf.application_name("compute_ctl:migrations");

let mut client = conf.connect(NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
match conf.connect(NoTls) {
Ok(mut client) => {
if let Err(e) = handle_migrations(&mut client) {
error!("Failed to run migrations: {}", e);
}
}
Err(e) => {
error!(
"Failed to connect to the compute for running migrations: {}",
e
);
}
};
});

Ok::<(), anyhow::Error>(())
Expand Down
15 changes: 15 additions & 0 deletions compute_tools/src/http/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use metrics::proto::MetricFamily;
use metrics::Encoder;
use metrics::TextEncoder;
use tokio::task;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use tracing_utils::http::OtelName;
use utils::failpoint_support::failpoints_handler;
use utils::http::error::ApiError;
use utils::http::request::must_get_query_param;

fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
Expand Down Expand Up @@ -310,6 +313,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}

(&Method::POST, "/failpoints") if cfg!(feature = "testing") => {
match failpoints_handler(req, CancellationToken::new()).await {
Ok(r) => r,
Err(ApiError::BadRequest(e)) => {
render_json_error(&e.to_string(), StatusCode::BAD_REQUEST)
}
Err(_) => {
render_json_error("Internal server error", StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

// download extension files from remote extension storage on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
Expand Down
41 changes: 38 additions & 3 deletions compute_tools/src/migration.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use anyhow::{Context, Result};
use fail::fail_point;
use postgres::Client;
use tracing::info;

/// Runs a series of migrations on a target database
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}

impl<'m> MigrationRunner<'m> {
/// Create a new migration runner
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
// The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
assert!(migrations.len() + 1 < i64::MAX as usize);

Self { client, migrations }
}

/// Get the current value neon_migration.migration_id
fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
Expand All @@ -25,17 +29,43 @@ impl<'m> MigrationRunner<'m> {
Ok(row.get::<&str, i64>("id"))
}

/// Update the neon_migration.migration_id value
///
/// This function has a fail point called compute-migration, which can be
/// used if you would like to fail the application of a series of migrations
/// at some point.
fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);

// We use this fail point in order to check that failing in the
// middle of applying a series of migrations fails in an expected
// manner
if cfg!(feature = "testing") {
let fail = (|| {
fail_point!("compute-migration", |fail_migration_id| {
migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
});

false
})();

if fail {
return Err(anyhow::anyhow!(format!(
"migration {} was configured to fail because of a failpoint",
migration_id
)));
}
}

self.client
.simple_query(&setval)
.context("run_migrations update id")?;

Ok(())
}

fn prepare_migrations(&mut self) -> Result<()> {
/// Prepare the migrations the target database for handling migrations
fn prepare_database(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;

Expand All @@ -54,8 +84,9 @@ impl<'m> MigrationRunner<'m> {
Ok(())
}

/// Run the configrured set of migrations
pub fn run_migrations(mut self) -> Result<()> {
self.prepare_migrations()?;
self.prepare_database()?;

let mut current_migration = self.get_migration_id()? as usize;
while current_migration < self.migrations.len() {
Expand All @@ -69,6 +100,11 @@ impl<'m> MigrationRunner<'m> {

if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", migration_id!(current_migration));

// Even though we are skipping the migration, updating the
// migration ID should help keep logic easy to understand when
// trying to understand the state of a cluster.
self.update_migration_id(migration_id!(current_migration))?;
} else {
info!(
"Running migration id={}:\n{}\n",
Expand All @@ -87,7 +123,6 @@ impl<'m> MigrationRunner<'m> {
)
})?;

// Migration IDs start at 1
self.update_migration_id(migration_id!(current_migration))?;

self.client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
DO $$
DECLARE
bypassrls boolean;
BEGIN
SELECT rolbypassrls INTO bypassrls FROM pg_roles WHERE rolname = 'neon_superuser';
IF NOT bypassrls THEN
RAISE EXCEPTION 'neon_superuser cannot bypass RLS';
END IF;
END $$;
25 changes: 25 additions & 0 deletions compute_tools/src/migrations/tests/0002-alter_roles.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
DO $$
DECLARE
role record;
BEGIN
FOR role IN
SELECT rolname AS name, rolinherit AS inherit
FROM pg_roles
WHERE pg_has_role(rolname, 'neon_superuser', 'member')
LOOP
IF NOT role.inherit THEN
RAISE EXCEPTION '% cannot inherit', quote_ident(role.name);
END IF;
END LOOP;

FOR role IN
SELECT rolname AS name, rolbypassrls AS bypassrls
FROM pg_roles
WHERE NOT pg_has_role(rolname, 'neon_superuser', 'member')
AND NOT starts_with(rolname, 'pg_')
LOOP
IF role.bypassrls THEN
RAISE EXCEPTION '% can bypass RLS', quote_ident(role.name);
END IF;
END LOOP;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
DO $$
BEGIN
IF (SELECT current_setting('server_version_num')::numeric < 160000) THEN
RETURN;
END IF;

IF NOT (SELECT pg_has_role('neon_superuser', 'pg_create_subscription', 'member')) THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_create_subscription';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
DO $$
DECLARE
monitor record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_monitor', 'member') AS member,
admin_option AS admin
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'pg_monitor'::regrole;

IF NOT monitor.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
END IF;

IF NOT monitor.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- This test was never written becuase at the time migration tests were added
-- the accompanying migration was already skipped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT bool_and(has_function_privilege('neon_superuser', oid, 'execute'))
INTO can_execute
FROM pg_proc
WHERE proname IN ('pg_export_snapshot', 'pg_log_standby_snapshot')
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute both pg_export_snapshot and pg_log_standby_snapshot';
END IF;
END $$;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
DO $$
DECLARE
can_execute boolean;
BEGIN
SELECT has_function_privilege('neon_superuser', oid, 'execute')
INTO can_execute
FROM pg_proc
WHERE proname = 'pg_show_replication_origin_status'
AND pronamespace = 'pg_catalog'::regnamespace;
IF NOT can_execute THEN
RAISE EXCEPTION 'neon_superuser cannot execute pg_show_replication_origin_status';
END IF;
END $$;
4 changes: 2 additions & 2 deletions libs/utils/src/failpoint_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;

/// Declare a failpoint that can use the `pause` failpoint action.
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
Expand Down Expand Up @@ -181,7 +181,7 @@ pub async fn failpoints_handler(
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because storage was compiled without failpoints support"
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
}

Expand Down
1 change: 1 addition & 0 deletions test_runner/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"fixtures.compute_reconfigure",
"fixtures.storage_controller_proxy",
"fixtures.paths",
"fixtures.compute_migrations",
"fixtures.neon_fixtures",
"fixtures.benchmark_fixture",
"fixtures.pg_stats",
Expand Down
34 changes: 34 additions & 0 deletions test_runner/fixtures/compute_migrations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import pytest

from fixtures.paths import BASE_DIR

if TYPE_CHECKING:
from collections.abc import Iterator
from pathlib import Path

COMPUTE_MIGRATIONS_DIR = BASE_DIR / "compute_tools" / "src" / "migrations"
COMPUTE_MIGRATIONS_TEST_DIR = COMPUTE_MIGRATIONS_DIR / "tests"

COMPUTE_MIGRATIONS = sorted(next(os.walk(COMPUTE_MIGRATIONS_DIR))[2])
NUM_COMPUTE_MIGRATIONS = len(COMPUTE_MIGRATIONS)


@pytest.fixture(scope="session")
def compute_migrations_dir() -> Iterator[Path]:
"""
Retrieve the path to the compute migrations directory.
"""
yield COMPUTE_MIGRATIONS_DIR


@pytest.fixture(scope="session")
def compute_migrations_test_dir() -> Iterator[Path]:
"""
Retrieve the path to the compute migrations test directory.
"""
yield COMPUTE_MIGRATIONS_TEST_DIR
14 changes: 14 additions & 0 deletions test_runner/fixtures/endpoint/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,17 @@ def metrics(self) -> str:
res = self.get(f"http://localhost:{self.port}/metrics")
res.raise_for_status()
return res.text

def configure_failpoints(self, *args: tuple[str, str]) -> None:
body: list[dict[str, str]] = []

for fp in args:
body.append(
{
"name": fp[0],
"action": fp[1],
}
)

res = self.post(f"http://localhost:{self.port}/failpoints", json=body)
res.raise_for_status()
Loading

0 comments on commit b34260a

Please sign in to comment.