Re-apply unhandled operations during startup of materializer service (#623)

* Store method to get all un-indexed operation ids

* Pick up un-indexed operations when starting materializer service, add a test

* Add entry to CHANGELOG.md
adzialocha authored Jun 16, 2024
1 parent 2ce83d0 commit 34d7e43
Showing 3 changed files with 160 additions and 37 deletions.
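
The change boils down to a small recovery loop at service startup: query the store for every operation whose `sorted_index` is still unset (meaning no `reduce` task has processed it yet) and send each id over the service bus again, as if it had just arrived. Below is a minimal, self-contained sketch of that idea in plain Rust; an in-memory store and an `std::sync::mpsc` channel stand in for aquadoggo's SQL store and tokio broadcast bus, and the names `StoredOperation` and `unindexed_operation_ids` are illustrative rather than the crate's API.

    use std::sync::mpsc;

    // Operations keep an optional `sorted_index` which is only set once a "reduce"
    // task has materialized them into a document.
    #[derive(Debug, Clone)]
    struct StoredOperation {
        id: String,
        sorted_index: Option<usize>,
    }

    // Stand-in for the new store method: collect ids of all operations that were
    // stored but never materialized.
    fn unindexed_operation_ids(store: &[StoredOperation]) -> Vec<String> {
        store
            .iter()
            .filter(|op| op.sorted_index.is_none())
            .map(|op| op.id.clone())
            .collect()
    }

    fn main() {
        // Two operations survived a crash: the first was reduced, the second was not.
        let store = vec![
            StoredOperation { id: "operation-a".into(), sorted_index: Some(0) },
            StoredOperation { id: "operation-b".into(), sorted_index: None },
        ];

        // Stand-in for the node's service bus.
        let (tx, rx) = mpsc::channel();

        // Startup step added by this commit: re-send every unhandled operation.
        for id in unindexed_operation_ids(&store) {
            tx.send(id).expect("receiver is alive");
        }
        drop(tx);

        // The materializer would now queue a "reduce" task for each received id.
        for id in rx {
            println!("queue \"reduce\" task for operation {id}");
        }
    }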
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 - Re-materialize blobs which were only partially written to disc due to node crash [#618](https://github.com/p2panda/aquadoggo/pull/618)
 - Include all logs for target schemas during replication [#620](https://github.com/p2panda/aquadoggo/pull/620)
+- Re-apply unhandled operations during startup of materializer service [#623](https://github.com/p2panda/aquadoggo/pull/623)

 ### Added
24 changes: 24 additions & 0 deletions aquadoggo/src/db/stores/operation.rs
@@ -243,6 +243,30 @@ fn group_and_parse_operation_rows(
 }

 impl SqlStore {
+    /// Returns ids of operations which have not been processed by `reduce` task yet.
+    pub async fn get_unindexed_operation_ids(
+        &self,
+    ) -> Result<Vec<OperationId>, OperationStorageError> {
+        let id_rows: Vec<String> = query_scalar(
+            "
+            SELECT
+                operations_v1.operation_id
+            FROM
+                operations_v1
+            WHERE
+                operations_v1.sorted_index IS NULL
+            ",
+        )
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?;
+
+        Ok(id_rows
+            .iter()
+            .map(|id| id.parse().expect("invalid operation id in database"))
+            .collect())
+    }
+
     /// Update the sorted index of an operation. This method is used in `reduce` tasks as each
     /// operation is processed.
     pub async fn update_operation_index(
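
The query itself is a plain `WHERE ... IS NULL` scan over the `operations_v1` table. The toy reproduction below runs the same shape of query against an in-memory SQLite database; it assumes the `rusqlite` crate and a stripped-down table, whereas aquadoggo issues the query through sqlx against the node's connection pool as shown above.

    use rusqlite::{Connection, Result};

    fn main() -> Result<()> {
        // Toy `operations_v1` table: a NULL `sorted_index` means the operation was
        // stored but never picked up by a "reduce" task.
        let conn = Connection::open_in_memory()?;
        conn.execute_batch(
            "CREATE TABLE operations_v1 (operation_id TEXT PRIMARY KEY, sorted_index INTEGER);
             INSERT INTO operations_v1 VALUES ('operation-a', 0);
             INSERT INTO operations_v1 VALUES ('operation-b', NULL);
             INSERT INTO operations_v1 VALUES ('operation-c', NULL);",
        )?;

        // Same shape as `get_unindexed_operation_ids`: select ids without a sorted index.
        let mut stmt = conn.prepare(
            "SELECT operations_v1.operation_id
             FROM operations_v1
             WHERE operations_v1.sorted_index IS NULL",
        )?;
        let unindexed: Vec<String> = stmt
            .query_map([], |row| row.get(0))?
            .collect::<Result<_>>()?;

        assert_eq!(unindexed.len(), 2);
        println!("un-indexed operations: {:?}", unindexed);
        Ok(())
    }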
172 changes: 135 additions & 37 deletions aquadoggo/src/materializer/service.rs
@@ -87,47 +87,63 @@ pub async fn materializer_service(
         factory.queue(task.to_owned());
     });

-    // Subscribe to communication bus
-    let mut rx = tx.subscribe();
-
-    // Listen to incoming new entries and operations and move them into task queue
-    let handle = task::spawn(async move {
-        loop {
-            if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
-                // Resolve document id of regarding operation
-                let document_id = context
-                    .store
-                    .get_document_id_by_operation_id(&operation_id)
-                    .await
-                    .unwrap_or_else(|_| {
-                        panic!(
-                            "Failed database query when retrieving document id by operation_id {}",
-                            operation_id
-                        )
-                    });
-
-                match document_id {
-                    Some(document_id) => {
-                        // Dispatch "reduce" task which will materialize the regarding document.
-                        factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
-                    }
-                    None => {
-                        // Panic when we couldn't find the regarding document in the database. We can
-                        // safely assure that this is due to a critical bug affecting the database
-                        // integrity. Panicking here will close `handle` and by that signal a node
-                        // shutdown.
-                        panic!("Could not find document for operation_id {}", operation_id);
-                    }
-                };
-            }
-        }
-    });
+    let handle = {
+        let context = context.clone();
+
+        // Subscribe to communication bus
+        let mut rx = tx.subscribe();
+
+        // Listen to incoming new entries and operations and move them into task queue
+        task::spawn(async move {
+            loop {
+                if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
+                    // Resolve document id of regarding operation
+                    let document_id = context
+                        .store
+                        .get_document_id_by_operation_id(&operation_id)
+                        .await
+                        .unwrap_or_else(|_| {
+                            panic!(
+                                "Failed database query when retrieving document id by operation_id {}",
+                                operation_id
+                            )
+                        });
+
+                    match document_id {
+                        Some(document_id) => {
+                            // Dispatch "reduce" task which will materialize the regarding document.
+                            factory.queue(Task::new("reduce", TaskInput::DocumentId(document_id)))
+                        }
+                        None => {
+                            // Panic when we couldn't find the regarding document in the database. We can
+                            // safely assure that this is due to a critical bug affecting the database
+                            // integrity. Panicking here will close `handle` and by that signal a node
+                            // shutdown.
+                            panic!("Could not find document for operation_id {}", operation_id);
+                        }
+                    };
+                }
+            }
+        })
+    };

     debug!("Materialiser service is ready");
     if tx_ready.send(()).is_err() {
         warn!("No subscriber informed about materialiser service being ready");
     };

+    // Re-apply unmaterialized operations as they might have slipped through in an unexpected crash
+    // or node shutdown
+    let unindexed_operation_ids = context
+        .store
+        .get_unindexed_operation_ids()
+        .await
+        .unwrap_or_else(|_| panic!("Failed database query when loading unindexed operation ids"));
+
+    for id in unindexed_operation_ids {
+        let _ = tx.send(ServiceMessage::NewOperation(id));
+    }
+
     // Wait until we received the application shutdown signal or handle closed
     tokio::select! {
         _ = handle => (),
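
One ordering detail in the hunk above: the spawned handler subscribes to the broadcast bus inside the `handle` block, before the un-indexed operation ids are re-sent on `tx`. That matters because a tokio broadcast channel only buffers messages for receivers that already exist at send time. A minimal sketch of that property follows; it assumes the `tokio` crate (with the `rt`, `sync` and `macros` features) and uses a simplified `ServiceMessage` carrying a string id in place of aquadoggo's real message type.

    use tokio::sync::broadcast;

    #[derive(Clone, Debug)]
    enum ServiceMessage {
        NewOperation(String),
    }

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let (tx, _keep_alive) = broadcast::channel::<ServiceMessage>(16);

        // Subscribe FIRST, as the materializer service does inside its `handle` block ...
        let mut rx = tx.subscribe();

        // ... so that messages re-sent during startup are not lost.
        tx.send(ServiceMessage::NewOperation("operation-b".into()))
            .expect("at least one receiver");

        // A receiver created only now never sees the message sent above.
        let mut late_rx = tx.subscribe();

        assert!(matches!(rx.recv().await, Ok(ServiceMessage::NewOperation(_))));
        assert!(matches!(
            late_rx.try_recv(),
            Err(broadcast::error::TryRecvError::Empty)
        ));
        println!("early subscriber got the re-applied operation, late one did not");
    }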
@@ -144,6 +160,7 @@ mod tests {
     use std::time::Duration;

     use p2panda_rs::document::traits::AsDocument;
+    use p2panda_rs::document::DocumentId;
     use p2panda_rs::entry::traits::AsEncodedEntry;
     use p2panda_rs::identity::KeyPair;
     use p2panda_rs::operation::{Operation, OperationId, OperationValue};
@@ -175,7 +192,8 @@
     ) {
         // Prepare database which inserts data for one document
         test_runner(move |node: TestNode| async move {
-            // Populate the store with some entries and operations but DON'T materialise any resulting documents.
+            // Populate the store with some entries and operations but DON'T materialise any
+            // resulting documents
             let documents = populate_store(&node.context.store, &config).await;
             let document_id = documents[0].id();

@@ -252,12 +270,13 @@
         config: PopulateStoreConfig,
     ) {
         test_runner(move |node: TestNode| async move {
-            // Populate the store with some entries and operations but DON'T materialise any resulting documents.
+            // Populate the store with some entries and operations but DON'T materialise any
+            // resulting documents
             let documents = populate_store(&node.context.store, &config).await;
             let document_id = documents[0].id();

-            // Store a pending "reduce" task from last runtime in the database so it gets picked up by
-            // the materializer service
+            // Store a pending "reduce" task from last runtime in the database so it gets picked up
+            // by the materializer service
             node.context
                 .store
                 .insert_task(&Task::new(
@@ -295,8 +314,8 @@
                 panic!("Service dropped");
             }

-            // Wait for service to be done .. it should materialize the document since it was waiting
-            // as a "pending" task in the database
+            // Wait for service to be done .. it should materialize the document since it was
+            // waiting as a "pending" task in the database
             tokio::time::sleep(Duration::from_millis(200)).await;

             // Make sure the service did not crash and is still running
@@ -318,6 +337,85 @@
         });
     }

+    #[rstest]
+    fn materialize_unhandled_operations(
+        #[from(operation)]
+        #[with(Some(operation_fields(doggo_fields())), None, doggo_schema().id().to_owned())]
+        operation: Operation,
+        key_pair: KeyPair,
+    ) {
+        test_runner(move |node: TestNode| async move {
+            // Prepare arguments for service
+            let context = Context::new(
+                node.context.store.clone(),
+                KeyPair::new(),
+                Configuration::default(),
+                SchemaProvider::default(),
+            );
+
+            // Create an operation in the database which was not handled by the `reduce` task yet.
+            // This might happen for example when the node crashed right _after_ the operation
+            // arrived in the database but _before_ the `reduce` task kicked in.
+            let (entry_signed, _) =
+                send_to_store(&node.context.store, &operation, &doggo_schema(), &key_pair)
+                    .await
+                    .expect("Publish CREATE operation");
+            let document_id: DocumentId = entry_signed.hash().into();
+
+            // There should be one unhandled operation in the database
+            let unindexed_operation_ids = context
+                .store
+                .get_unindexed_operation_ids()
+                .await
+                .unwrap_or_else(|_| {
+                    panic!("Failed database query when loading unindexed operation ids")
+                });
+            assert!(unindexed_operation_ids.len() == 1);
+
+            let shutdown = task::spawn(async {
+                loop {
+                    // Do this forever .. this means that the shutdown handler will never resolve
+                    tokio::time::sleep(Duration::from_millis(100)).await;
+                }
+            });
+            let (tx, _) = broadcast::channel(1024);
+            let (tx_ready, rx_ready) = oneshot::channel::<()>();
+
+            // Start materializer service, it should pick up the un-indexed operation automatically
+            let tx_clone = tx.clone();
+            let handle = tokio::spawn(async move {
+                materializer_service(context, shutdown, tx_clone, tx_ready)
+                    .await
+                    .unwrap();
+            });
+
+            if rx_ready.await.is_err() {
+                panic!("Service dropped");
+            }
+
+            // Wait for service to be done .. it should materialize the document since it was
+            // waiting as a "pending" task in the database
+            tokio::time::sleep(Duration::from_millis(200)).await;
+
+            // Make sure the service did not crash and is still running
+            assert!(!handle.is_finished());
+
+            // Check database for materialized documents
+            let document = node
+                .context
+                .store
+                .get_document(&document_id)
+                .await
+                .unwrap()
+                .expect("We expect that the document is `Some`");
+            assert_eq!(document.id().to_string(), document_id.to_string());
+            assert_eq!(
+                document.get("username").unwrap().to_owned(),
+                OperationValue::String("bubu".into())
+            );
+        });
+    }
+
     #[rstest]
     fn materialize_update_document(
         #[from(populate_store_config)]
