Skip to content

Commit

Permalink
refactor refresh data logic and move index migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
jonerrr committed Aug 13, 2024
1 parent 6629033 commit 25597e5
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 88 deletions.
16 changes: 15 additions & 1 deletion backend/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,24 @@ services:
- POSTGRES_PASSWORD=trains
- POSTGRES_DB=trains
volumes:
- ./container_postgres-data:/var/lib/postgresql/data:z
- trainstatus-db:/var/lib/postgresql/data:z
# ts-backend:
# environment:
# - DATABASE_URL=postgres://trains:trains@localhost:5432/trains
# - API_KEY=mta api key
# build: .
# network_mode: host
# ts-database:
# image: timescale/timescaledb:latest-pg16
# container_name: timescale-ts
# ports:
# - 5432:5432
# environment:
# - POSTGRES_USER=trains
# - POSTGRES_PASSWORD=trains
# - POSTGRES_DB=trains
# volumes:
# - trainstatus-db:/var/lib/postgresql/data:z

volumes:
trainstatus-db:
6 changes: 6 additions & 0 deletions backend/migrations/20240520002212_trips.down.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
-- Revert the trips migration: drop the indexes first (moved into this
-- migration pair when the separate 20240730230831_trip_indexes migration
-- was deleted), then the dependent tables. IF EXISTS makes the rollback
-- idempotent on databases where the objects were never created.
DROP INDEX IF EXISTS idx_trips_created_at;

DROP INDEX IF EXISTS idx_positions_updated_at;

DROP INDEX IF EXISTS idx_stop_times_arrival;

DROP TABLE IF EXISTS positions;

DROP TABLE IF EXISTS stop_times;
Expand Down
8 changes: 7 additions & 1 deletion backend/migrations/20240520002212_trips.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ CREATE TABLE IF NOT EXISTS stop_times (
arrival TIMESTAMP WITH TIME ZONE NOT NULL,
departure TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY (trip_id, stop_id)
);
);

-- Time-based lookup indexes (relocated here from the deleted
-- 20240730230831_trip_indexes migration in this same commit).
-- NOTE(review): no IF NOT EXISTS — assumes this migration only ever runs
-- once per database; confirm the migration runner guarantees that.
CREATE INDEX idx_trips_created_at ON trips (created_at);

CREATE INDEX idx_positions_updated_at ON positions (updated_at);

CREATE INDEX idx_stop_times_arrival ON stop_times (arrival);
6 changes: 6 additions & 0 deletions backend/migrations/20240613230108_bus_trips.down.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
-- Revert the bus_trips migration: drop the indexes first, then the tables.
-- FIX: the original statements read "DROP INDEX IF EXISTS INDEX <name>;" —
-- the second INDEX keyword is a PostgreSQL syntax error, so this rollback
-- migration would abort. Correct form is "DROP INDEX [IF EXISTS] name".
DROP INDEX IF EXISTS idx_bus_trips_created_at;

DROP INDEX IF EXISTS idx_bus_positions_updated_at;

DROP INDEX IF EXISTS idx_bus_stop_times_arrival;

DROP TABLE IF EXISTS bus_positions;

DROP TABLE IF EXISTS bus_stop_times;
Expand Down
8 changes: 7 additions & 1 deletion backend/migrations/20240613230108_bus_trips.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ CREATE TABLE IF NOT EXISTS bus_stop_times (
departure TIMESTAMP WITH TIME ZONE NOT NULL,
stop_sequence SMALLINT NOT NULL,
PRIMARY KEY (trip_id, stop_id)
);
);

-- Time-based lookup indexes for the bus tables (relocated here from the
-- deleted 20240730230831_trip_indexes migration in this same commit).
-- NOTE(review): no IF NOT EXISTS — assumes the migration runner applies
-- each migration exactly once; confirm.
CREATE INDEX idx_bus_trips_created_at ON bus_trips (created_at);

CREATE INDEX idx_bus_positions_updated_at ON bus_positions (updated_at);

CREATE INDEX idx_bus_stop_times_arrival ON bus_stop_times (arrival);
15 changes: 0 additions & 15 deletions backend/migrations/20240730230831_trip_indexes.down.sql

This file was deleted.

17 changes: 0 additions & 17 deletions backend/migrations/20240730230831_trip_indexes.up.sql

This file was deleted.

29 changes: 21 additions & 8 deletions backend/src/bus/trips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ use uuid::Uuid;

// use std::io::Write;

struct StopTime {
trip_id: Uuid,
stop_id: i32,
arrival: DateTime<Utc>,
departure: DateTime<Utc>,
stop_sequence: i16,
}

// TODO: remove unwraps and handle errors

pub async fn import(pool: PgPool) {
Expand Down Expand Up @@ -47,9 +39,30 @@ fn convert_timestamp(timestamp: Option<i64>) -> Option<DateTime<Utc>> {
}
}

/// A bus trip row as stored in the `bus_trips` table.
pub struct BusTrip {
    /// Locally generated surrogate key (the feed's own trip id is kept in `mta_id`).
    pub id: Uuid,
    /// Trip identifier as supplied by the upstream feed — presumably the
    /// GTFS-RT trip_id from the OBA NYC endpoint; confirm against `decode_feed`.
    pub mta_id: String,
    /// Identifier of the vehicle serving this trip.
    pub vehicle_id: i32,
    /// Service date the trip belongs to (date only, no time component).
    pub start_date: chrono::NaiveDate,
    /// When this row was recorded; `bus_trips (created_at)` is indexed for
    /// time-range queries.
    pub created_at: DateTime<Utc>,
    /// Travel direction flag — assumes the feed's 0/1 direction encoding;
    /// TODO confirm the exact value range.
    pub direction: i16,
    /// Schedule deviation if reported by the feed — presumably seconds;
    /// NOTE(review): verify units against the producer.
    pub deviation: Option<i32>,
    /// Route this trip runs on.
    pub route_id: String,
}

