Skip to content

Commit

Permalink
refactor siri positions and combine gtfs decode feed into one functio…
Browse files Browse the repository at this point in the history
…n, improve chunking, change default address to 127.0.0.1
  • Loading branch information
jonerrr committed Aug 14, 2024
1 parent 0763087 commit f9b8b51
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 177 deletions.
3 changes: 2 additions & 1 deletion backend/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ container_*
trips.txt
positions.txt
siri-vehicles.json
gtfs/
gtfs/
env
52 changes: 50 additions & 2 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ axum = "0.7.5"
chrono = { version = "0.4.38", features = ["serde"] }
chrono-tz = "0.9.0"
csv = "1.3.0"
# geo-types = "0.7.13"
futures = "0.3.30"
geo = "0.28.0"
http = "1.1.0"
Expand All @@ -34,10 +33,13 @@ sqlx = { version = "0.8.0", features = [
] }
thiserror = "1.0.63"
tokio = { version = "1.39.2", features = ["full"] }
tower = "0.5.0"
tower-http = { version = "0.5.2", features = [
"trace",
"timeout",
"compression-gzip",
"compression-br",
"normalize-path",
] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
Expand Down
42 changes: 14 additions & 28 deletions backend/src/alerts.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use crate::{
feed::{self},
static_data::ROUTES,
trips::DecodeFeedError,
gtfs::decode,
train::{static_data::ROUTES, trips::DecodeFeedError},
};
use chrono::{DateTime, Utc};
use prost::Message;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use rayon::prelude::*;
use sqlx::{PgPool, QueryBuilder};
use std::{env::var, time::Duration};
use tokio::{
fs::{create_dir, write},
time::sleep,
};
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;

pub async fn import(pool: PgPool) {
tokio::spawn(async move {
loop {
if let Err(e) = decode(&pool).await {
if let Err(e) = parse_gtfs(&pool).await {
tracing::error!("Failed to decode feed: {:?}", e);
}
sleep(Duration::from_secs(10)).await;
Expand Down Expand Up @@ -106,20 +101,12 @@ pub struct AffectedEntity {
pub sort_order: i32,
}

async fn decode(pool: &PgPool) -> Result<(), DecodeFeedError> {
let data = reqwest::Client::new()
.get("https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fall-alerts")
.send()
.await?
.bytes()
.await?;
let feed = feed::FeedMessage::decode(data)?;

if var("DEBUG_GTFS").is_ok() {
let msgs = format!("{:#?}", feed);
create_dir("./gtfs").await.ok();
write("./gtfs/alerts.txt", msgs).await.unwrap();
}
async fn parse_gtfs(pool: &PgPool) -> Result<(), DecodeFeedError> {
let feed = decode(
"https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fall-alerts",
"alerts",
)
.await?;

let mut in_feed_ids = vec![];
let mut cloned_ids: Vec<String> = vec![];
Expand Down Expand Up @@ -220,14 +207,13 @@ async fn decode(pool: &PgPool) -> Result<(), DecodeFeedError> {
r
}
})
.map(|r| {
.and_then(|r| {
if ROUTES.contains(&r.as_str()) {
Some(r)
} else {
None
}
})
.flatten();
});

// check if route_id is in ROUTES; otherwise it's a bus route
// TODO: improve this
Expand Down
115 changes: 67 additions & 48 deletions backend/src/bus/positions.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use crate::{bus::api_key, feed, trips::DecodeFeedError};
use crate::{bus::api_key, gtfs::decode, train::trips::DecodeFeedError};
use chrono::{DateTime, Utc};
use prost::Message;
use rayon::prelude::*;
use serde::{Deserialize, Deserializer};
use sqlx::{PgPool, QueryBuilder};
use std::time::Duration;
use tokio::time::sleep;

// use std::io::Write;

#[derive(Debug)]
struct Position {
vehicle_id: i32,
Expand All @@ -27,7 +24,7 @@ pub async fn import(pool: PgPool) {
let pool1 = pool.clone();
tokio::spawn(async move {
loop {
match decode_feed(pool1.clone()).await {
match parse_gtfs(pool1.clone()).await {
Ok(_) => (),
Err(e) => {
tracing::error!("Error importing bus position data: {:?}", e);
Expand All @@ -40,7 +37,7 @@ pub async fn import(pool: PgPool) {

tokio::spawn(async move {
loop {
match decode_siri(pool.clone()).await {
match parse_siri(pool.clone()).await {
Ok(_) => (),
Err(e) => {
tracing::error!("Error importing SIRI bus data: {:?}", e);
Expand All @@ -52,19 +49,12 @@ pub async fn import(pool: PgPool) {
});
}

pub async fn decode_feed(pool: PgPool) -> Result<(), DecodeFeedError> {
let data = reqwest::Client::new()
.get("https://gtfsrt.prod.obanyc.com/vehiclePositions")
.send()
.await?
.bytes()
.await?;

let feed = feed::FeedMessage::decode(data)?;

// let mut msgs = Vec::new();
// write!(msgs, "{:#?}", feed).unwrap();
// tokio::fs::write("./positions.txt", msgs).await.unwrap();
pub async fn parse_gtfs(pool: PgPool) -> Result<(), DecodeFeedError> {
let feed = decode(
"https://gtfsrt.prod.obanyc.com/vehiclePositions",
"buspositions",
)
.await?;

// let stop_ids = sqlx::query!("SELECT id FROM bus_stops")
// .fetch_all(&pool)
Expand Down Expand Up @@ -164,6 +154,41 @@ pub async fn decode_feed(pool: PgPool) -> Result<(), DecodeFeedError> {
Ok(())
}

/// Per-vehicle data extracted from a SIRI `MonitoredVehicleJourney`.
///
/// Built through `From<MonitoredVehicleJourney>`; `vehicle_id` and `mta_id`
/// identify the `bus_positions` row whose `progress_status`, `passengers`
/// and `capacity` columns are updated from the remaining fields.
#[derive(Debug)]
struct SiriPosition {
// Numeric vehicle id, parsed from the journey's `vehicle_ref`.
vehicle_id: i32,
// Taken from `framed_vehicle_journey_ref.dated_vehicle_journey_ref`.
mta_id: String,
// First entry of the journey's `progress_status` list, if one was reported.
progress_status: Option<String>,
// `estimated_passenger_count` from the monitored call's extension capacities.
passengers: Option<i32>,
// `estimated_passenger_capacity` from the monitored call's extension capacities.
capacity: Option<i32>,
}

/// Flattens a SIRI `MonitoredVehicleJourney` into the fields needed to update
/// a row of `bus_positions`.
///
/// - `vehicle_id` is parsed from `vehicle_ref`.
/// - `mta_id` is `framed_vehicle_journey_ref.dated_vehicle_journey_ref`.
/// - `progress_status` keeps only the first entry of the reported list, if any.
/// - `passengers` / `capacity` come from the monitored call's extension
///   capacities; both are `None` when the call or its extensions are absent.
///
/// # Panics
///
/// Panics if `vehicle_ref` is not a valid `i32`. `From` is infallible, so the
/// failure cannot be reported to the caller from here.
impl From<MonitoredVehicleJourney> for SiriPosition {
    fn from(value: MonitoredVehicleJourney) -> Self {
        let vehicle_id: i32 = value
            .vehicle_ref
            .parse()
            .expect("SIRI vehicle_ref should be a numeric vehicle id");

        // Split the optional (count, capacity) pair into two independent options.
        let (passengers, capacity) = value
            .monitored_call
            .and_then(|call| call.extensions)
            .map(|ext| {
                (
                    ext.capacities.estimated_passenger_count,
                    ext.capacities.estimated_passenger_capacity,
                )
            })
            .unzip();

        // Only the first reported status is kept; an empty list yields `None`.
        let progress_status = value
            .progress_status
            .and_then(|statuses| statuses.into_iter().next());
        let mta_id = value.framed_vehicle_journey_ref.dated_vehicle_journey_ref;

        Self {
            vehicle_id,
            mta_id,
            progress_status,
            passengers,
            capacity,
        }
    }
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct ServiceDelivery {
Expand Down Expand Up @@ -238,9 +263,10 @@ struct Capacities {
}

// need to get siri feed so we can get progress status and capacities
pub async fn decode_siri(pool: PgPool) -> Result<(), DecodeFeedError> {
pub async fn parse_siri(pool: PgPool) -> Result<(), DecodeFeedError> {
let siri_res = reqwest::Client::new()
.get("https://api.prod.obanyc.com/api/siri/vehicle-monitoring.json")
.timeout(Duration::from_secs(29))
.query(&[
("key", api_key()),
("version", "2"),
Expand Down Expand Up @@ -269,36 +295,29 @@ pub async fn decode_siri(pool: PgPool) -> Result<(), DecodeFeedError> {
return Err(DecodeFeedError::Siri("no vehicles".to_string()));
};

// TODO: make sure progress status is correct (that we only need to worry about statuses bc when rate is unknown/no progress its always layover/spooking)
for vehicle in vehicles.vehicle_activity {
let monitored_vehicle_journey = vehicle.monitored_vehicle_journey;
let capacities = monitored_vehicle_journey.monitored_call.and_then(|c| {
c.extensions.map(|e| {
(
e.capacities.estimated_passenger_count,
e.capacities.estimated_passenger_capacity,
)
})
});

let progress_status = monitored_vehicle_journey
.progress_status
.map(|s| s[0].clone());

let vehicle_id: i32 = monitored_vehicle_journey.vehicle_ref.parse().unwrap();
let trip_id = monitored_vehicle_journey
.framed_vehicle_journey_ref
.dated_vehicle_journey_ref;
let positions = vehicles
.vehicle_activity
.into_par_iter()
.map(|v| v.monitored_vehicle_journey.into())
.collect::<Vec<SiriPosition>>();

sqlx::query!(
"UPDATE bus_positions SET progress_status = $1, passengers = $2, capacity = $3 WHERE vehicle_id = $4 AND mta_id = $5",
progress_status,
capacities.map(|c| c.0),
capacities.map(|c| c.1),
vehicle_id,
trip_id
).execute(&pool).await?;
// TODO: fix QueryBuilder issue: "trailing junk after parameter at or near \"$5V\""
for p in positions {
sqlx::query!("UPDATE bus_positions SET progress_status = $1, passengers = $2, capacity = $3 WHERE vehicle_id = $4 AND mta_id = $5", p.progress_status, p.passengers, p.capacity, p.vehicle_id, p.mta_id).execute(&pool).await?;
}
// for positions in positions.chunks(1) {
// dbg!(positions);
// let mut query_builder = QueryBuilder::new("UPDATE bus_positions SET progress_status = $1, passengers = $2, capacity = $3 WHERE vehicle_id = $4 AND mta_id = $5");
// query_builder.push_values(positions, |mut b, position| {
// b.push_bind(&position.progress_status)
// .push_bind(position.passengers)
// .push_bind(position.capacity)
// .push_bind(position.vehicle_id)
// .push_bind(&position.mta_id);
// });
// let query = query_builder.build();
// query.execute(&pool).await?;
// }

// println!("{:#?}", service_delivery);
// let mut progresses = Vec::new();
Expand Down
Loading

0 comments on commit f9b8b51

Please sign in to comment.