storage controller: read from database in validate API #8784

Merged
10 commits, merged Sep 4, 2024
2 changes: 1 addition & 1 deletion storage_controller/src/http.rs
@@ -101,7 +101,7 @@ async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiEr

let validate_req = json_request::<ValidateRequest>(&mut req).await?;
let state = get_state(&req);
json_response(StatusCode::OK, state.service.validate(validate_req))
json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}

/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
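For orientation, the handler above now awaits Service::validate and propagates database errors via the ? operator. The request and response bodies it exchanges look roughly like the sketch below; this is a reconstruction from the field accesses later in this diff (req_tenant.id, req_tenant.gen, ValidateResponseTenant { id, valid }), not the actual definitions in pageserver_api, and a plain string stands in for the real TenantShardId type.

// Hypothetical shapes inferred from this PR's diff (edition 2021; `gen` becomes a
// reserved word in edition 2024). The real types live in pageserver_api.
type TenantShardId = String; // stand-in for the real shard id type

pub struct ValidateRequestTenant {
    pub id: TenantShardId,
    pub gen: u32, // the generation the pageserver believes it holds
}

pub struct ValidateRequest {
    pub tenants: Vec<ValidateRequestTenant>,
}

pub struct ValidateResponseTenant {
    pub id: TenantShardId,
    pub valid: bool, // false => the pageserver must not act on deletions queued under this generation
}

pub struct ValidateResponse {
    pub tenants: Vec<ValidateResponseTenant>,
}

fn main() {
    let req = ValidateRequest {
        tenants: vec![ValidateRequestTenant { id: "shard-0".to_string(), gen: 5 }],
    };
    println!("validating {} shard(s)", req.tenants.len());
}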
70 changes: 66 additions & 4 deletions storage_controller/src/persistence.rs
@@ -8,6 +8,7 @@ use self::split_state::SplitState;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use itertools::Itertools;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
@@ -91,7 +92,8 @@ pub(crate) enum DatabaseOperation {
Detach,
ReAttach,
IncrementGeneration,
PeekGenerations,
TenantGenerations,
ShardGenerations,
ListTenantShards,
InsertTenantShards,
UpdateTenantShard,
@@ -544,13 +546,13 @@ impl Persistence {
/// If the tenant doesn't exist, an empty vector is returned.
///
/// Output is sorted by shard number
pub(crate) async fn peek_generations(
pub(crate) async fn tenant_generations(
&self,
filter_tenant_id: TenantId,
) -> Result<Vec<ShardGenerationState>, DatabaseError> {
use crate::schema::tenant_shards::dsl::*;
let rows = self
.with_measured_conn(DatabaseOperation::PeekGenerations, move |conn| {
.with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| {
let result = tenant_shards
.filter(tenant_id.eq(filter_tenant_id.to_string()))
.select(TenantShardPersistence::as_select())
@@ -572,6 +574,64 @@ impl Persistence {
.collect())
}

/// Read the generation number of specific tenant shards
///
/// Output is unsorted. Output may not include values for all inputs, if they are missing in the database.
pub(crate) async fn shard_generations(
&self,
mut tenant_shard_ids: impl Iterator<Item = &TenantShardId>,
) -> Result<Vec<(TenantShardId, Option<Generation>)>, DatabaseError> {
let mut rows = Vec::with_capacity(tenant_shard_ids.size_hint().0);

// We will chunk our input to avoid composing arbitrarily long `IN` clauses. Typically we are
// called with a single digit number of IDs, but in principle we could be called with tens
// of thousands (all the shards on one pageserver) from the generation validation API.
loop {
// A modest hardcoded chunk size to handle typical cases in a single query but never generate particularly
// large query strings.
let chunk_ids = tenant_shard_ids.by_ref().take(32);

// Compose a comma separated list of tuples for matching on (tenant_id, shard_number, shard_count)
let in_clause = chunk_ids
.map(|tsid| {
format!(
"('{}', {}, {})",
tsid.tenant_id, tsid.shard_number.0, tsid.shard_count.0
)
})
.join(",");

// We are done when our iterator gives us nothing to filter on
if in_clause.is_empty() {
break;
}

let chunk_rows = self
.with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| {
// diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
// the inputs are strongly typed and cannot carry any user-supplied raw string content.
let result : Vec<TenantShardPersistence> = diesel::sql_query(
format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
).load(conn)?;

Ok(result)
})
.await?;
rows.extend(chunk_rows.into_iter())
}

Ok(rows
.into_iter()
.map(|tsp| {
(
tsp.get_tenant_shard_id()
.expect("Bad tenant ID in database"),
tsp.generation.map(|g| Generation::new(g as u32)),
)
})
.collect())
}

#[allow(non_local_definitions)]
/// For use when updating a persistent property of a tenant, such as its config or placement_policy.
///
@@ -941,7 +1001,9 @@ impl Persistence {
}

/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[derive(
QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
#[serde(default)]
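Before moving on to the next file: the chunking in shard_generations above is easier to see in isolation. Below is a minimal standalone sketch under stated assumptions: ShardKey is a hypothetical stand-in for TenantShardId, the composed queries are merely collected (the real code feeds each one to diesel's sql_query), and Vec::join replaces the itertools join. The chunk size of 32 and the tuple-valued IN clause mirror the comments in the diff.

// A standalone sketch of the chunked, multi-column IN-clause composition used by
// shard_generations above.
struct ShardKey {
    tenant_id: String, // hex tenant id; strongly typed upstream, so no SQL escaping is needed
    shard_number: u8,
    shard_count: u8,
}

// Modest chunk size: one query for typical calls, bounded query strings for huge ones.
const CHUNK_SIZE: usize = 32;

fn compose_queries<'a>(mut keys: impl Iterator<Item = &'a ShardKey>) -> Vec<String> {
    let mut queries = Vec::new();
    loop {
        // Take up to CHUNK_SIZE keys and format them as (tenant_id, shard_number, shard_count) tuples.
        let in_clause = keys
            .by_ref()
            .take(CHUNK_SIZE)
            .map(|k| format!("('{}', {}, {})", k.tenant_id, k.shard_number, k.shard_count))
            .collect::<Vec<_>>()
            .join(",");
        if in_clause.is_empty() {
            break; // the iterator is exhausted
        }
        queries.push(format!(
            "SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});"
        ));
    }
    queries
}

fn main() {
    let keys: Vec<ShardKey> = (0..70u32)
        .map(|i| ShardKey {
            tenant_id: format!("{i:032x}"),
            shard_number: (i % 4) as u8,
            shard_count: 4,
        })
        .collect();
    let queries = compose_queries(keys.iter());
    assert_eq!(queries.len(), 3); // 32 + 32 + 6 keys
    println!("{}", queries[0]);
}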
3 changes: 3 additions & 0 deletions storage_controller/src/reconciler.rs
@@ -17,6 +17,7 @@ use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use utils::sync::gate::GateGuard;

use crate::compute_hook::{ComputeHook, NotifyError};
@@ -593,6 +594,8 @@ impl Reconciler {
notify_attempts += 1;
}

pausable_failpoint!("reconciler-live-migrate-post-notify");

// Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
// this location will be deleted in the general case reconciliation that runs after this.
let origin_secondary_conf = build_location_config(
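The new pausable_failpoint!("reconciler-live-migrate-post-notify") gives the regression test further down a point where a live migration can be frozen after the compute notification but before the origin is downgraded. As a rough illustration of the pause/resume failpoint pattern, here is a sketch using the upstream fail crate directly (assuming fail = { version = "0.5", features = ["failpoints"] }); the project's pausable_failpoint! wrapper presumably exists to make the blocking "pause" action safe from async code, which this sketch does not attempt.

// Sketch only: a thread parks at a named failpoint while it is set to "pause",
// and resumes when the failpoint is reconfigured or removed.
use std::{thread, time::Duration};

fn live_migrate_step() {
    // With the "pause" action configured, execution blocks here.
    fail::fail_point!("reconciler-live-migrate-post-notify");
    println!("origin downgrade proceeds");
}

fn main() {
    fail::cfg("reconciler-live-migrate-post-notify", "pause").unwrap();
    let worker = thread::spawn(live_migrate_step);

    // The worker is (or soon will be) parked at the failpoint; a test can inspect
    // intermediate state here, as the Python test below does.
    thread::sleep(Duration::from_millis(100));

    // Equivalent to configuring the failpoint to "off" in the tests below.
    fail::remove("reconciler-live-migrate-post-notify");
    worker.join().unwrap();
}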
91 changes: 64 additions & 27 deletions storage_controller/src/service.rs
@@ -1851,37 +1851,74 @@ impl Service {
Ok(response)
}

pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
let locked = self.inner.read().unwrap();
pub(crate) async fn validate(
&self,
validate_req: ValidateRequest,
) -> Result<ValidateResponse, DatabaseError> {
// Fast in-memory check: we may reject validation on anything that doesn't match our
// in-memory generation for a shard
let in_memory_result = {
let mut in_memory_result = Vec::new();
let locked = self.inner.read().unwrap();
for req_tenant in validate_req.tenants {
if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) {
let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen));
tracing::info!(
"handle_validate: {}(gen {}): valid={valid} (latest {:?})",
req_tenant.id,
req_tenant.gen,
tenant_shard.generation
);

in_memory_result.push((req_tenant.id, Generation::new(req_tenant.gen), valid));
} else {
// This is legal: for example during a shard split the pageserver may still
// have deletions in its queue from the old pre-split shard, or after deletion
// of a tenant that was busy with compaction/gc while being deleted.
tracing::info!(
"Refusing deletion validation for missing shard {}",
req_tenant.id
);
}
}

in_memory_result
};

// Database calls to confirm validity for anything that passed the in-memory check. We must do this
// in case of controller split-brain, where some other controller process might have incremented the generation.
let db_generations = self
.persistence
.shard_generations(in_memory_result.iter().filter_map(|i| {
if i.2 {
Some(&i.0)
} else {
None
}
}))
.await?;
let db_generations = db_generations.into_iter().collect::<HashMap<_, _>>();

let mut response = ValidateResponse {
tenants: Vec::new(),
};

for req_tenant in validate_req.tenants {
if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) {
let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen));
tracing::info!(
"handle_validate: {}(gen {}): valid={valid} (latest {:?})",
req_tenant.id,
req_tenant.gen,
tenant_shard.generation
);
response.tenants.push(ValidateResponseTenant {
id: req_tenant.id,
valid,
});
for (tenant_shard_id, validate_generation, valid) in in_memory_result.into_iter() {
let valid = if valid {
let db_generation = db_generations.get(&tenant_shard_id);
db_generation == Some(&Some(validate_generation))
} else {
// After tenant deletion, we may approve any validation. This avoids
// spurious warnings on the pageserver if it has pending LSN updates
// at the point a deletion happens.
response.tenants.push(ValidateResponseTenant {
id: req_tenant.id,
valid: true,
});
}
// If in-memory state says it's invalid, trust that. It's always safe to fail a validation, at worst
// this prevents a pageserver from cleaning up an object in S3.
false
};

response.tenants.push(ValidateResponseTenant {
id: tenant_shard_id,
valid,
})
}
response

Ok(response)
}

pub(crate) async fn tenant_create(
@@ -3176,7 +3213,7 @@ impl Service {
// run concurrently with reconciliations, and it is not guaranteed that the node we find here
// will still be the latest when we're done: we will check generations again at the end of
// this function to handle that.
let generations = self.persistence.peek_generations(tenant_id).await?;
let generations = self.persistence.tenant_generations(tenant_id).await?;

if generations
.iter()
@@ -3233,7 +3270,7 @@ impl Service {
// Post-check: are all the generations of all the shards the same as they were initially? This proves that
// our remote operation executed on the latest generation and is therefore persistent.
{
let latest_generations = self.persistence.peek_generations(tenant_id).await?;
let latest_generations = self.persistence.tenant_generations(tenant_id).await?;
if latest_generations
.into_iter()
.map(
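Summarizing the comments in Service::validate above: the in-memory generation may only refuse a request cheaply, and a database read is required to approve one, which protects against a split-brain controller whose in-memory state lags behind another process that already incremented the generation. A minimal sketch of that decision rule follows, with hypothetical ShardId/Gen stand-ins for TenantShardId/Generation; note that the real code omits unknown shards from the response entirely, whereas the sketch models them as a plain refusal.

use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real types.
type ShardId = &'static str;
type Gen = u32;

// The in-memory generation can only refuse; the database record is the authority
// for approving. Refusing is always safe: at worst it delays object cleanup.
fn validate_shard(
    requested: Gen,
    in_memory: Option<Gen>,             // None => shard not known to this controller
    db: &HashMap<ShardId, Option<Gen>>, // durable generations, keyed by shard
    shard: ShardId,
) -> bool {
    match in_memory {
        // Unknown shard (e.g. deleted, or a pre-split parent): refuse.
        None => false,
        // In-memory mismatch: refuse without touching the database.
        Some(mem) if mem != requested => false,
        // In-memory match: only approve if the durable record agrees.
        Some(_) => db.get(shard) == Some(&Some(requested)),
    }
}

fn main() {
    let db = HashMap::from([("shard-0", Some(5u32))]);
    assert!(validate_shard(5, Some(5), &db, "shard-0"));  // memory and database agree
    assert!(!validate_shard(4, Some(4), &db, "shard-0")); // database has moved on: refuse
    assert!(!validate_shard(5, Some(4), &db, "shard-0")); // in-memory mismatch: refuse
    assert!(!validate_shard(5, None, &db, "shard-0"));    // unknown shard: refuse
    println!("validation decision sketch ok");
}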
112 changes: 112 additions & 0 deletions test_runner/regress/test_storage_controller.py
@@ -2330,3 +2330,115 @@ def has_hit_failpoint():
connect=0, # Disable retries: we want to see the 503
)
).timeline_create(PgVersion.NOT_SET, tenant_id, create_timeline_id)


def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvBuilder):
"""
A correctness edge case: while we are live migrating and a shard's generation is
visible to the Reconciler but not to the central Service, the generation validation
API should still prevent stale generations from doing deletions.
"""
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
env = neon_env_builder.init_configs()
env.start()

TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": 128 * 1024,
"compaction_threshold": 1,
"compaction_target_size": 128 * 1024,
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
}

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
env.neon_cli.create_tenant(tenant_id, timeline_id)
env.storage_controller.pageserver_api().set_tenant_config(tenant_id, TENANT_CONF)

# Write enough data that a compaction would do some work (deleting some L0s)
workload = Workload(env, tenant_id, timeline_id)
workload.init()
workload.write_rows(64)
for _i in range(0, 2):
workload.churn_rows(64, upload=False)

# Upload but don't compact
origin_pageserver = env.get_tenant_pageserver(tenant_id)
dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
origin_pageserver.http_client().timeline_checkpoint(
tenant_id, timeline_id, wait_until_uploaded=True, compact=False
)

# Start a compaction that will pause on a failpoint.
compaction_failpoint = "before-upload-index-pausable"
origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "pause"))

# This failpoint can also cause migration code to time out trying to politely flush
# during migrations
origin_pageserver.allowed_errors.append(".*Timed out waiting for flush to remote storage.*")

try:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
compact_fut = executor.submit(
origin_pageserver.http_client().timeline_compact,
tenant_id,
timeline_id,
wait_until_uploaded=True,
)

# Let the compaction start and then get stuck uploading an index: when we live migrate, the new generation's
# index will be initialized from the pre-compaction index, referencing layers that the compaction will try to delete
def has_hit_compaction_failpoint():
assert origin_pageserver.log_contains(f"at failpoint {compaction_failpoint}")

wait_until(10, 1, has_hit_compaction_failpoint)

# While the compaction is running, start a live migration which will pause long enough for the compaction to sleep,
# after incrementing generation and attaching the new location
migration_failpoint = "reconciler-live-migrate-post-notify"
env.storage_controller.configure_failpoints((migration_failpoint, "pause"))
migrate_fut = executor.submit(
env.storage_controller.tenant_shard_migrate,
TenantShardId(tenant_id, 0, 0),
dest_ps_id,
)

def has_hit_migration_failpoint():
assert env.storage_controller.log_contains(f"at failpoint {migration_failpoint}")

wait_until(10, 1, has_hit_migration_failpoint)

# Origin pageserver has succeeded with compaction before the migration completed. It has done all the writes it wanted to do in its own (stale) generation
origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "off"))
compact_fut.result()
origin_pageserver.http_client().deletion_queue_flush(execute=True)

# Eventually migration completes
env.storage_controller.configure_failpoints((migration_failpoint, "off"))
migrate_fut.result()
except:
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
env.storage_controller.configure_failpoints((migration_failpoint, "off"))
origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "off"))
raise

# Ensure the destination of the migration writes an index, so that if it has corrupt state, that state is
# visible to the scrubber.
workload.write_rows(1, upload=False)
env.get_pageserver(dest_ps_id).http_client().timeline_checkpoint(
tenant_id, timeline_id, wait_until_uploaded=True, compact=False
)

# The destination of the live migration would now have a corrupt index (referencing deleted L0s) if
# the controller had not properly applied validation rules.
healthy, _summary = env.storage_scrubber.scan_metadata()
try:
log.info(f"scrubbed, healthy={healthy}")
assert healthy
except:
# On failures, we want to report them FAIL during the test, not as ERROR during teardown
neon_env_builder.enable_scrub_on_exit = False
raise
11 changes: 11 additions & 0 deletions test_runner/regress/test_storage_scrubber.py
@@ -217,6 +217,13 @@ def test_scrubber_physical_gc_ancestors(
workload.init()
workload.write_rows(100)

# Issue a deletion queue flush so that the parent shard can't leave behind layers
# that will look like unexpected garbage to the scrubber
for pre_split_shard in env.storage_controller.locate(tenant_id):
env.get_pageserver(pre_split_shard["node_id"]).http_client().deletion_queue_flush(
execute=True
)

new_shard_count = 4
assert shard_count is None or new_shard_count > shard_count
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
@@ -321,6 +328,10 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
workload.write_rows(100, upload=False)
workload.stop()

# Issue a deletion queue flush so that the parent shard can't leave behind layers
# that will look like unexpected garbage to the scrubber
env.get_tenant_pageserver(tenant_id).http_client().deletion_queue_flush(execute=True)

new_shard_count = 4
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
for shard in shards: