diff --git a/backend/migrations/20240813211652_alerts.up.sql b/backend/migrations/20240813211652_alerts.up.sql
index 1cdc302..0dcffca 100644
--- a/backend/migrations/20240813211652_alerts.up.sql
+++ b/backend/migrations/20240813211652_alerts.up.sql
@@ -25,7 +25,13 @@ CREATE TABLE IF NOT EXISTS affected_entities(
     stop_id VARCHAR REFERENCES stops(id),
     bus_route_id VARCHAR REFERENCES bus_routes(id),
     sort_order INTEGER NOT NULL,
-    UNIQUE (alert_id, route_id, stop_id, bus_route_id)
+    UNIQUE (
+        alert_id,
+        route_id,
+        stop_id,
+        bus_route_id,
+        sort_order
+    )
 );
 
 CREATE INDEX idx_active_periods_alert_id_start_time_end_time ON active_periods (alert_id, start_time, end_time);
diff --git a/backend/src/alerts.rs b/backend/src/alerts.rs
index e0bb8dd..af10eee 100644
--- a/backend/src/alerts.rs
+++ b/backend/src/alerts.rs
@@ -111,7 +111,6 @@ async fn parse_gtfs(pool: &PgPool) -> Result<(), DecodeFeedError> {
         .await?;
 
     let mut in_feed_ids = vec![];
-    let mut cloned_ids: Vec<Uuid> = vec![];
     let mut cloned_mta_ids: Vec<String> = vec![];
 
     let mut alerts: Vec<Alert> = vec![];
@@ -270,7 +269,7 @@ async fn parse_gtfs(pool: &PgPool) -> Result<(), DecodeFeedError> {
         .execute(pool)
         .await?;
 
-    let start = Utc::now();
+    // let start = Utc::now();
     // Filter out cloned ids from alerts, active periods, and affected entities. There is probably a more elegant way to remove cloned ids.
     // let alerts: Vec<Alert> = alerts
     //     .into_par_iter()
@@ -380,9 +379,10 @@ async fn parse_gtfs(pool: &PgPool) -> Result<(), DecodeFeedError> {
                 .push_bind(affected_entity.stop_id)
                 .push_bind(affected_entity.sort_order);
         });
-        query_builder.push(
-            "ON CONFLICT (alert_id, route_id, bus_route_id, stop_id) DO UPDATE SET sort_order = EXCLUDED.sort_order",
-        );
+        // query_builder.push(
+        //     "ON CONFLICT (alert_id, route_id, bus_route_id, stop_id, sort_order) DO UPDATE SET sort_order = EXCLUDED.sort_order",
+        // );
+        query_builder.push("ON CONFLICT DO NOTHING");
        let query = query_builder.build();
 
        query.execute(pool).await?;
diff --git a/backend/src/bus/positions.rs b/backend/src/bus/positions.rs
index 370510e..1cae270 100644
--- a/backend/src/bus/positions.rs
+++ b/backend/src/bus/positions.rs
@@ -39,12 +39,18 @@ pub async fn import(pool: PgPool) {
         loop {
             match parse_siri(pool.clone()).await {
                 Ok(_) => (),
-                Err(e) => {
-                    tracing::error!("Error importing SIRI bus data: {:?}", e);
-                }
-            }
+                Err(e) => match e {
+                    DecodeFeedError::Reqwest(e) => {
+                        // Decode errors happen because the SIRI API occasionally times out, so I ignore those errors.
+                        if !e.is_decode() {
+                            tracing::error!("Error importing SIRI bus data: {:?}", e);
+                        }
+                    }
+                    e => tracing::error!("Error importing SIRI bus data: {:?}", e),
+                },
+            };
 
-            sleep(Duration::from_secs(40)).await;
+            sleep(Duration::from_secs(35)).await;
         }
     });
 }
@@ -56,13 +62,6 @@ pub async fn parse_gtfs(pool: PgPool) -> Result<(), DecodeFeedError> {
     )
     .await?;
 
-    // let stop_ids = sqlx::query!("SELECT id FROM bus_stops")
-    //     .fetch_all(&pool)
-    //     .await?
-    //     .into_iter()
-    //     .map(|s| s.id)
-    //     .collect::<Vec<String>>();
-
     let positions = feed
         .entity
         .into_par_iter()
@@ -94,23 +93,6 @@ pub async fn parse_gtfs(pool: PgPool) -> Result<(), DecodeFeedError> {
                 }
             };
 
-
-            // if !stop_ids.contains(&stop_id) {
-            //     println!("Skipping stop_id: {}", stop_id);
-            //     return None;
-            // }
-
-            // let id_name = trip_id.to_owned()
-            //     + &route_id
-            //     + " "
-            //     + &direction.to_string()
-            //     + " "
-            //     + start_date
-            //     + " "
-            //     + &vehicle_id.to_string();
-            // let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, id_name.as_bytes());
-            // let start_date = chrono::NaiveDate::parse_from_str(start_date, "%Y%m%d").unwrap();
-
             let Some(position) = vehicle.position else {
                 tracing::debug!(target: "bus_positions", "Skipping vehicle without position");
                 return None;
@@ -266,7 +248,6 @@ struct Capacities {
 pub async fn parse_siri(pool: PgPool) -> Result<(), DecodeFeedError> {
     let siri_res = reqwest::Client::new()
         .get("https://api.prod.obanyc.com/api/siri/vehicle-monitoring.json")
-        .timeout(Duration::from_secs(29))
         .query(&[
             ("key", api_key()),
             ("version", "2"),
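
Note on the affected_entities change: once sort_order is part of the UNIQUE constraint, the old conflict target (alert_id, route_id, bus_route_id, stop_id) no longer names a unique index, so the upsert is replaced with ON CONFLICT DO NOTHING; a conflicting row is now an exact duplicate and can safely be skipped. A minimal sketch of the batched insert with sqlx's QueryBuilder, assuming a hypothetical AffectedEntity row type whose fields mirror the migration's columns:

    use sqlx::{PgPool, Postgres, QueryBuilder};
    use uuid::Uuid;

    // Hypothetical row type; field names follow the columns in the migration.
    struct AffectedEntity {
        alert_id: Uuid,
        route_id: Option<String>,
        bus_route_id: Option<String>,
        stop_id: Option<String>,
        sort_order: i32,
    }

    async fn insert_affected_entities(
        pool: &PgPool,
        entities: Vec<AffectedEntity>,
    ) -> Result<(), sqlx::Error> {
        let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
            "INSERT INTO affected_entities (alert_id, route_id, bus_route_id, stop_id, sort_order) ",
        );
        // push_values expands to a multi-row VALUES list with one bind per field.
        query_builder.push_values(entities, |mut b, e| {
            b.push_bind(e.alert_id)
                .push_bind(e.route_id)
                .push_bind(e.bus_route_id)
                .push_bind(e.stop_id)
                .push_bind(e.sort_order);
        });
        // Any conflict on the five-column UNIQUE constraint is a full duplicate,
        // so there is nothing left to update.
        query_builder.push(" ON CONFLICT DO NOTHING");
        query_builder.build().execute(pool).await?;
        Ok(())
    }

One caveat worth noting: Postgres treats NULLs in a UNIQUE constraint as distinct by default, so rows where route_id, stop_id, or bus_route_id is NULL never conflict, and DO NOTHING will not deduplicate them.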
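Note on the bus positions import loop: per the comment in the diff, the SIRI endpoint occasionally times out mid-response, which reqwest reports as a decode error, so only non-decode reqwest errors are logged. A sketch of that logging policy factored into a helper, using a stand-in for the crate's DecodeFeedError with the Reqwest variant the diff matches on (the Sqlx variant is an assumption, inferred from the `?` on sqlx calls elsewhere in the diff):

    // Stand-in for the crate's error enum; only the variants needed here are shown.
    enum DecodeFeedError {
        Reqwest(reqwest::Error),
        Sqlx(sqlx::Error),
    }

    // Returns true when the error should be logged: everything except
    // reqwest decode errors, which the import loop deliberately ignores.
    fn should_log(err: &DecodeFeedError) -> bool {
        match err {
            DecodeFeedError::Reqwest(e) => !e.is_decode(),
            // Non-reqwest errors are always logged.
            _ => true,
        }
    }

Since the per-request .timeout(Duration::from_secs(29)) was dropped from parse_siri, requests can now block indefinitely; if that matters, a client-wide timeout set once via reqwest::Client::builder().timeout(...) would restore a bound without reintroducing the per-call setting.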