update to steno 0.2.0 #1532

Merged
merged 40 commits into from
Aug 4, 2022
40 commits
7b153d9
it builds (by commenting out nearly all the code)
davepacheco Jul 29, 2022
8286b76
disk create saga compiles
davepacheco Jul 29, 2022
238fe67
caller of disk_create saga builds now too
davepacheco Jul 29, 2022
7eae546
fix up some of the database interactions (build only)
davepacheco Jul 29, 2022
3289117
recovery tests build
davepacheco Jul 29, 2022
030a485
minor cleanup
davepacheco Jul 29, 2022
2e7fb5b
two sagas converted, ah ah ah (compiling only)
davepacheco Jul 29, 2022
8026615
three sagas almost building, ah ah ah
davepacheco Jul 29, 2022
3460b94
they all build
davepacheco Jul 30, 2022
ddee869
tests build too
davepacheco Jul 30, 2022
dd454ff
improve saga error reporting
davepacheco Jul 30, 2022
710ff8e
fix steno package
davepacheco Jul 30, 2022
1f08c85
checking some dummy tests for future debugging
davepacheco Jul 30, 2022
9309c44
remove those tests
davepacheco Jul 30, 2022
c53a035
tests pass (still needs much cleanup work)
davepacheco Jul 30, 2022
9dc035e
Merge remote-tracking branch 'origin/main' into steno-update
davepacheco Aug 1, 2022
2913263
clean up error handling (one XXX)
davepacheco Aug 1, 2022
c619e05
update steno
davepacheco Aug 1, 2022
001005e
remove stale XXX
davepacheco Aug 1, 2022
12915d6
NIC nodes should use subsaga to pass index
davepacheco Aug 1, 2022
f5c8ab7
simplify subsaga a bit further
davepacheco Aug 1, 2022
7dc1538
external IP node should use subsaga with params to pass index
davepacheco Aug 1, 2022
b1b7eee
disk nodes could pass index via subsaga params
davepacheco Aug 1, 2022
7c9af66
update Steno
davepacheco Aug 1, 2022
0767d1c
more cleanup
davepacheco Aug 2, 2022
b312587
include saga name in log, database
davepacheco Aug 2, 2022
33951e2
review nits
davepacheco Aug 2, 2022
fe5b9df
refer to issue about looking up things multiple times by name
davepacheco Aug 2, 2022
0c4dc30
update steno
davepacheco Aug 2, 2022
6fa454d
Merge remote-tracking branch 'origin/main' into steno-update
davepacheco Aug 2, 2022
c185e71
update steno
davepacheco Aug 3, 2022
32ecbac
Merge remote-tracking branch 'origin/main' into steno-update
davepacheco Aug 3, 2022
96dba63
review feedback
davepacheco Aug 3, 2022
268c1c8
switch steno back to "main"
davepacheco Aug 3, 2022
7a61dee
Revert "switch steno back to "main""
davepacheco Aug 3, 2022
d078a59
switch internal deps to branch "main"
davepacheco Aug 3, 2022
9edc90b
Cargo.lock change that seems like it should work but fails explosively
davepacheco Aug 3, 2022
78fc50a
Revert "Cargo.lock change that seems like it should work but fails ex…
davepacheco Aug 3, 2022
67e3e30
add patch to enable build
davepacheco Aug 3, 2022
3142807
Merge remote-tracking branch 'origin/main' into steno-update
davepacheco Aug 4, 2022
199 changes: 46 additions & 153 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions Cargo.toml
Expand Up @@ -89,6 +89,28 @@ opt-level = 3
[profile.release]
panic = "abort"

+ # The following patch is needed temporarily to resolve a cyclic dependency +
+ # breaking change.
+ #
+ # Steno recently had a breaking change (steno#29). omicron-common uses steno.
+ # Almost all of the targets in this repo use the copy of omicron_common that is
+ # itself in this repo. That one has been updated to use a newer steno.
+ # However, omicron-sled-agent (also in this repo) pulls in crucible, which pulls
+ # in omicron-common _not_ from this repo (since it can't -- it's in another
+ # repo) but rather from "main". As a result, if we try to build everything in
+ # this repo, we'll wind up with two copies of omicron-common. One of them will
+ # be updated to use the new steno. The other one will not. But both depend on
+ # steno branch "main".
+ #
+ # The workaround here is that when we build crucible, we override its
+ # omicron-common dependency to point to the local path one, which has been
+ # updated for the new Steno.
+ #
+ # Once we land this onto "main", we can immediately remove this workaround
+ # because any subsequent build will pick up the updated omicron-common.
+ [patch."https://github.com/oxidecomputer/omicron"]
+ omicron-common = { path = "./common" }
+
#
# It's common during development to use a local copy of dropshot, propolis
# or steno in the parent directory. If you want to use those, uncomment
Expand Down
6 changes: 3 additions & 3 deletions common/src/api/external/error.rs
Expand Up @@ -178,7 +178,7 @@ impl Error {
/// `context` prepended to it to provide more context
///
/// If the error has no internal message, then it is returned unchanged.
- fn prepend_internal_message<C>(self, context: C) -> Error
+ pub fn internal_context<C>(self, context: C) -> Error
where
C: Display + Send + Sync + 'static,
{
Expand Down Expand Up @@ -452,15 +452,15 @@ impl<T> InternalContext<T> for Result<T, Error> {
where
C: Display + Send + Sync + 'static,
{
- self.map_err(|error| error.prepend_internal_message(context))
+ self.map_err(|error| error.internal_context(context))
}

fn with_internal_context<C, F>(self, make_context: F) -> Result<T, Error>
where
C: Display + Send + Sync + 'static,
F: FnOnce() -> C,
{
- self.map_err(|error| error.prepend_internal_message(make_context()))
+ self.map_err(|error| error.internal_context(make_context()))
}
}

Expand Down
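The rename in this file makes `internal_context` callable directly on an `Error` as well as through the `InternalContext` trait on `Result`. A minimal sketch of that pattern follows. It assumes a simplified two-variant `Error` enum; the `"context: message"` format and the `lookup_output` helper are illustrative assumptions, not omicron's actual code.

```rust
use std::fmt::Display;

// Hypothetical, simplified stand-in for omicron's `Error` type.
#[derive(Debug, PartialEq)]
pub enum Error {
    InternalError { internal_message: String },
    NotFound,
}

impl Error {
    /// Prepends `context` to the internal message. Errors with no internal
    /// message are returned unchanged. The separator is an assumption.
    pub fn internal_context<C: Display>(self, context: C) -> Error {
        match self {
            Error::InternalError { internal_message } => Error::InternalError {
                internal_message: format!("{}: {}", context, internal_message),
            },
            other => other,
        }
    }
}

/// Extension trait so the same call works on a `Result`.
pub trait InternalContext<T> {
    fn internal_context<C: Display>(self, context: C) -> Result<T, Error>;
}

impl<T> InternalContext<T> for Result<T, Error> {
    fn internal_context<C: Display>(self, context: C) -> Result<T, Error> {
        // Resolves to the inherent method on `Error` defined above.
        self.map_err(|error| error.internal_context(context))
    }
}

/// Hypothetical helper mimicking a saga output lookup that may fail.
pub fn lookup_output(found: bool) -> Result<u32, Error> {
    let raw: Result<u32, Error> = if found {
        Ok(7)
    } else {
        Err(Error::InternalError { internal_message: "no such node".to_string() })
    };
    raw.internal_context("looking up output from disk create saga")
}
```

With this shape, a caller can chain `.map_err(...)` and `.internal_context(...)` on one `Result`, as the disk and instance call sites in this PR do.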
2 changes: 1 addition & 1 deletion common/src/api/external/mod.rs
Expand Up @@ -901,7 +901,7 @@ impl From<steno::SagaView> for Saga {
pub enum SagaState {
Running,
Succeeded,
- Failed { error_node_name: String, error_info: SagaErrorInfo },
+ Failed { error_node_name: steno::NodeName, error_info: SagaErrorInfo },
}

#[derive(Clone, Debug, Serialize, JsonSchema)]
Expand Down
10 changes: 5 additions & 5 deletions common/src/sql/dbinit.sql
Expand Up @@ -1184,12 +1184,12 @@ CREATE TABLE omicron.public.saga (
id UUID PRIMARY KEY,
/* unique id of the creator */
creator UUID NOT NULL,
- /* name of the saga template name being run */
- template_name STRING(127) NOT NULL,
/* time the saga was started */
time_created TIMESTAMPTZ NOT NULL,
- /* saga parameters */
- saga_params JSONB NOT NULL,
+ /* saga name */
+ name STRING(128) NOT NULL,
+ /* saga DAG (includes params and name) */
+ saga_dag JSONB NOT NULL,

/*
* TODO:
Expand All @@ -1214,7 +1214,7 @@ CREATE UNIQUE INDEX ON omicron.public.saga (

/*
* TODO more indexes for Saga?
- * - Debugging and/or reporting: saga_template_name? creator?
+ * - Debugging and/or reporting: saga_name? creator?
*/
/*
* TODO: This is a data-carrying enum, see note on disk_state.
Expand Down
27 changes: 18 additions & 9 deletions nexus/db-model/src/saga_types.rs
Expand Up @@ -184,26 +184,35 @@ impl FromSql<SagaCachedStateEnum, Pg> for SagaCachedState {
pub struct Saga {
pub id: SagaId,
pub creator: SecId,
- pub template_name: String,
pub time_created: chrono::DateTime<chrono::Utc>,
- pub saga_params: serde_json::Value,
+ pub name: String,
+ pub saga_dag: serde_json::Value,
pub saga_state: SagaCachedState,
pub current_sec: Option<SecId>,
pub adopt_generation: super::Generation,
pub adopt_time: chrono::DateTime<chrono::Utc>,
}

impl Saga {
- pub fn new(id: SecId, params: steno::SagaCreateParams) -> Self {
+ pub fn new(creator: SecId, params: steno::SagaCreateParams) -> Self {
let now = chrono::Utc::now();
+
+ // This match will help us identify a case where Steno adds a new field
+ // to `SagaCreateParams` that we aren't persisting in the database. (If
+ // you're getting a compilation failure here, you need to figure out
+ // what to do with the new field. The assumption as of this writing is
+ // that we must store it into the database or we won't be able to
+ // properly recover the saga.)
+ let steno::SagaCreateParams { id, name, dag, state } = params;
+
Self {
- id: params.id.into(),
- creator: id,
- template_name: params.template_name,
+ id: id.into(),
+ creator,
time_created: now,
- saga_params: params.saga_params,
- saga_state: params.state.into(),
- current_sec: Some(id),
+ name: name.to_string(),
+ saga_dag: dag,
+ saga_state: state.into(),
+ current_sec: Some(creator),
adopt_generation: Generation::new().into(),
adopt_time: now,
}
Expand Down
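The comment added in `Saga::new` relies on a struct pattern with no `..` rest marker: if a field is later added to the source struct, the pattern stops compiling and forces a decision about persisting it. A minimal sketch of that technique, using hypothetical `CreateParams` and `SagaRecord` types in place of `steno::SagaCreateParams` and the database model:

```rust
// Hypothetical stand-in for `steno::SagaCreateParams`; field types are simplified.
pub struct CreateParams {
    pub id: u64,
    pub name: String,
    pub dag: String,
    pub state: String,
}

// Hypothetical stand-in for the database row type.
#[derive(Debug, PartialEq)]
pub struct SagaRecord {
    pub id: u64,
    pub creator: u64,
    pub name: String,
    pub saga_dag: String,
    pub saga_state: String,
}

impl SagaRecord {
    pub fn new(creator: u64, params: CreateParams) -> Self {
        // Exhaustive destructuring: there is no `..` in this pattern, so adding
        // a field to `CreateParams` turns this line into a compile error until
        // the new field is handled explicitly.
        let CreateParams { id, name, dag, state } = params;

        SagaRecord { id, creator, name, saga_dag: dag, saga_state: state }
    }
}
```

Accessing fields as `params.id`, `params.name`, and so on would keep compiling after a new field appears, which is why the destructuring form gives the stronger guarantee here.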
4 changes: 2 additions & 2 deletions nexus/db-model/src/schema.rs
Expand Up @@ -298,9 +298,9 @@ table! {
saga (id) {
id -> Uuid,
creator -> Uuid,
- template_name -> Text,
time_created -> Timestamptz,
- saga_params -> Jsonb,
+ name -> Text,
+ saga_dag -> Jsonb,
saga_state -> crate::saga_types::SagaCachedStateEnum,
current_sec -> Nullable<Uuid>,
adopt_generation -> Int8,
Expand Down
37 changes: 14 additions & 23 deletions nexus/src/app/disk.rs
Expand Up @@ -18,6 +18,7 @@ use omicron_common::api::external::CreateResult;
use omicron_common::api::external::DataPageParams;
use omicron_common::api::external::DeleteResult;
use omicron_common::api::external::Error;
+ use omicron_common::api::external::InternalContext;
use omicron_common::api::external::ListResultVec;
use omicron_common::api::external::LookupResult;
use omicron_common::api::external::LookupType;
Expand Down Expand Up @@ -68,8 +69,8 @@ impl super::Nexus {
});
}

- // Reject disks where the MIN_DISK_SIZE_BYTES doesn't evenly divide
- // the size
+ // Reject disks where the MIN_DISK_SIZE_BYTES doesn't evenly
+ // divide the size
if (params.size.to_bytes() % params::MIN_DISK_SIZE_BYTES as u64)
!= 0
{
Expand Down Expand Up @@ -145,8 +146,8 @@ impl super::Nexus {
});
}

- // Reject disks where the MIN_DISK_SIZE_BYTES doesn't evenly divide
- // the size
+ // Reject disks where the MIN_DISK_SIZE_BYTES doesn't evenly
+ // divide the size
if (params.size.to_bytes() % params::MIN_DISK_SIZE_BYTES as u64)
!= 0
{
Expand All @@ -161,23 +162,18 @@ impl super::Nexus {
}
}

- let saga_params = Arc::new(sagas::disk_create::Params {
+ let saga_params = sagas::disk_create::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
project_id: authz_project.id(),
create_params: params.clone(),
- });
+ };
let saga_outputs = self
- .execute_saga(
- Arc::clone(&sagas::disk_create::SAGA_TEMPLATE),
- sagas::disk_create::SAGA_NAME,
- saga_params,
- )
+ .execute_saga::<sagas::disk_create::SagaDiskCreate>(saga_params)
.await?;
let disk_created = saga_outputs
- .lookup_output::<db::model::Disk>("created_disk")
- .map_err(|e| Error::InternalError {
- internal_message: e.to_string(),
- })?;
+ .lookup_node_output::<db::model::Disk>("created_disk")
+ .map_err(|e| Error::internal_error(&format!("{:#}", &e)))
+ .internal_context("looking up output from disk create saga")?;
Ok(disk_created)
}

Expand Down Expand Up @@ -341,14 +337,9 @@ impl super::Nexus {
.await?;

let saga_params =
- Arc::new(sagas::disk_delete::Params { disk_id: authz_disk.id() });
- self.execute_saga(
- Arc::clone(&sagas::disk_delete::SAGA_TEMPLATE),
- sagas::disk_delete::SAGA_NAME,
- saga_params,
- )
- .await?;
-
+ sagas::disk_delete::Params { disk_id: authz_disk.id() };
+ self.execute_saga::<sagas::disk_delete::SagaDiskDelete>(saga_params)
+ .await?;
Ok(())
}

Expand Down
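The disk hunks in this file only rewrap the comment on the size check, which rejects any requested size that is not a whole multiple of `MIN_DISK_SIZE_BYTES`. A minimal sketch of that check in isolation: the constant's value (1 GiB) and type, the `validate_disk_size` name, and the `String` error are assumptions for illustration, not values taken from this diff.

```rust
// Assumed value and type for illustration only; the diff casts the real
// constant with `as u64`, which suggests it is a narrower integer type.
pub const MIN_DISK_SIZE_BYTES: u32 = 1 << 30;

/// Hypothetical helper: accepts only sizes that are an exact multiple of the
/// minimum disk size.
pub fn validate_disk_size(size_bytes: u64) -> Result<(), String> {
    if (size_bytes % MIN_DISK_SIZE_BYTES as u64) != 0 {
        return Err(format!(
            "size {} is not divisible by {}",
            size_bytes, MIN_DISK_SIZE_BYTES
        ));
    }
    Ok(())
}
```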
27 changes: 12 additions & 15 deletions nexus/src/app/instance.rs
Expand Up @@ -120,26 +120,25 @@ impl super::Nexus {
});
}

- let saga_params = Arc::new(sagas::instance_create::Params {
+ let saga_params = sagas::instance_create::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
organization_name: organization_name.clone().into(),
project_name: project_name.clone().into(),
project_id: authz_project.id(),
create_params: params.clone(),
- });
+ };

let saga_outputs = self
- .execute_saga(
- Arc::clone(&sagas::instance_create::SAGA_TEMPLATE),
- sagas::instance_create::SAGA_NAME,
+ .execute_saga::<sagas::instance_create::SagaInstanceCreate>(
saga_params,
)
.await?;
- // TODO-error more context would be useful
- let instance_id =
- saga_outputs.lookup_output::<Uuid>("instance_id").map_err(|e| {
- Error::InternalError { internal_message: e.to_string() }
- })?;
+
+ let instance_id = saga_outputs
+ .lookup_node_output::<Uuid>("instance_id")
+ .map_err(|e| Error::internal_error(&format!("{:#}", &e)))
+ .internal_context("looking up output from instance create saga")?;
+
// TODO-correctness TODO-robustness TODO-design It's not quite correct
// to take this instance id and look it up again. It's possible that
// it's been modified or even deleted since the saga executed. In that
Expand Down Expand Up @@ -280,14 +279,12 @@ impl super::Nexus {
.await?;

// Kick off the migration saga
- let saga_params = Arc::new(sagas::instance_migrate::Params {
+ let saga_params = sagas::instance_migrate::Params {
serialized_authn: authn::saga::Serialized::for_opctx(opctx),
instance_id: authz_instance.id(),
migrate_params: params,
- });
- self.execute_saga(
- Arc::clone(&sagas::instance_migrate::SAGA_TEMPLATE),
- sagas::instance_migrate::SAGA_NAME,
+ };
+ self.execute_saga::<sagas::instance_migrate::SagaInstanceMigrate>(
saga_params,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion nexus/src/app/mod.rs
Expand Up @@ -215,7 +215,7 @@ impl Nexus {
))),
db_datastore,
Arc::clone(&sec_client),
- &sagas::ALL_TEMPLATES,
+ sagas::ACTION_REGISTRY.clone(),
);

*nexus.recovery_task.lock().unwrap() = Some(recovery_task);
Expand Down
57 changes: 31 additions & 26 deletions nexus/src/app/saga.rs
Expand Up @@ -4,6 +4,9 @@

//! Saga management and execution

+ use super::sagas::NexusSaga;
+ use super::sagas::SagaInitError;
+ use super::sagas::ACTION_REGISTRY;
use crate::authz;
use crate::context::OpContext;
use crate::saga_interface::SagaContext;
Expand All @@ -17,10 +20,11 @@ use omicron_common::api::external::LookupResult;
use omicron_common::api::external::ResourceType;
use omicron_common::bail_unless;
use std::sync::Arc;
+ use steno::DagBuilder;
+ use steno::SagaDag;
use steno::SagaId;
+ use steno::SagaName;
use steno::SagaResultOk;
- use steno::SagaTemplate;
- use steno::SagaType;
use uuid::Uuid;

impl super::Nexus {
Expand Down Expand Up @@ -53,7 +57,7 @@ impl super::Nexus {
) -> LookupResult<external::Saga> {
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
self.sec_client
- .saga_get(steno::SagaId::from(id))
+ .saga_get(SagaId::from(id))
.await
.map(external::Saga::from)
.map(Ok)
Expand All @@ -62,25 +66,23 @@ impl super::Nexus {
})?
}

- /// Given a saga template and parameters, create a new saga and execute it.
- pub(crate) async fn execute_saga<P, S>(
+ /// Given a saga type and parameters, create a new saga and execute it.
+ pub(crate) async fn execute_saga<N: NexusSaga>(
self: &Arc<Self>,
- saga_template: Arc<SagaTemplate<S>>,
- template_name: &str,
- saga_params: Arc<P>,
- ) -> Result<SagaResultOk, Error>
- where
- S: SagaType<
- ExecContextType = Arc<SagaContext>,
- SagaParamsType = Arc<P>,
- >,
- // TODO-cleanup The bound `P: Serialize` should not be necessary because
- // SagaParamsType must already impl Serialize.
- P: serde::Serialize,
- {
+ params: N::Params,
+ ) -> Result<SagaResultOk, Error> {
+ let saga = {
+ let builder = DagBuilder::new(SagaName::new(N::NAME));
+ let dag = N::make_saga_dag(&params, builder)?;
+ let params = serde_json::to_value(&params).map_err(|e| {
+ SagaInitError::SerializeError(String::from("saga params"), e)
+ })?;
+ SagaDag::new(dag, params)
+ };
+
let saga_id = SagaId(Uuid::new_v4());
let saga_logger =
- self.log.new(o!("template_name" => template_name.to_owned()));
+ self.log.new(o!("saga_name" => saga.saga_name().to_string()));
let saga_context = Arc::new(Arc::new(SagaContext::new(
Arc::clone(self),
saga_logger,
Expand All @@ -91,9 +93,8 @@ impl super::Nexus {
.saga_create(
saga_id,
saga_context,
- saga_template,
- template_name.to_owned(),
- saga_params,
+ Arc::new(saga),
+ ACTION_REGISTRY.clone(),
)
.await
.context("creating saga")
Expand All @@ -112,10 +113,14 @@ impl super::Nexus {

let result = future.await;
result.kind.map_err(|saga_error| {
- saga_error.error_source.convert::<Error>().unwrap_or_else(|e| {
- // TODO-error more context would be useful
- Error::InternalError { internal_message: e.to_string() }
- })
+ saga_error
+ .error_source
+ .convert::<Error>()
+ .unwrap_or_else(|e| Error::internal_error(&e.to_string()))
+ .internal_context(format!(
+ "saga error at node {:?}",
+ saga_error.error_node_name
+ ))
})
}
}
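
The new `execute_saga` signature replaces the template, name, and `Arc`-wrapped params arguments with a single type parameter that carries the saga's name, parameter type, and DAG construction. A rough sketch of that shape follows. The trait below is a simplified, hypothetical analogue of `NexusSaga`: the DAG is modeled as a list of node names, the saga name `"disk-delete"` and the node names are invented for illustration, and nothing here uses Steno.

```rust
/// Simplified, hypothetical analogue of the `NexusSaga` trait used in the diff.
pub trait NexusSaga {
    /// Name recorded for the saga (used for logging in the real code).
    const NAME: &'static str;
    /// Parameter type for this saga.
    type Params;
    /// Builds the saga's graph from its parameters. Modeled here as an ordered
    /// list of node names instead of a real DAG.
    fn make_saga_dag(params: &Self::Params) -> Result<Vec<String>, String>;
}

pub struct SagaDiskDelete;

pub struct DiskDeleteParams {
    pub disk_id: u32,
}

impl NexusSaga for SagaDiskDelete {
    const NAME: &'static str = "disk-delete"; // assumed name
    type Params = DiskDeleteParams;

    fn make_saga_dag(params: &Self::Params) -> Result<Vec<String>, String> {
        if params.disk_id == 0 {
            return Err("invalid disk id".to_string());
        }
        // Node names are invented for this sketch.
        Ok(vec![
            format!("delete_disk_record({})", params.disk_id),
            format!("delete_volume({})", params.disk_id),
        ])
    }
}

/// Callers pick the saga with a turbofish and pass plain params; the name and
/// graph come from the trait implementation.
pub fn execute_saga<N: NexusSaga>(params: N::Params) -> Result<String, String> {
    let nodes = N::make_saga_dag(&params)?;
    Ok(format!("{}: {}", N::NAME, nodes.join(" -> ")))
}
```

A call then reads `execute_saga::<SagaDiskDelete>(DiskDeleteParams { disk_id: 3 })`, mirroring the `execute_saga::<sagas::disk_delete::SagaDiskDelete>(saga_params)` call sites in this PR.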