Skip to content

Add failpoints for worker failures #146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ tracing-subscriber = { version = "0.3", default-features = false }
utoipa = { version = "4.2.3", default-features = false }
utoipa-swagger-ui = { version = "7.1.0", default-features = false }
uuid = { version = "1.10.0", default-features = false }
fail = { version = "0.5.1", features = ["failpoints"], default-features = false }

# [patch."https://github.com/imor/gcp-bigquery-client"]
# gcp-bigquery-client = { path = "../gcp-bigquery-client" }
2 changes: 1 addition & 1 deletion api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing = { workspace = true, default-features = false }
tracing-actix-web = { workspace = true, features = ["emit_event_on_error"] }
utoipa = { workspace = true, features = ["actix_extras"] }
utoipa-swagger-ui = { workspace = true, features = ["actix-web", "reqwest"] }
utoipa-swagger-ui = { workspace = true, features = ["actix-web", "reqwest", "vendored"] }
uuid = { version = "1.10.0", features = ["v4"] }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions etl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ tracing-subscriber = { workspace = true, default-features = true, features = [
"env-filter",
] }
uuid = { workspace = true, features = ["v4"] }
fail = { workspace = true }

[dev-dependencies]
postgres = { workspace = true, features = ["test-utils", "tokio"] }
Expand Down
9 changes: 9 additions & 0 deletions etl/src/v2/replication/table_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::v2::state::table::{TableReplicationPhase, TableReplicationPhaseType};
use crate::v2::workers::table_sync::{TableSyncWorkerState, TableSyncWorkerStateError};
use futures::StreamExt;
use std::sync::Arc;
use fail::fail_point;
use thiserror::Error;
use tokio::pin;
use tokio::sync::watch;
Expand Down Expand Up @@ -150,6 +151,14 @@ where
let table_schema = transaction
.get_table_schema(table_id, Some(identity.publication_name()))
.await?;
fail_point!(
"table_sync_worker_store_table_schema",
|_| -> Result<TableSyncResult, TableSyncError> {
Err(TableSyncError::StateStore(
StateStoreError::TableReplicationStateNotFound,
))
}
);
state_store
.store_table_schema(identity.id(), table_schema.clone(), true)
.await?;
Expand Down
10 changes: 10 additions & 0 deletions etl/src/v2/workers/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::v2::workers::table_sync::{
};
use postgres::schema::Oid;
use std::sync::Arc;
use fail::fail_point;
use thiserror::Error;
use tokio::sync::watch;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -103,6 +104,15 @@ where
info!("Starting apply worker");

