From fd804df542b5e32cdf7e589594e78e98b4150f63 Mon Sep 17 00:00:00 2001 From: Varik Matevosyan Date: Tue, 21 Nov 2023 22:58:25 +0100 Subject: [PATCH] Create lock table in lantern schema hash trigger names (#37) * - Create lock table in lantern schema - Hash client trigger function and trigger names so they won't exceed the character limit * Bump cli version --- lantern_cli/Cargo.toml | 2 +- lantern_daemon/Cargo.toml | 1 + lantern_daemon/src/cli.rs | 4 ++++ lantern_daemon/src/client_jobs.rs | 36 ++++++++++++++++++++----------- lantern_daemon/src/lib.rs | 29 +++++++++++++++++++++---- 5 files changed, 55 insertions(+), 17 deletions(-) diff --git a/lantern_cli/Cargo.toml b/lantern_cli/Cargo.toml index d2a2ac6b..390775b7 100644 --- a/lantern_cli/Cargo.toml +++ b/lantern_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lantern_cli" -version = "0.0.7" +version = "0.0.9" edition = "2021" [[bin]] diff --git a/lantern_daemon/Cargo.toml b/lantern_daemon/Cargo.toml index cf03eb9e..68172996 100644 --- a/lantern_daemon/Cargo.toml +++ b/lantern_daemon/Cargo.toml @@ -19,3 +19,4 @@ lantern_logger = { path = "../lantern_logger" } lantern_utils = { path = "../lantern_utils" } lazy_static = "1.4.0" itertools = "0.11.0" +md5 = "0.7.0" diff --git a/lantern_daemon/src/cli.rs b/lantern_daemon/src/cli.rs index b1510e55..2297503d 100644 --- a/lantern_daemon/src/cli.rs +++ b/lantern_daemon/src/cli.rs @@ -42,6 +42,10 @@ pub struct DaemonArgs { #[arg(short, long, default_value = "public")] pub schema: String, + /// Internal schema name to create required tables + #[arg(short, long, default_value = "lantern")] + pub internal_schema: String, + /// Max concurrent jobs #[arg(short, long, default_value_t = 1)] pub queue_size: usize, diff --git a/lantern_daemon/src/client_jobs.rs b/lantern_daemon/src/client_jobs.rs index 856cc2fa..7d3e21e6 100644 --- a/lantern_daemon/src/client_jobs.rs +++ b/lantern_daemon/src/client_jobs.rs @@ -76,6 +76,21 @@ pub async fn toggle_client_job( Ok(()) } +fn get_trigger_name(src_column: &str, dst_column: &str) -> String { + let digest = md5::compute(format!("{src_column}{dst_column}").as_bytes()); + return format!("trigger_lantern_jobs_insert_{:x}", digest); +} + +fn get_function_name(table: &str, src_column: &str, dst_column: &str) -> String { + let digest = md5::compute(format!("{table}{src_column}{dst_column}").as_bytes()); + return format!("notify_insert_lantern_daemon_{:x}", digest); +} + +fn get_notification_channel_name(table: &str, src_column: &str, dst_column: &str) -> String { + let digest = md5::compute(format!("{table}{src_column}{dst_column}").as_bytes()); + return format!("lantern_client_notifications_{:x}", digest); +} + async fn setup_client_triggers( job_id: i32, client: Arc, @@ -91,15 +106,11 @@ async fn setup_client_triggers( let full_table_name = get_full_table_name(schema.deref(), table.deref()); check_table_exists(client.clone(), &full_table_name).await?; - // Set up trigger on table insert - let function_name = quote_ident(&format!( - "notify_insert_lantern_daemon_{table}_{src_column}_{dst_column}" - )); - let trigger_name = quote_ident(&format!( - "trigger_lantern_jobs_insert_{src_column}_{dst_column}" - )); - let channel = channel.replace("\"", ""); + let function_name = get_function_name(table.deref(), src_column.deref(), dst_column.deref()); + let trigger_name = get_trigger_name(src_column.deref(), dst_column.deref()); + let function_name = get_full_table_name(schema.deref(), &function_name); + // Set up trigger on table insert client .batch_execute(&format!( " @@ -277,10 +288,11 @@ async fn start_client_job( } }); - let notification_channel = Arc::new(quote_ident(&format!( - "lantern_client_notifications_{table}_{src_column}_{dst_column}" - ))); - + let notification_channel = Arc::new(get_notification_channel_name( + &table, + &src_column, + &dst_column, + )); // Wrap variables into Arc to share between tasks let db_uri = Arc::new(db_uri); let src_column = Arc::new(src_column); diff --git a/lantern_daemon/src/lib.rs b/lantern_daemon/src/lib.rs index e0eb97d8..434cb394 100644 --- a/lantern_daemon/src/lib.rs +++ b/lantern_daemon/src/lib.rs @@ -102,10 +102,16 @@ async fn db_notification_listener( Ok(()) } -async fn lock_row(client: Arc, logger: Arc, job_id: i32, row_id: &str) -> bool { +async fn lock_row( + client: Arc, + lock_table_schema: &str, + logger: Arc, + job_id: i32, + row_id: &str, +) -> bool { let res = client .execute( - &format!("INSERT INTO {EMB_LOCK_TABLE_NAME} (job_id, row_id) VALUES ($1, $2)"), + &format!("INSERT INTO {lock_table_schema} (job_id, row_id) VALUES ($1, $2)"), &[&job_id, &row_id], ) .await; @@ -185,12 +191,14 @@ async fn startup_hook( client: Arc, table: &str, schema: &str, + lock_table_schema: &str, channel: &str, logger: Arc, ) -> AnyhowVoidResult { logger.info("Setting up environment"); // verify that table exists let full_table_name = get_full_table_name(schema, table); + let lock_table_name = get_full_table_name(lock_table_schema, EMB_LOCK_TABLE_NAME); check_table_exists(client.clone(), &full_table_name).await?; // Set up trigger on table insert @@ -228,12 +236,14 @@ async fn startup_hook( EXECUTE PROCEDURE notify_update_lantern_daemon(); -- Create Lock Table - CREATE UNLOGGED TABLE IF NOT EXISTS {EMB_LOCK_TABLE_NAME} ( + CREATE SCHEMA IF NOT EXISTS {lock_table_schema}; + CREATE UNLOGGED TABLE IF NOT EXISTS {lock_table_name} ( job_id INTEGER NOT NULL, row_id TEXT NOT NULL, CONSTRAINT ldb_lock_jobid_rowid UNIQUE (job_id, row_id) ); ", + lock_table_schema = quote_ident(lock_table_schema) )) .await?; @@ -271,6 +281,7 @@ async fn job_insert_processor( mut notifications_rx: Receiver, job_tx: Sender, schema: String, + lock_table_schema: String, table: String, logger: Arc, ) -> AnyhowVoidResult { @@ -302,6 +313,7 @@ async fn job_insert_processor( let client_r1 = client.clone(); let job_tx_r1 = job_tx.clone(); let logger_r1 = logger.clone(); + let lock_table_name = get_full_table_name(&lock_table_schema, EMB_LOCK_TABLE_NAME); let job_batching_hashmap_r1 = job_batching_hashmap.clone(); let insert_processor_task = tokio::spawn(async move { @@ -310,7 +322,14 @@ async fn job_insert_processor( if let Some(row_id) = notification.row_id { // Single row update received from client job, lock row and add to batching map - let status = lock_row(client_r1.clone(), logger_r1.clone(), id, &row_id).await; + let status = lock_row( + client_r1.clone(), + &lock_table_name, + logger_r1.clone(), + id, + &row_id, + ) + .await; if status { // this means locking was successfull and row will be processed @@ -506,6 +525,7 @@ pub async fn start(args: cli::DaemonArgs, logger: Option) -> Result<(), main_db_client.clone(), &args.table, &args.schema, + &args.internal_schema, ¬ification_channel, logger.clone(), ) @@ -524,6 +544,7 @@ pub async fn start(args: cli::DaemonArgs, logger: Option) -> Result<(), insert_notification_queue_rx, job_queue_tx, args.schema.clone(), + args.internal_schema.clone(), args.table.clone(), logger.clone(), )) as VoidFuture,