Provision destination volume for snapshot blocks #1752

Merged: 8 commits, Oct 4, 2022
5 changes: 5 additions & 0 deletions common/src/sql/dbinit.sql
@@ -654,6 +654,8 @@ CREATE TABLE omicron.public.disk (
/* Indicates that the object has been deleted */
/* This is redundant for Disks, but we keep it here for consistency. */
time_deleted TIMESTAMPTZ,

/* child resource generation number, per RFD 192 */
rcgen INT NOT NULL,

/* Every Disk is in exactly one Project at a time. */
@@ -772,6 +774,9 @@ CREATE TABLE omicron.public.snapshot (
/* Every Snapshot consists of a root volume */
volume_id UUID NOT NULL,

/* Where will the scrubbed blocks eventually land? */
destination_volume_id UUID,

gen INT NOT NULL,
state omicron.public.snapshot_state NOT NULL,
block_size omicron.public.block_size NOT NULL,
2 changes: 2 additions & 0 deletions nexus/db-model/src/schema.rs
@@ -77,6 +77,8 @@ table! {
disk_id -> Uuid,
volume_id -> Uuid,

destination_volume_id -> Nullable<Uuid>,

gen -> Int8,
state -> crate::SnapshotStateEnum,
block_size -> crate::BlockSizeEnum,
4 changes: 4 additions & 0 deletions nexus/db-model/src/snapshot.rs
@@ -43,9 +43,13 @@ pub struct Snapshot {
pub identity: SnapshotIdentity,

pub project_id: Uuid,
// which disk is this a snapshot of
pub disk_id: Uuid,
pub volume_id: Uuid,

// destination of all snapshot blocks
pub destination_volume_id: Option<Uuid>,

pub gen: Generation,
pub state: SnapshotState,
pub block_size: BlockSize,
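The three hunks above thread the new column through each layer: a nullable UUID column in dbinit.sql, a `Nullable<Uuid>` entry in the Diesel `table!` definition, and an `Option<Uuid>` field on the model. A minimal, self-contained sketch of that mapping (hypothetical `SnapshotRow` type, not the real Nexus model, and assuming Diesel with its Postgres and uuid support enabled):

```rust
use diesel::prelude::*;
use uuid::Uuid;

diesel::table! {
    // Trimmed-down, hypothetical table for illustration; the real definition
    // lives in nexus/db-model/src/schema.rs.
    snapshot (id) {
        id -> Uuid,
        volume_id -> Uuid,
        destination_volume_id -> Nullable<Uuid>,
    }
}

#[derive(Queryable)]
struct SnapshotRow {
    id: Uuid,
    volume_id: Uuid,
    // A NULL column value deserializes as None, so snapshots created before
    // this change load without error.
    destination_volume_id: Option<Uuid>,
}

fn has_destination(row: &SnapshotRow) -> bool {
    row.destination_volume_id.is_some()
}
```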
24 changes: 14 additions & 10 deletions nexus/src/app/disk.rs
@@ -400,23 +400,24 @@ impl super::Nexus {
project_name: &Name,
params: &params::SnapshotCreate,
) -> CreateResult<db::model::Snapshot> {
let (authz_silo, authz_org) =
let authz_silo: authz::Silo;
let _authz_org: authz::Organization;
let authz_project: authz::Project;
let authz_disk: authz::Disk;

(authz_silo, _authz_org, authz_project, authz_disk) =
LookupPath::new(opctx, &self.db_datastore)
.organization_name(organization_name)
.lookup_for(authz::Action::ListChildren)
.project_name(project_name)
.disk_name(&db::model::Name(params.disk.clone()))
.lookup_for(authz::Action::Read)
.await?;

let (.., authz_project) = LookupPath::new(opctx, &self.db_datastore)
.organization_name(organization_name)
.project_name(project_name)
.lookup_for(authz::Action::ListChildren)
.await?;

let saga_params = sagas::snapshot_create::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
silo_id: authz_silo.id(),
organization_id: authz_org.id(),
project_id: authz_project.id(),
disk_id: authz_disk.id(),
create_params: params.clone(),
};

@@ -510,8 +511,11 @@ impl super::Nexus {
.project_delete_snapshot(opctx, &authz_snapshot, &db_snapshot)
.await?;

// Kick off volume deletion saga
// Kick off volume deletion saga(s)
self.volume_delete(db_snapshot.volume_id).await?;
if let Some(volume_id) = db_snapshot.destination_volume_id {
Collaborator:

It seems like a couple of these steps may need to happen transactionally. If we crash after either L511 or L514, we'd wind up not having cleaned stuff up.

I can understand punting on this, but it seems like we should track it somewhere (even if just a TODO).

Contributor Author:

This is a problem, yeah. Nexus::project_delete_snapshot is called directly from an HTTP endpoint, so it's not a case where a saga node would be replayed during a crash. I'm not sure how to solve this, and I feel like this may be a more general problem. I'll give it some thought.

Collaborator:

This should be addressed by #2090.

self.volume_delete(volume_id).await?;
}

Ok(())
}
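The concern in the review thread above is that the two `volume_delete` calls are independent awaits issued from an HTTP handler, so a crash between them leaks the destination volume. A minimal sketch of the idea behind the follow-up referenced in the thread, using hypothetical stand-in types (`StepLog`, `delete_snapshot_volumes`) rather than the real saga machinery: each deletion becomes its own recorded, idempotent step, so a replay after a crash resumes at whichever step never ran.

```rust
use std::collections::HashSet;
use uuid::Uuid;

/// Stand-in for durable saga state: the set of steps that already completed.
#[derive(Default)]
struct StepLog {
    completed: HashSet<&'static str>,
}

impl StepLog {
    /// Run a step unless a previous execution (before a crash) already did.
    fn run(&mut self, name: &'static str, step: impl FnOnce()) {
        if self.completed.insert(name) {
            step();
        }
    }
}

fn delete_snapshot_volumes(
    log: &mut StepLog,
    volume_id: Uuid,
    destination_volume_id: Option<Uuid>,
) {
    // Each deletion is its own step; if the process dies after the first,
    // replaying resumes at the second instead of leaking the destination
    // volume.
    log.run("delete_data_volume", || println!("delete volume {volume_id}"));
    if let Some(dest) = destination_volume_id {
        log.run("delete_destination_volume", || {
            println!("delete volume {dest}")
        });
    }
}

fn main() {
    let mut log = StepLog::default();
    let (vol, dest) = (Uuid::new_v4(), Some(Uuid::new_v4()));
    delete_snapshot_volumes(&mut log, vol, dest);
    // A "replayed" call after a simulated crash is a no-op for completed steps.
    delete_snapshot_volumes(&mut log, vol, dest);
}
```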
262 changes: 262 additions & 0 deletions nexus/src/app/sagas/common_storage.rs
@@ -0,0 +1,262 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Functions common to interacting with the Crucible agent in saga actions

use super::*;

use crate::db;
use crate::db::identity::Asset;
use anyhow::anyhow;
use crucible_agent_client::{
types::{CreateRegion, RegionId, State as RegionState},
Client as CrucibleAgentClient,
};
use futures::StreamExt;
use omicron_common::api::external::Error;
use omicron_common::backoff::{self, BackoffError};
use slog::Logger;

// Arbitrary limit on concurrency, for operations issued on multiple regions
// within a disk at the same time.
const MAX_CONCURRENT_REGION_REQUESTS: usize = 3;

/// Call out to Crucible agent and perform region creation.
pub async fn ensure_region_in_dataset(
log: &Logger,
dataset: &db::model::Dataset,
region: &db::model::Region,
) -> Result<crucible_agent_client::types::Region, Error> {
let url = format!("http://{}", dataset.address());
let client = CrucibleAgentClient::new(&url);

let region_request = CreateRegion {
block_size: region.block_size().to_bytes(),
extent_count: region.extent_count().try_into().unwrap(),
extent_size: region.blocks_per_extent().try_into().unwrap(),
// TODO: Can we avoid casting from UUID to string?
// NOTE: This'll require updating the crucible agent client.
id: RegionId(region.id().to_string()),
encrypted: region.encrypted(),
cert_pem: None,
key_pem: None,
root_pem: None,
};

let create_region = || async {
let region = client
.region_create(&region_request)
.await
.map_err(|e| BackoffError::Permanent(e.into()))?;
match region.state {
RegionState::Requested => Err(BackoffError::transient(anyhow!(
"Region creation in progress"
))),
RegionState::Created => Ok(region),
_ => Err(BackoffError::Permanent(anyhow!(
"Failed to create region, unexpected state: {:?}",
region.state
))),
}
};

let log_create_failure = |_, delay| {
warn!(
log,
"Region requested, not yet created. Retrying in {:?}", delay
);
};

let region = backoff::retry_notify(
backoff::internal_service_policy(),
create_region,
log_create_failure,
)
.await
.map_err(|e| Error::internal_error(&e.to_string()))?;

Ok(region.into_inner())
}

pub async fn ensure_all_datasets_and_regions(
log: &Logger,
datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
) -> Result<
Vec<(db::model::Dataset, crucible_agent_client::types::Region)>,
ActionError,
> {
let request_count = datasets_and_regions.len();

// Allocate regions, and additionally return the dataset that the region was
// allocated in.
let datasets_and_regions: Vec<(
db::model::Dataset,
crucible_agent_client::types::Region,
)> = futures::stream::iter(datasets_and_regions)
.map(|(dataset, region)| async move {
match ensure_region_in_dataset(log, &dataset, &region).await {
Ok(result) => Ok((dataset, result)),
Err(e) => Err(e),
}
})
// Execute the allocation requests concurrently.
.buffer_unordered(std::cmp::min(
request_count,
MAX_CONCURRENT_REGION_REQUESTS,
))
.collect::<Vec<
Result<
(db::model::Dataset, crucible_agent_client::types::Region),
Error,
>,
>>()
.await
.into_iter()
.collect::<Result<
Vec<(db::model::Dataset, crucible_agent_client::types::Region)>,
Error,
>>()
.map_err(ActionError::action_failed)?;

// Assert each region has the same block size, otherwise Volume creation
// will fail.
let all_region_have_same_block_size = datasets_and_regions
.windows(2)
.all(|w| w[0].1.block_size == w[1].1.block_size);

if !all_region_have_same_block_size {
return Err(ActionError::action_failed(Error::internal_error(
"volume creation will fail due to block size mismatch",
)));
}

Ok(datasets_and_regions)
}

// Given a list of datasets and regions, send DELETE calls to each dataset's
// corresponding Crucible Agent for each region.
pub(super) async fn delete_crucible_regions(
datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
) -> Result<(), Error> {
let request_count = datasets_and_regions.len();
if request_count == 0 {
return Ok(());
}

futures::stream::iter(datasets_and_regions)
.map(|(dataset, region)| async move {
let url = format!("http://{}", dataset.address());
let client = CrucibleAgentClient::new(&url);
let id = RegionId(region.id().to_string());
client.region_delete(&id).await.map_err(|e| match e {
crucible_agent_client::Error::ErrorResponse(rv) => {
match rv.status() {
http::StatusCode::SERVICE_UNAVAILABLE => {
Error::unavail(&rv.message)
}
status if status.is_client_error() => {
Error::invalid_request(&rv.message)
}
_ => Error::internal_error(&rv.message),
}
}
_ => Error::internal_error(
"unexpected failure during `delete_crucible_regions`",
),
})
})
// Execute the deletion requests concurrently.
.buffer_unordered(std::cmp::min(
request_count,
MAX_CONCURRENT_REGION_REQUESTS,
))
.collect::<Vec<Result<_, _>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;

Ok(())
}

// Given a list of datasets and region snapshots, send DELETE calls to each
// dataset's corresponding Crucible Agent for each running read-only downstairs
// and snapshot.
pub(super) async fn delete_crucible_snapshots(
datasets_and_snapshots: Vec<(
db::model::Dataset,
db::model::RegionSnapshot,
)>,
) -> Result<(), Error> {
let request_count = datasets_and_snapshots.len();
if request_count == 0 {
return Ok(());
}

futures::stream::iter(datasets_and_snapshots)
.map(|(dataset, region_snapshot)| async move {
let url = format!("http://{}", dataset.address());
let client = CrucibleAgentClient::new(&url);

// delete running snapshot
client
.region_delete_running_snapshot(
&RegionId(region_snapshot.region_id.to_string()),
&region_snapshot.snapshot_id.to_string(),
)
.await
.map_err(|e| match e {
crucible_agent_client::Error::ErrorResponse(rv) => {
match rv.status() {
http::StatusCode::SERVICE_UNAVAILABLE => {
Error::unavail(&rv.message)
}
status if status.is_client_error() => {
Error::invalid_request(&rv.message)
}
_ => Error::internal_error(&rv.message),
}
}
_ => Error::internal_error(
"unexpected failure during `region_delete_running_snapshot`",
),
})?;

// delete snapshot
client
.region_delete_snapshot(
&RegionId(region_snapshot.region_id.to_string()),
&region_snapshot.snapshot_id.to_string(),
)
.await
.map_err(|e| match e {
crucible_agent_client::Error::ErrorResponse(rv) => {
match rv.status() {
http::StatusCode::SERVICE_UNAVAILABLE => {
Error::unavail(&rv.message)
}
status if status.is_client_error() => {
Error::invalid_request(&rv.message)
}
_ => Error::internal_error(&rv.message),
}
}
_ => Error::internal_error(
"unexpected failure during `region_delete_snapshot`",
),
})?;

Ok(())
})
// Execute the deletion requests concurrently.
.buffer_unordered(std::cmp::min(
request_count,
MAX_CONCURRENT_REGION_REQUESTS,
))
.collect::<Vec<Result<(), Error>>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;

Ok(())
}
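One pattern worth calling out: all three helpers above fan out per-region requests with `futures::stream` and `buffer_unordered`, capped at `MAX_CONCURRENT_REGION_REQUESTS`, then collapse the per-request `Result`s into a single `Result`. A standalone sketch of just that pattern, with a hypothetical `fake_request` standing in for the Crucible agent calls:

```rust
use futures::StreamExt;

const MAX_CONCURRENT_REGION_REQUESTS: usize = 3;

// Placeholder for a per-region agent call such as `region_create` or
// `region_delete`.
async fn fake_request(id: usize) -> Result<usize, String> {
    Ok(id)
}

async fn fan_out(ids: Vec<usize>) -> Result<Vec<usize>, String> {
    let request_count = ids.len();
    if request_count == 0 {
        return Ok(vec![]);
    }

    futures::stream::iter(ids)
        .map(|id| async move { fake_request(id).await })
        // At most MAX_CONCURRENT_REGION_REQUESTS requests are in flight at
        // once; results are yielded in completion order, not input order.
        .buffer_unordered(std::cmp::min(
            request_count,
            MAX_CONCURRENT_REGION_REQUESTS,
        ))
        .collect::<Vec<Result<usize, String>>>()
        .await
        .into_iter()
        // Any single failure fails the whole batch.
        .collect()
}

fn main() {
    let result = futures::executor::block_on(fan_out((0..10).collect()));
    assert_eq!(result.unwrap().len(), 10);
}
```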