/// A predicted stop event for a bus trip (one row of `bus_stop_times`;
/// the table's primary key is (trip_id, stop_id)).
struct StopTime {
    // References BusTrip::id.
    trip_id: Uuid,
    // Stop being served.
    stop_id: i32,
    // Predicted arrival time at the stop.
    arrival: DateTime<Utc>,
    // Predicted departure time from the stop.
    departure: DateTime<Utc>,
    // Position of this stop within the trip's ordered stop list.
    stop_sequence: i16,
}

pub async fn decode_feed(pool: &PgPool) -> Result<(), DecodeFeedError> {
// after 30 seconds the obanyc api will return its own timeout error
let data = reqwest::Client::new()
.get("https://gtfsrt.prod.obanyc.com/tripUpdates")
.timeout(Duration::from_secs(29))
.send()
.await?
.bytes()
Expand Down
61 changes: 32 additions & 29 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,38 +57,41 @@ async fn main() {
.await
.unwrap();

let should_update = match last_updated {
Some(last_updated) => {
let now = Utc::now();
let days = Days::new(3);

// tracing::info!("Updating data. Last updated at: {}", last_updated.update_at);

last_updated.update_at < now.checked_sub_days(days).unwrap()
// If user wants to FORCE_UPDATE, then don't check for last updated
if var("FORCE_UPDATE").is_err() {
// Data should be refreshed every 3 days
if let Some(last_updated) = last_updated {
tracing::info!("Last updated at: {}", last_updated.update_at);

let duration_since_last_update =
Utc::now().signed_duration_since(last_updated.update_at);

// Check if the data is older than 3 days
if duration_since_last_update.num_days() <= 3 {
// Sleep until it has been 3 days, take into account the time since last update
let sleep_time = Duration::from_secs(60 * 60 * 24 * 3)
.checked_sub(duration_since_last_update.to_std().unwrap())
.unwrap();
tracing::info!("Waiting {} seconds before updating", sleep_time.as_secs());
sleep(sleep_time).await;
}
}
None => true,
};

if should_update || var("FORCE_UPDATE").is_ok() {
// update_transfers(&s_pool).await;

// tracing::info!("Updating bus stops and routes");
bus::static_data::stops_and_routes(&s_pool).await;
// tracing::info!("Updating train stops and routes");
static_data::stops_and_routes(&s_pool).await;

// remove old update_ats
sqlx::query!("DELETE FROM last_update")
.execute(&s_pool)
.await
.unwrap();
sqlx::query!("INSERT INTO last_update (update_at) VALUES (now())")
.execute(&s_pool)
.await
.unwrap();
}
tracing::info!("Updating stops and trips");

sleep(Duration::from_secs(60 * 60)).await;
bus::static_data::stops_and_routes(&s_pool).await;
static_data::stops_and_routes(&s_pool).await;

// remove old update_ats
sqlx::query!("DELETE FROM last_update")
.execute(&s_pool)
.await
.unwrap();
sqlx::query!("INSERT INTO last_update (update_at) VALUES (now())")
.execute(&s_pool)
.await
.unwrap();
tracing::info!("Data updated");
}
});

Expand Down
26 changes: 10 additions & 16 deletions backend/src/trips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ impl TripDescriptor {
let mut route_id = id.to_owned();
if route_id == "SS" {
route_id = "SI".to_string();
// TODO: set express to true for SS
};

let mut express = false;
Expand Down Expand Up @@ -218,8 +219,6 @@ impl Trip {
}
None => Ok(false),
}

// Ok(res)
}
}

Expand Down Expand Up @@ -400,15 +399,15 @@ pub async fn decode_feed(pool: &PgPool, endpoint: &str) -> Result<(), DecodeFeed

for entity in feed.entity {
if let Some(trip_update) = entity.trip_update {
let trip_span = span!(
tracing::Level::TRACE,
"trip_update",
trip_id = trip_update.trip.trip_id,
start_date = trip_update.trip.start_date,
start_time = trip_update.trip.start_time,
nyct_trip_descriptor = format!("{:#?}", trip_update.trip.nyct_trip_descriptor)
);
let _enter = trip_span.enter();
// let trip_span = span!(
// tracing::Level::TRACE,
// "trip_update",
// trip_id = trip_update.trip.trip_id,
// start_date = trip_update.trip.start_date,
// start_time = trip_update.trip.start_time,
// nyct_trip_descriptor = format!("{:#?}", trip_update.trip.nyct_trip_descriptor)
// );
// let _enter = trip_span.enter();

let mut trip = match trip_update.trip.clone().into() {
Ok(t) => t,
Expand All @@ -421,11 +420,6 @@ pub async fn decode_feed(pool: &PgPool, endpoint: &str) -> Result<(), DecodeFeed
match trip.direction {
Some(_) => (),
None => {
// let first_stop_id = trip_update
// .stop_time_update
// .first()
// .ok_or(IntoTripError::StopId)?;
// trip.direction = first_stop_id.direction();
match trip_update.stop_time_update.first() {
Some(st) => {
trip.direction = st.direction();
Expand Down

0 comments on commit 25597e5

Please sign in to comment.