Provision destination volume for snapshot blocks #1752
Changes from all commits: 2baae91, 110623e, d2d3aef, a554ef5, 89d30cb, 4ebe297, c6358a7, 6ed3354
```diff
@@ -400,23 +400,24 @@ impl super::Nexus {
         project_name: &Name,
         params: &params::SnapshotCreate,
     ) -> CreateResult<db::model::Snapshot> {
-        let (authz_silo, authz_org) =
+        let authz_silo: authz::Silo;
+        let _authz_org: authz::Organization;
+        let authz_project: authz::Project;
+        let authz_disk: authz::Disk;
+
+        (authz_silo, _authz_org, authz_project, authz_disk) =
             LookupPath::new(opctx, &self.db_datastore)
                 .organization_name(organization_name)
-                .lookup_for(authz::Action::ListChildren)
+                .project_name(project_name)
+                .disk_name(&db::model::Name(params.disk.clone()))
+                .lookup_for(authz::Action::Read)
                 .await?;

-        let (.., authz_project) = LookupPath::new(opctx, &self.db_datastore)
-            .organization_name(organization_name)
-            .project_name(project_name)
-            .lookup_for(authz::Action::ListChildren)
-            .await?;
-
         let saga_params = sagas::snapshot_create::Params {
             serialized_authn: authn::saga::Serialized::for_opctx(opctx),
             silo_id: authz_silo.id(),
-            organization_id: authz_org.id(),
             project_id: authz_project.id(),
             disk_id: authz_disk.id(),
             create_params: params.clone(),
         };
```
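The rewritten lookup collapses two `LookupPath` walks into one and checks `authz::Action::Read` on the disk itself. It also uses declare-then-destructure: the bindings are declared up front with explicit types, then filled by a single destructuring assignment (stable since Rust 1.59). A minimal, self-contained sketch of that pattern, with stand-in types rather than omicron's `authz` types:

```rust
// Stand-in types; the real code destructures omicron's authz resources.
struct Silo(u32);
struct Project(u32);

fn lookup() -> (Silo, Project) {
    (Silo(1), Project(2))
}

fn main() {
    // Declaring the bindings first gives each one its own readable type
    // annotation, instead of one long tuple type on a `let (a, b) = ...`.
    let silo: Silo;
    let project: Project;

    // Destructuring assignment initializes both previously declared
    // bindings in one statement.
    (silo, project) = lookup();

    println!("silo={} project={}", silo.0, project.0);
}
```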
```diff
@@ -510,8 +511,11 @@ impl super::Nexus {
             .project_delete_snapshot(opctx, &authz_snapshot, &db_snapshot)
             .await?;

-        // Kick off volume deletion saga
+        // Kick off volume deletion saga(s)
         self.volume_delete(db_snapshot.volume_id).await?;

+        if let Some(volume_id) = db_snapshot.destination_volume_id {
+            self.volume_delete(volume_id).await?;
+        }
         Ok(())
     }
```

Review thread on this hunk (later marked resolved by davepacheco):

> It seems like a couple of these steps maybe need to happen transactionally? If we crash either after L511 or L514, it seems like we'd wind up not having cleaned stuff up. I can understand punting on this but it seems like we should track this somewhere (even if just a TODO).

> This is a problem, yeah.

> This should be addressed by #2090
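To make the reviewer's crash window concrete: the snapshot's own volume and its optional destination volume are deleted by two separate awaits, so a crash between them strands the second cleanup. A hypothetical, self-contained sketch of the shape; the types and `volume_delete` below are stand-ins for the Nexus machinery, not its real API:

```rust
use uuid::Uuid;

// Stand-in for db::model::Snapshot; only the two volume IDs matter here.
struct Snapshot {
    volume_id: Uuid,
    destination_volume_id: Option<Uuid>,
}

// Stand-in for Nexus's volume-deletion saga kickoff.
async fn volume_delete(volume_id: Uuid) -> Result<(), String> {
    println!("deleting volume {volume_id}");
    Ok(())
}

async fn delete_snapshot_volumes(snapshot: &Snapshot) -> Result<(), String> {
    volume_delete(snapshot.volume_id).await?;
    // Crash window: if the process dies here, the destination volume is
    // never cleaned up. Sequencing both deletes durably (the fix tracked
    // by #2090) closes this gap.
    if let Some(volume_id) = snapshot.destination_volume_id {
        volume_delete(volume_id).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let snapshot = Snapshot {
        volume_id: Uuid::new_v4(),
        destination_volume_id: Some(Uuid::new_v4()),
    };
    delete_snapshot_volumes(&snapshot).await
}
```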
New file (+262 lines): functions common to interacting with the Crucible agent in saga actions.

```rust
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Functions common to interacting with the Crucible agent in saga actions

use super::*;

use crate::db;
use crate::db::identity::Asset;
use anyhow::anyhow;
use crucible_agent_client::{
    types::{CreateRegion, RegionId, State as RegionState},
    Client as CrucibleAgentClient,
};
use futures::StreamExt;
use omicron_common::api::external::Error;
use omicron_common::backoff::{self, BackoffError};
use slog::Logger;

// Arbitrary limit on concurrency, for operations issued on multiple regions
// within a disk at the same time.
const MAX_CONCURRENT_REGION_REQUESTS: usize = 3;

/// Call out to Crucible agent and perform region creation.
pub async fn ensure_region_in_dataset(
    log: &Logger,
    dataset: &db::model::Dataset,
    region: &db::model::Region,
) -> Result<crucible_agent_client::types::Region, Error> {
    let url = format!("http://{}", dataset.address());
    let client = CrucibleAgentClient::new(&url);

    let region_request = CreateRegion {
        block_size: region.block_size().to_bytes(),
        extent_count: region.extent_count().try_into().unwrap(),
        extent_size: region.blocks_per_extent().try_into().unwrap(),
        // TODO: Can we avoid casting from UUID to string?
        // NOTE: This'll require updating the crucible agent client.
        id: RegionId(region.id().to_string()),
        encrypted: region.encrypted(),
        cert_pem: None,
        key_pem: None,
        root_pem: None,
    };

    let create_region = || async {
        let region = client
            .region_create(&region_request)
            .await
            .map_err(|e| BackoffError::Permanent(e.into()))?;
        match region.state {
            RegionState::Requested => Err(BackoffError::transient(anyhow!(
                "Region creation in progress"
            ))),
            RegionState::Created => Ok(region),
            _ => Err(BackoffError::Permanent(anyhow!(
                "Failed to create region, unexpected state: {:?}",
                region.state
            ))),
        }
    };

    let log_create_failure = |_, delay| {
        warn!(
            log,
            "Region requested, not yet created. Retrying in {:?}", delay
        );
    };

    let region = backoff::retry_notify(
        backoff::internal_service_policy(),
        create_region,
        log_create_failure,
    )
    .await
    .map_err(|e| Error::internal_error(&e.to_string()))?;

    Ok(region.into_inner())
}
```
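`ensure_region_in_dataset` treats `RegionState::Requested` as a transient failure to retry with backoff, and any other unexpected state as permanent. A standalone sketch of that retry shape, written directly against the `backoff` crate (assuming v0.4 with its async `future` module, which `omicron_common::backoff` wraps); the simulated agent below is made up:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

use backoff::{future::retry_notify, Error as BackoffError, ExponentialBackoff};

static POLLS: AtomicUsize = AtomicUsize::new(0);

// Pretend the region reports "requested" twice before becoming "created".
async fn create_region() -> Result<&'static str, BackoffError<String>> {
    match POLLS.fetch_add(1, Ordering::SeqCst) {
        // Transient: retry after a backoff-computed delay.
        0 | 1 => Err(BackoffError::transient(
            "region creation in progress".to_string(),
        )),
        // Success ends the retry loop; a permanent error would too.
        _ => Ok("created"),
    }
}

#[tokio::main]
async fn main() {
    let state = retry_notify(
        ExponentialBackoff::default(),
        create_region,
        // Mirrors log_create_failure: invoked once per transient failure.
        |err, delay| eprintln!("not ready ({err}); retrying in {delay:?}"),
    )
    .await;
    println!("final state: {state:?}");
}
```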
Continuing the new file, `ensure_all_datasets_and_regions` fans the per-region calls out concurrently:

```rust
pub async fn ensure_all_datasets_and_regions(
    log: &Logger,
    datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
) -> Result<
    Vec<(db::model::Dataset, crucible_agent_client::types::Region)>,
    ActionError,
> {
    let request_count = datasets_and_regions.len();

    // Allocate regions, and additionally return the dataset that the region
    // was allocated in.
    let datasets_and_regions: Vec<(
        db::model::Dataset,
        crucible_agent_client::types::Region,
    )> = futures::stream::iter(datasets_and_regions)
        .map(|(dataset, region)| async move {
            match ensure_region_in_dataset(log, &dataset, &region).await {
                Ok(result) => Ok((dataset, result)),
                Err(e) => Err(e),
            }
        })
        // Execute the allocation requests concurrently.
        .buffer_unordered(std::cmp::min(
            request_count,
            MAX_CONCURRENT_REGION_REQUESTS,
        ))
        .collect::<Vec<
            Result<
                (db::model::Dataset, crucible_agent_client::types::Region),
                Error,
            >,
        >>()
        .await
        .into_iter()
        .collect::<Result<
            Vec<(db::model::Dataset, crucible_agent_client::types::Region)>,
            Error,
        >>()
        .map_err(ActionError::action_failed)?;

    // Assert each region has the same block size, otherwise Volume creation
    // will fail.
    let all_region_have_same_block_size = datasets_and_regions
        .windows(2)
        .all(|w| w[0].1.block_size == w[1].1.block_size);

    if !all_region_have_same_block_size {
        return Err(ActionError::action_failed(Error::internal_error(
            "volume creation will fail due to block size mismatch",
        )));
    }

    Ok(datasets_and_regions)
}
```
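The fan-out above is a pattern every helper in this file repeats: build a stream of futures, run at most `MAX_CONCURRENT_REGION_REQUESTS` of them at once with `buffer_unordered`, then collapse the `Vec<Result<_, _>>` into a single `Result` so the first failure aborts the batch. A self-contained sketch with toy types standing in for the dataset/region work:

```rust
use futures::StreamExt;

const MAX_CONCURRENT_REQUESTS: usize = 3;

// Stand-in for a per-region call to a Crucible agent.
async fn process(item: u32) -> Result<u32, String> {
    Ok(item * 2)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let items = vec![1, 2, 3, 4, 5];
    let request_count = items.len();

    let results: Vec<u32> = futures::stream::iter(items)
        .map(|item| async move { process(item).await })
        // Run up to MAX_CONCURRENT_REQUESTS futures concurrently.
        .buffer_unordered(std::cmp::min(request_count, MAX_CONCURRENT_REQUESTS))
        .collect::<Vec<Result<u32, String>>>()
        .await
        .into_iter()
        // The first Err short-circuits the whole batch, as in the saga code.
        .collect::<Result<Vec<u32>, String>>()?;

    println!("{results:?}");
    Ok(())
}
```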
The delete path mirrors the same fan-out:

```rust
// Given a list of datasets and regions, send DELETE calls to each dataset's
// corresponding Crucible agent for each region.
pub(super) async fn delete_crucible_regions(
    datasets_and_regions: Vec<(db::model::Dataset, db::model::Region)>,
) -> Result<(), Error> {
    let request_count = datasets_and_regions.len();
    if request_count == 0 {
        return Ok(());
    }

    futures::stream::iter(datasets_and_regions)
        .map(|(dataset, region)| async move {
            let url = format!("http://{}", dataset.address());
            let client = CrucibleAgentClient::new(&url);
            let id = RegionId(region.id().to_string());
            client.region_delete(&id).await.map_err(|e| match e {
                crucible_agent_client::Error::ErrorResponse(rv) => {
                    match rv.status() {
                        http::StatusCode::SERVICE_UNAVAILABLE => {
                            Error::unavail(&rv.message)
                        }
                        status if status.is_client_error() => {
                            Error::invalid_request(&rv.message)
                        }
                        _ => Error::internal_error(&rv.message),
                    }
                }
                _ => Error::internal_error(
                    "unexpected failure during `delete_crucible_regions`",
                ),
            })
        })
        // Execute the deletion requests concurrently.
        .buffer_unordered(std::cmp::min(
            request_count,
            MAX_CONCURRENT_REGION_REQUESTS,
        ))
        .collect::<Vec<Result<_, _>>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;

    Ok(())
}
```
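Both delete helpers translate Crucible agent error responses the same way: 503 becomes a retryable `Error::unavail`, any other 4xx becomes `Error::invalid_request`, and everything else is an internal error. A sketch of just that mapping, with `ApiError` standing in for `omicron_common::api::external::Error`:

```rust
use http::StatusCode;

// Stand-in for omicron_common::api::external::Error.
#[derive(Debug)]
enum ApiError {
    Unavailable(String),
    InvalidRequest(String),
    Internal(String),
}

fn map_agent_error(status: StatusCode, message: &str) -> ApiError {
    match status {
        // 503 from the agent is retryable by the caller.
        StatusCode::SERVICE_UNAVAILABLE => {
            ApiError::Unavailable(message.to_string())
        }
        // Other 4xx responses reflect a bad request on our side.
        s if s.is_client_error() => ApiError::InvalidRequest(message.to_string()),
        // Everything else is unexpected.
        _ => ApiError::Internal(message.to_string()),
    }
}

fn main() {
    println!(
        "{:?}",
        map_agent_error(StatusCode::SERVICE_UNAVAILABLE, "agent busy")
    );
}
```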
Snapshot deletion follows the same shape, with two agent calls per region snapshot:

```rust
// Given a list of datasets and region snapshots, send DELETE calls to each
// dataset's corresponding Crucible agent for the running read-only
// downstairs and the snapshot.
pub(super) async fn delete_crucible_snapshots(
    datasets_and_snapshots: Vec<(
        db::model::Dataset,
        db::model::RegionSnapshot,
    )>,
) -> Result<(), Error> {
    let request_count = datasets_and_snapshots.len();
    if request_count == 0 {
        return Ok(());
    }

    futures::stream::iter(datasets_and_snapshots)
        .map(|(dataset, region_snapshot)| async move {
            let url = format!("http://{}", dataset.address());
            let client = CrucibleAgentClient::new(&url);

            // Delete running snapshot
            client
                .region_delete_running_snapshot(
                    &RegionId(region_snapshot.region_id.to_string()),
                    &region_snapshot.snapshot_id.to_string(),
                )
                .await
                .map_err(|e| match e {
                    crucible_agent_client::Error::ErrorResponse(rv) => {
                        match rv.status() {
                            http::StatusCode::SERVICE_UNAVAILABLE => {
                                Error::unavail(&rv.message)
                            }
                            status if status.is_client_error() => {
                                Error::invalid_request(&rv.message)
                            }
                            _ => Error::internal_error(&rv.message),
                        }
                    }
                    _ => Error::internal_error(
                        "unexpected failure during `region_delete_running_snapshot`",
                    ),
                })?;

            // Delete snapshot
            client
                .region_delete_snapshot(
                    &RegionId(region_snapshot.region_id.to_string()),
                    &region_snapshot.snapshot_id.to_string(),
                )
                .await
                .map_err(|e| match e {
                    crucible_agent_client::Error::ErrorResponse(rv) => {
                        match rv.status() {
                            http::StatusCode::SERVICE_UNAVAILABLE => {
                                Error::unavail(&rv.message)
                            }
                            status if status.is_client_error() => {
                                Error::invalid_request(&rv.message)
                            }
                            _ => Error::internal_error(&rv.message),
                        }
                    }
                    _ => Error::internal_error(
                        "unexpected failure during `region_delete_snapshot`",
                    ),
                })?;

            Ok(())
        })
        // Execute the deletion requests concurrently.
        .buffer_unordered(std::cmp::min(
            request_count,
            MAX_CONCURRENT_REGION_REQUESTS,
        ))
        .collect::<Vec<Result<(), Error>>>()
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?;

    Ok(())
}
```