Create lock table in lantern schema, hash trigger names (#37)
- Create lock table in lantern schema
- Hash client trigger function and trigger names so they won't exceed PostgreSQL's 63-byte identifier limit

- Bump CLI version
var77 authored Nov 21, 2023
1 parent 279f72a commit fd804df
Showing 5 changed files with 55 additions and 17 deletions.
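
For context on the naming change: PostgreSQL truncates identifiers longer than 63 bytes, and the old trigger, function, and channel names concatenated the table and column names, so long column names could overflow the limit and collide after truncation. Hashing the inputs instead yields short, deterministic names. The sketch below is not part of the diff; it mirrors one of the helpers added in client_jobs.rs further down, and the column names in it are made up for illustration.

// Illustration only; the real helpers live in lantern_daemon/src/client_jobs.rs,
// and the md5 crate is the one added to lantern_daemon/Cargo.toml in this commit.
fn hashed_trigger_name(src_column: &str, dst_column: &str) -> String {
    // 28-char prefix + 32-char md5 hex digest = 60 chars, always within
    // PostgreSQL's 63-byte identifier limit.
    let digest = md5::compute(format!("{src_column}{dst_column}").as_bytes());
    format!("trigger_lantern_jobs_insert_{:x}", digest)
}

fn main() {
    // Hypothetical long column names that would overflow the old concatenated scheme:
    let src = "very_long_source_text_column_for_embedding_generation";
    let dst = "very_long_destination_vector_embedding_column";
    let old_style = format!("trigger_lantern_jobs_insert_{src}_{dst}");
    let new_style = hashed_trigger_name(src, dst);
    println!("old: {} chars, new: {} chars", old_style.len(), new_style.len());
    assert!(new_style.len() <= 63); // old_style.len() is well over 63 here
}
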
2 changes: 1 addition & 1 deletion lantern_cli/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "lantern_cli"
version = "0.0.7"
version = "0.0.9"
edition = "2021"

[[bin]]
1 change: 1 addition & 0 deletions lantern_daemon/Cargo.toml
@@ -19,3 +19,4 @@ lantern_logger = { path = "../lantern_logger" }
lantern_utils = { path = "../lantern_utils" }
lazy_static = "1.4.0"
itertools = "0.11.0"
md5 = "0.7.0"
4 changes: 4 additions & 0 deletions lantern_daemon/src/cli.rs
@@ -42,6 +42,10 @@ pub struct DaemonArgs {
#[arg(short, long, default_value = "public")]
pub schema: String,

/// Internal schema name to create required tables
#[arg(short, long, default_value = "lantern")]
pub internal_schema: String,

/// Max concurrent jobs
#[arg(short, long, default_value_t = 1)]
pub queue_size: usize,
36 changes: 24 additions & 12 deletions lantern_daemon/src/client_jobs.rs
@@ -76,6 +76,21 @@ pub async fn toggle_client_job(
Ok(())
}

fn get_trigger_name(src_column: &str, dst_column: &str) -> String {
let digest = md5::compute(format!("{src_column}{dst_column}").as_bytes());
return format!("trigger_lantern_jobs_insert_{:x}", digest);
}

fn get_function_name(table: &str, src_column: &str, dst_column: &str) -> String {
let digest = md5::compute(format!("{table}{src_column}{dst_column}").as_bytes());
return format!("notify_insert_lantern_daemon_{:x}", digest);
}

fn get_notification_channel_name(table: &str, src_column: &str, dst_column: &str) -> String {
let digest = md5::compute(format!("{table}{src_column}{dst_column}").as_bytes());
return format!("lantern_client_notifications_{:x}", digest);
}

async fn setup_client_triggers(
job_id: i32,
client: Arc<Client>,
@@ -91,15 +106,11 @@ async fn setup_client_triggers(
let full_table_name = get_full_table_name(schema.deref(), table.deref());
check_table_exists(client.clone(), &full_table_name).await?;

// Set up trigger on table insert
let function_name = quote_ident(&format!(
"notify_insert_lantern_daemon_{table}_{src_column}_{dst_column}"
));
let trigger_name = quote_ident(&format!(
"trigger_lantern_jobs_insert_{src_column}_{dst_column}"
));
let channel = channel.replace("\"", "");
let function_name = get_function_name(table.deref(), src_column.deref(), dst_column.deref());
let trigger_name = get_trigger_name(src_column.deref(), dst_column.deref());
let function_name = get_full_table_name(schema.deref(), &function_name);

// Set up trigger on table insert
client
.batch_execute(&format!(
"
@@ -277,10 +288,11 @@ async fn start_client_job(
}
});

let notification_channel = Arc::new(quote_ident(&format!(
"lantern_client_notifications_{table}_{src_column}_{dst_column}"
)));

let notification_channel = Arc::new(get_notification_channel_name(
&table,
&src_column,
&dst_column,
));
// Wrap variables into Arc to share between tasks
let db_uri = Arc::new(db_uri);
let src_column = Arc::new(src_column);
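
The same hashing is applied to the LISTEN/NOTIFY channel name in start_client_job above. Because the generated names contain only lowercase letters, digits, and underscores, they are valid PostgreSQL identifiers as-is, which is why the quote_ident() wrapping from the old code is no longer needed. A small illustration, not part of the diff; the table and column names are made up, and the helper mirrors get_notification_channel_name above.

// Illustration only; mirrors the helper added in client_jobs.rs.
fn notification_channel_name(table: &str, src_column: &str, dst_column: &str) -> String {
    let digest = md5::compute(format!("{table}{src_column}{dst_column}").as_bytes());
    format!("lantern_client_notifications_{:x}", digest)
}

fn main() {
    // Even mixed-case input produces a plain lowercase name, so no quoting is needed.
    let channel = notification_channel_name("Articles", "Title", "title_embedding");
    assert!(channel
        .chars()
        .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_'));
    assert!(channel.len() <= 63);
    println!("LISTEN {channel};"); // e.g. what a listener would issue for this channel
}
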
29 changes: 25 additions & 4 deletions lantern_daemon/src/lib.rs
@@ -102,10 +102,16 @@ async fn db_notification_listener(
Ok(())
}

async fn lock_row(client: Arc<Client>, logger: Arc<Logger>, job_id: i32, row_id: &str) -> bool {
async fn lock_row(
client: Arc<Client>,
lock_table_schema: &str,
logger: Arc<Logger>,
job_id: i32,
row_id: &str,
) -> bool {
let res = client
.execute(
&format!("INSERT INTO {EMB_LOCK_TABLE_NAME} (job_id, row_id) VALUES ($1, $2)"),
&format!("INSERT INTO {lock_table_schema} (job_id, row_id) VALUES ($1, $2)"),
&[&job_id, &row_id],
)
.await;
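
lock_row now receives the qualified lock table name as a parameter instead of interpolating a hard-coded constant. The rest of its body is collapsed in this view; the gist is that the UNIQUE (job_id, row_id) constraint created in startup_hook turns the INSERT into a cheap lock, since only the first insert for a given row succeeds. Below is a rough sketch of that shape, not the daemon's actual code, with assumed names and simplified error handling (the real function also logs failures through the daemon's Logger), assuming a tokio_postgres::Client:

use tokio_postgres::Client;

// Sketch only; not the daemon's actual function.
async fn try_lock_row(client: &Client, lock_table: &str, job_id: i32, row_id: &str) -> bool {
    let stmt = format!("INSERT INTO {lock_table} (job_id, row_id) VALUES ($1, $2)");
    // A unique-constraint violation means another task already locked this row,
    // so the caller should skip it; any other error is treated the same way here.
    client
        .execute(stmt.as_str(), &[&job_id, &row_id])
        .await
        .is_ok()
}
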
@@ -185,12 +191,14 @@ async fn startup_hook(
client: Arc<Client>,
table: &str,
schema: &str,
lock_table_schema: &str,
channel: &str,
logger: Arc<Logger>,
) -> AnyhowVoidResult {
logger.info("Setting up environment");
// verify that table exists
let full_table_name = get_full_table_name(schema, table);
let lock_table_name = get_full_table_name(lock_table_schema, EMB_LOCK_TABLE_NAME);
check_table_exists(client.clone(), &full_table_name).await?;

// Set up trigger on table insert
@@ -228,12 +236,14 @@ async fn startup_hook(
EXECUTE PROCEDURE notify_update_lantern_daemon();
-- Create Lock Table
CREATE UNLOGGED TABLE IF NOT EXISTS {EMB_LOCK_TABLE_NAME} (
CREATE SCHEMA IF NOT EXISTS {lock_table_schema};
CREATE UNLOGGED TABLE IF NOT EXISTS {lock_table_name} (
job_id INTEGER NOT NULL,
row_id TEXT NOT NULL,
CONSTRAINT ldb_lock_jobid_rowid UNIQUE (job_id, row_id)
);
",
lock_table_schema = quote_ident(lock_table_schema)
))
.await?;

@@ -271,6 +281,7 @@ async fn job_insert_processor(
mut notifications_rx: Receiver<JobInsertNotification>,
job_tx: Sender<Job>,
schema: String,
lock_table_schema: String,
table: String,
logger: Arc<Logger>,
) -> AnyhowVoidResult {
@@ -302,6 +313,7 @@ async fn job_insert_processor(
let client_r1 = client.clone();
let job_tx_r1 = job_tx.clone();
let logger_r1 = logger.clone();
let lock_table_name = get_full_table_name(&lock_table_schema, EMB_LOCK_TABLE_NAME);
let job_batching_hashmap_r1 = job_batching_hashmap.clone();

let insert_processor_task = tokio::spawn(async move {
@@ -310,7 +322,14 @@

if let Some(row_id) = notification.row_id {
// Single row update received from client job, lock row and add to batching map
let status = lock_row(client_r1.clone(), logger_r1.clone(), id, &row_id).await;
let status = lock_row(
client_r1.clone(),
&lock_table_name,
logger_r1.clone(),
id,
&row_id,
)
.await;

if status {
// this means locking was successful and row will be processed
@@ -506,6 +525,7 @@ pub async fn start(args: cli::DaemonArgs, logger: Option<Logger>) -> Result<(),
main_db_client.clone(),
&args.table,
&args.schema,
&args.internal_schema,
&notification_channel,
logger.clone(),
)
@@ -524,6 +544,7 @@ pub async fn start(args: cli::DaemonArgs, logger: Option<Logger>) -> Result<(),
insert_notification_queue_rx,
job_queue_tx,
args.schema.clone(),
args.internal_schema.clone(),
args.table.clone(),
logger.clone(),
)) as VoidFuture,
