Skip to content

Commit

Permalink
fix duplicate affected entities being inserted into db and hide SIRI …
Browse files Browse the repository at this point in the history
…timeout errors
  • Loading branch information
jonerrr committed Aug 14, 2024
1 parent 7f002e9 commit 03e3cba
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 36 deletions.
8 changes: 7 additions & 1 deletion backend/migrations/20240813211652_alerts.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ CREATE TABLE IF NOT EXISTS affected_entities(
stop_id VARCHAR REFERENCES stops(id),
bus_route_id VARCHAR REFERENCES bus_routes(id),
sort_order INTEGER NOT NULL,
UNIQUE (alert_id, route_id, stop_id, bus_route_id)
UNIQUE (
alert_id,
route_id,
stop_id,
bus_route_id,
sort_order
)
);

CREATE INDEX idx_active_periods_alert_id_start_time_end_time ON active_periods (alert_id, start_time, end_time);
Expand Down
10 changes: 5 additions & 5 deletions backend/src/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ async fn parse_gtfs(pool: &PgPool) -> Result<(), DecodeFeedError> {
.await?;

let mut in_feed_ids = vec![];
let mut cloned_ids: Vec<Uuid> = vec![];
let mut cloned_mta_ids: Vec<String> = vec![];

let mut alerts: Vec<Alert> = vec![];
Expand Down Expand Up @@ -270,7 +269,7 @@ async fn parse_gtfs(pool: &PgPool) -> Result<(), DecodeFeedError> {
.execute(pool)
.await?;

let start = Utc::now();
// let start = Utc::now();
// Filter out cloned ids from alerts, active periods, and affected entities. There is probably a more elegant way to remove cloned ids
// let alerts: Vec<Alert> = alerts
// .into_par_iter()
Expand Down Expand Up @@ -380,9 +379,10 @@ async fn parse_gtfs(pool: &PgPool) -> Result<(), DecodeFeedError> {
.push_bind(affected_entity.stop_id)
.push_bind(affected_entity.sort_order);
});
query_builder.push(
"ON CONFLICT (alert_id, route_id, bus_route_id, stop_id) DO UPDATE SET sort_order = EXCLUDED.sort_order",
);
// query_builder.push(
// "ON CONFLICT (alert_id, route_id, bus_route_id, stop_id, sort_order) DO UPDATE SET sort_order = EXCLUDED.sort_order",
// );
query_builder.push("ON CONFLICT DO NOTHING");
let query = query_builder.build();
query.execute(pool).await?;

Expand Down
41 changes: 11 additions & 30 deletions backend/src/bus/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,18 @@ pub async fn import(pool: PgPool) {
loop {
match parse_siri(pool.clone()).await {
Ok(_) => (),
Err(e) => {
tracing::error!("Error importing SIRI bus data: {:?}", e);
}
}
Err(e) => match e {
DecodeFeedError::Reqwest(e) => {
// Decode errors happen bc the SIRI api occasionally times out, so I am ignoring those errors.
if !e.is_decode() {
tracing::error!("Error importing SIRI bus data: {:?}", e);
}
}
e => tracing::error!("Error importing SIRI bus data: {:?}", e),
},
};

sleep(Duration::from_secs(40)).await;
sleep(Duration::from_secs(35)).await;
}
});
}
Expand All @@ -56,13 +62,6 @@ pub async fn parse_gtfs(pool: PgPool) -> Result<(), DecodeFeedError> {
)
.await?;

// let stop_ids = sqlx::query!("SELECT id FROM bus_stops")
// .fetch_all(&pool)
// .await?
// .into_iter()
// .map(|s| s.id)
// .collect::<Vec<i32>>();

let positions = feed
.entity
.into_par_iter()
Expand Down Expand Up @@ -94,23 +93,6 @@ pub async fn parse_gtfs(pool: PgPool) -> Result<(), DecodeFeedError> {
}
};


// if !stop_ids.contains(&stop_id) {
// println!("Skipping stop_id: {}", stop_id);
// return None;
// }

// let id_name = trip_id.to_owned()
// + &route_id
// + " "
// + &direction.to_string()
// + " "
// + start_date
// + " "
// + &vehicle_id.to_string();
// let id = Uuid::new_v5(&Uuid::NAMESPACE_OID, id_name.as_bytes());
// let start_date = chrono::NaiveDate::parse_from_str(start_date, "%Y%m%d").unwrap();

let Some(position) = vehicle.position else {
tracing::debug!(target: "bus_positions", "Skipping vehicle without position");
return None;
Expand Down Expand Up @@ -266,7 +248,6 @@ struct Capacities {
pub async fn parse_siri(pool: PgPool) -> Result<(), DecodeFeedError> {
let siri_res = reqwest::Client::new()
.get("https://api.prod.obanyc.com/api/siri/vehicle-monitoring.json")
.timeout(Duration::from_secs(29))
.query(&[
("key", api_key()),
("version", "2"),
Expand Down

0 comments on commit 03e3cba

Please sign in to comment.