Skip to content

Commit

Permalink
Entities size optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
GPeaky committed Oct 21, 2024
1 parent c5e212d commit ac36841
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 42 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ members = [
"crates/utils",
"crates/discord",
]

default-members = ["crates/api"]

[workspace.dependencies]

#
# Workspace member crates
#
Expand Down
6 changes: 3 additions & 3 deletions crates/entities/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{num::NonZeroI32, sync::Arc};

use chrono::{DateTime, Utc};
use deadpool_postgres::tokio_postgres::Row;
Expand All @@ -10,7 +10,7 @@ pub type SharedDriver = Arc<Driver>;
pub struct Driver {
pub steam_name: Box<str>,
pub nationality: i16,
pub user_id: Option<i32>,
pub user_id: Option<NonZeroI32>,
pub created_at: DateTime<Utc>,
pub updated_at: Option<DateTime<Utc>>,
}
Expand All @@ -22,7 +22,7 @@ impl Driver {
Driver {
steam_name: row.get(0),
nationality: row.get(1),
user_id: row.get(2),
user_id: row.get::<_, Option<i32>>(2).and_then(NonZeroI32::new),
created_at: row.get(3),
updated_at: row.get(4),
}
Expand Down
4 changes: 1 addition & 3 deletions crates/entities/src/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ mod user;

pub use championship::*;
pub use driver::*;
#[allow(unused)]
pub use race::*;
#[allow(unused)]
pub use result::*;
pub use user::*;
pub use user::*;
6 changes: 3 additions & 3 deletions crates/entities/src/user.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{num::NonZeroI64, sync::Arc};

use chrono::{DateTime, Utc};
use deadpool_postgres::tokio_postgres::Row;
Expand Down Expand Up @@ -46,7 +46,7 @@ pub struct User {
pub provider: Provider,
pub role: Role,
#[serde(skip_serializing_if = "Option::is_none")]
pub discord_id: Option<i64>,
pub discord_id: Option<NonZeroI64>,
#[serde(skip_serializing)]
pub active: bool,
pub created_at: DateTime<Utc>,
Expand All @@ -66,7 +66,7 @@ impl User {
avatar: row.get(4),
provider: row.get(5),
role: row.get(6),
discord_id: row.get(7),
discord_id: row.get::<_, Option<i64>>(7).and_then(NonZeroI64::new),
active: row.get(8),
created_at: row.get(9),
updated_at: row.get(10),
Expand Down
2 changes: 1 addition & 1 deletion crates/f1-telemetry/src/championship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct F1State {
impl F1ChampionshipManager {
/// Creates a new F1ServiceHandler instance.
pub fn new(f1_state: &'static F1State) -> Self {
let services = Box::leak(Box::new(DashMap::with_capacity(10)));
let services = Box::leak(Box::new(DashMap::with_capacity(5)));
Self { services, f1_state }
}

Expand Down
1 change: 1 addition & 0 deletions crates/f1-telemetry/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ impl F1TelemetryPacketHandler {
}
}

// TODO: Add more idiomatic name
/// Spawns the update task for sending updates
#[inline]
fn spawn_update_task(&mut self, tx: Sender<Bytes>) {
Expand Down
65 changes: 36 additions & 29 deletions crates/f1-telemetry/src/live_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ pub struct F1LiveTelemetryService {
tick_counter: u8,
championship_id: i32,
port_partially_opened: bool,
last_updates: PacketProcessingTimestamps,
timestamps: PacketProcessingTimestamps,
socket: UdpSocket,
shutdown: oneshot::Receiver<()>,
session_type: Option<SessionType>,
data_manager: F1TelemetryPacketHandler,
packet_handler: F1TelemetryPacketHandler,
services: &'static DashMap<i32, F1SessionBroadcaster>,
f1_state: &'static F1State,
}

/// Holds data related to an F1 service instance
pub struct F1SessionBroadcaster {
inner: Arc<F1SessionBroadcasterInner>,
session_manager: F1TelemetryPacketHandler,
packet_handler: F1TelemetryPacketHandler,
shutdown: Option<oneshot::Sender<()>>,
}

Expand Down Expand Up @@ -140,7 +140,7 @@ impl F1TelemetryPacket<'_> {
impl F1LiveTelemetryService {
/// Creates a new F1LiveTelemetryService instance
pub async fn new(
data_manager: F1TelemetryPacketHandler,
packet_handler: F1TelemetryPacketHandler,
shutdown: oneshot::Receiver<()>,
services: &'static DashMap<i32, F1SessionBroadcaster>,
f1_state: &'static F1State,
Expand All @@ -151,11 +151,11 @@ impl F1LiveTelemetryService {
championship_id: 0,
tick_counter: 10,
port_partially_opened: false,
last_updates: PacketProcessingTimestamps::new(),
timestamps: PacketProcessingTimestamps::new(),
shutdown,
socket: UdpSocket::bind("0.0.0.0:0").await.unwrap(),
session_type: None,
data_manager,
packet_handler,
services,
f1_state,
}
Expand Down Expand Up @@ -312,17 +312,17 @@ impl F1LiveTelemetryService {

#[inline]
fn handle_motion_packet(&mut self, motion_data: &PacketMotionData, now: Instant) {
if now.duration_since(self.last_updates.car_motion) < MOTION_INTERVAL {
if now.duration_since(self.timestamps.car_motion) < MOTION_INTERVAL {
return;
}

self.data_manager.save_motion(motion_data);
self.last_updates.car_motion = now;
self.packet_handler.save_motion(motion_data);
self.timestamps.car_motion = now;
}

#[inline]
async fn handle_session_packet(&mut self, session_data: &PacketSessionData, now: Instant) {
if now.duration_since(self.last_updates.session) < SESSION_INTERVAL {
if now.duration_since(self.timestamps.session) < SESSION_INTERVAL {
return;
}

Expand All @@ -339,8 +339,8 @@ impl F1LiveTelemetryService {
};

self.session_type = Some(session_type);
self.data_manager.save_session(session_data);
self.last_updates.session = now;
self.packet_handler.save_session(session_data);
self.timestamps.session = now;
}

#[inline]
Expand All @@ -349,7 +349,7 @@ impl F1LiveTelemetryService {
participants_data: &PacketParticipantsData,
now: Instant,
) -> AppResult<()> {
if now.duration_since(self.last_updates.participants) < SESSION_INTERVAL {
if now.duration_since(self.timestamps.participants) < SESSION_INTERVAL {
return Ok(());
}

Expand All @@ -362,8 +362,8 @@ impl F1LiveTelemetryService {
.await?;
}

self.data_manager.save_participants(participants_data);
self.last_updates.participants = now;
self.packet_handler.save_participants(participants_data);
self.timestamps.participants = now;
Ok(())
}

Expand All @@ -377,7 +377,7 @@ impl F1LiveTelemetryService {
return;
}

self.data_manager.push_event(event_data);
self.packet_handler.push_event(event_data);
}

#[inline]
Expand All @@ -387,7 +387,7 @@ impl F1LiveTelemetryService {
now: Instant,
) {
let Some(last_update) = self
.last_updates
.timestamps
.car_lap
.get_mut(history_data.car_idx as usize)
else {
Expand All @@ -396,7 +396,7 @@ impl F1LiveTelemetryService {
};

if now.duration_since(*last_update) > HISTORY_INTERVAL {
self.data_manager.save_lap_history(history_data);
self.packet_handler.save_lap_history(history_data);
*last_update = now;
}
}
Expand All @@ -411,23 +411,23 @@ impl F1LiveTelemetryService {
return Ok(());
};

self.data_manager
self.packet_handler
.save_final_classification(final_classification);

Ok(())
}

#[inline]
fn handle_car_damage_packet(&mut self, car_damage: &PacketCarDamageData, now: Instant) {
if now.duration_since(self.last_updates.car_damage) > TELEMETRY_INTERVAL {
self.data_manager.save_car_damage(car_damage);
if now.duration_since(self.timestamps.car_damage) > TELEMETRY_INTERVAL {
self.packet_handler.save_car_damage(car_damage);
}
}

#[inline]
fn handle_car_status_packet(&mut self, car_status: &PacketCarStatusData, now: Instant) {
if now.duration_since(self.last_updates.car_status) > TELEMETRY_INTERVAL {
self.data_manager.save_car_status(car_status);
if now.duration_since(self.timestamps.car_status) > TELEMETRY_INTERVAL {
self.packet_handler.save_car_status(car_status);
}
}

Expand All @@ -437,8 +437,8 @@ impl F1LiveTelemetryService {
car_telemetry: &PacketCarTelemetryData,
now: Instant,
) {
if now.duration_since(self.last_updates.car_telemetry) > TELEMETRY_INTERVAL {
self.data_manager.save_car_telemetry(car_telemetry);
if now.duration_since(self.timestamps.car_telemetry) > TELEMETRY_INTERVAL {
self.packet_handler.save_car_telemetry(car_telemetry);
}
}

Expand Down Expand Up @@ -520,7 +520,7 @@ impl F1LiveTelemetryService {
impl F1SessionBroadcaster {
/// Creates a new F1SessionBroadcaster instance
pub fn new(
session_manager: F1TelemetryPacketHandler,
packet_handler: F1TelemetryPacketHandler,
global_channel: Sender<Bytes>,
shutdown: oneshot::Sender<()>,
) -> Self {
Expand All @@ -532,53 +532,60 @@ impl F1SessionBroadcaster {

Self {
inner,
session_manager,
packet_handler,
shutdown: Some(shutdown),
}
}

/// Retrieves the cached data from the session manager
#[inline]
pub fn cache(&self) -> Option<Bytes> {
self.session_manager.cache()
self.packet_handler.cache()
}

/// Subscribes to the global broadcast channel
#[inline]
pub fn global_sub(&self) -> Receiver<Bytes> {
self.global_subscribers.fetch_add(1, Ordering::Relaxed);
self.global_channel.subscribe()
}

/// Subscribes to a team-specific broadcast channel
#[inline]
pub fn team_sub(&self, team_id: u8) -> Option<Receiver<Bytes>> {
let receiver = self.session_manager.get_team_receiver(team_id)?;
let receiver = self.packet_handler.get_team_receiver(team_id)?;
let mut team_subs = self.team_subscribers.write();
*team_subs.entry(team_id).or_insert(0) += 1;
Some(receiver)
}

/// Gets the current number of global subscribers
#[inline]
pub fn global_count(&self) -> u32 {
self.global_subscribers.load(Ordering::Relaxed)
}

/// Gets the current number of subscribers for all teams
#[inline]
pub fn all_team_count(&self) -> u32 {
self.team_subscribers.read().values().sum()
}

/// Gets the current number of subscribers for a specific team
#[inline]
#[allow(unused)]
pub fn team_count(&self, team_id: u8) -> u32 {
*self.team_subscribers.read().get(&team_id).unwrap_or(&0)
}

/// Decrements the global subscriber count
#[inline]
pub fn global_unsub(&self) {
self.global_subscribers.fetch_sub(1, Ordering::Relaxed);
}

/// Decrements the team subscriber count
#[inline]
pub fn team_unsub(&self, team_id: u8) {
let mut team_subs = self.team_subscribers.write();
if let Some(count) = team_subs.get_mut(&team_id) {
Expand Down
2 changes: 1 addition & 1 deletion crates/token_manager/src/token_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use utils::current_timestamp_s;
pub use token::{Token, TokenIntent};

const MAX_TOKENS_PER_USER: usize = 10;
const PURGE_INTERVAL: Duration = Duration::from_secs(900);
const PURGE_INTERVAL: Duration = Duration::from_secs(600);

pub struct TokenManager {
tokens: DashMap<Token, TokenEntry>,
Expand Down

0 comments on commit ac36841

Please sign in to comment.