let apply_worker = async move {
fail_point!(
"apply_worker_load_replication_origin_state",
|_| -> Result<(), ApplyWorkerError> {
Err(ApplyWorkerError::StateStoreError(
StateStoreError::ReplicationOriginStateNotFound,
))
}
);

// TODO: get the slot of the main apply worker or create it if needed.
// We load or initialize the initial state that will be used for the apply worker.
let replication_origin_state = match self
Expand Down
55 changes: 20 additions & 35 deletions etl/tests/integration/pipeline_v2_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use tokio_postgres::types::Type;
use crate::common::database::{spawn_database, test_table_name};
use crate::common::destination_v2::TestDestination;
use crate::common::pipeline_v2::spawn_pg_pipeline;
use crate::common::state_store::{
FaultConfig, FaultInjectingStateStore, FaultType, StateStoreMethod, TestStateStore,
};
use crate::common::state_store::{StateStoreMethod, TestStateStore};
use fail::FailScenario;

#[derive(Debug)]
struct DatabaseSchema {
Expand Down Expand Up @@ -146,11 +145,9 @@ async fn test_pipeline_with_apply_worker_panic() {
let database = spawn_database().await;
let database_schema = setup_database(&database).await;

let fault_config = FaultConfig {
load_replication_origin_state: Some(FaultType::Panic),
..Default::default()
};
let state_store = FaultInjectingStateStore::wrap(TestStateStore::new(), fault_config);
let _scenario = FailScenario::setup();
fail::cfg("apply_worker_load_replication_origin_state", "panic").unwrap();
let state_store = TestStateStore::new();
let destination = TestDestination::new();

// We start the pipeline from scratch.
Expand All @@ -174,11 +171,9 @@ async fn test_pipeline_with_apply_worker_error() {
let database = spawn_database().await;
let database_schema = setup_database(&database).await;

let fault_config = FaultConfig {
load_replication_origin_state: Some(FaultType::Error),
..Default::default()
};
let state_store = FaultInjectingStateStore::wrap(TestStateStore::new(), fault_config);
let _scenario = FailScenario::setup();
fail::cfg("apply_worker_load_replication_origin_state", "return").unwrap();
let state_store = TestStateStore::new();
let destination = TestDestination::new();

// We start the pipeline from scratch.
Expand All @@ -205,11 +200,9 @@ async fn test_pipeline_with_table_sync_worker_panic() {
let database = spawn_database().await;
let database_schema = setup_database(&database).await;

let fault_config = FaultConfig {
store_table_schema: Some(FaultType::Panic),
..Default::default()
};
let state_store = FaultInjectingStateStore::wrap(TestStateStore::new(), fault_config);
let _scenario = FailScenario::setup();
fail::cfg("table_sync_worker_store_table_schema", "panic").unwrap();
let state_store = TestStateStore::new();
let destination = TestDestination::new();

// We start the pipeline from scratch.
Expand All @@ -223,15 +216,13 @@ async fn test_pipeline_with_table_sync_worker_panic() {

// We register interest in being notified once both table syncs have started.
let users_state_notify = state_store
.get_inner()
.notify_on_replication_phase(
pipeline_id,
database_schema.users_table_schema.id,
TableReplicationPhaseType::DataSync,
)
.await;
let orders_state_notify = state_store
.get_inner()
.notify_on_replication_phase(
pipeline_id,
database_schema.orders_table_schema.id,
Expand All @@ -256,11 +247,9 @@ async fn test_pipeline_with_table_sync_worker_error() {
let database = spawn_database().await;
let database_schema = setup_database(&database).await;

let fault_config = FaultConfig {
store_table_schema: Some(FaultType::Error),
..Default::default()
};
let state_store = FaultInjectingStateStore::wrap(TestStateStore::new(), fault_config);
let _scenario = FailScenario::setup();
fail::cfg("table_sync_worker_store_table_schema", "return").unwrap();
let state_store = TestStateStore::new();
let destination = TestDestination::new();

// We start the pipeline from scratch.
Expand All @@ -274,15 +263,13 @@ async fn test_pipeline_with_table_sync_worker_error() {

// We register interest in being notified once both table syncs have started.
let users_state_notify = state_store
.get_inner()
.notify_on_replication_phase(
pipeline_id,
database_schema.users_table_schema.id,
TableReplicationPhaseType::DataSync,
)
.await;
let orders_state_notify = state_store
.get_inner()
.notify_on_replication_phase(
pipeline_id,
database_schema.orders_table_schema.id,
Expand All @@ -306,6 +293,8 @@ async fn test_pipeline_with_table_sync_worker_error() {
errors[1],
WorkerWaitError::TableSyncWorkerPropagated(_)
));

fail::remove("table_sync_worker_store_table_schema");
}

#[tokio::test(flavor = "multi_thread")]
Expand All @@ -316,13 +305,11 @@ async fn test_table_schema_copy_with_data_sync_retry() {
let state_store = TestStateStore::new();
let destination = TestDestination::new();

// We start the pipeline from scratch with a faulty state store in order to have a failure during
// We start the pipeline from scratch with fail points enabled in order to have a failure during
// data sync.
let fault_config = FaultConfig {
store_table_schema: Some(FaultType::Error),
..Default::default()
};
let failing_state_store = FaultInjectingStateStore::wrap(state_store.clone(), fault_config);
let _scenario = FailScenario::setup();
fail::cfg("table_sync_worker_store_table_schema", "return").unwrap();
let failing_state_store = state_store.clone();
let mut pipeline = spawn_pg_pipeline(
&database_schema.publication_name,
&database.options,
Expand All @@ -333,15 +320,13 @@ async fn test_table_schema_copy_with_data_sync_retry() {

// We register interest in being notified once both table syncs have started.
let users_state_notify = failing_state_store
.get_inner()
.notify_on_replication_phase(
pipeline_id,
database_schema.users_table_schema.id,
TableReplicationPhaseType::DataSync,
)
.await;
let orders_state_notify = failing_state_store
.get_inner()
.notify_on_replication_phase(
pipeline_id,
database_schema.orders_table_schema.id,
Expand Down
Loading