Skip to content

Commit

Permalink
telemetry changes
Browse files Browse the repository at this point in the history
  • Loading branch information
GPeaky committed Oct 21, 2024
1 parent c24b0a7 commit c5e212d
Show file tree
Hide file tree
Showing 18 changed files with 618 additions and 620 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ on:
branches: [ main ]
types:
- completed


concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build_and_deploy:
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ members = [
"crates/error",
"crates/id-generator",
"crates/structs",
"crates/telemetry",
"crates/f1-telemetry",
"crates/token_manager",
"crates/utils",
"crates/discord",
Expand All @@ -34,7 +34,7 @@ id-generator = { path = "crates/id-generator" }
intelli-core = { path = "crates/intelli-core" }
password-hash = { path = "crates/password-hash" }
structs = { path = "crates/structs" }
telemetry = { path = "crates/telemetry" }
f1-telemetry = { path = "crates/f1-telemetry" }
token_manager = { path = "crates/token_manager" }
utils = { path = "crates/utils" }

Expand Down
2 changes: 1 addition & 1 deletion crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ token_manager.workspace = true
error.workspace = true
structs.workspace = true
entities.workspace = true
telemetry.workspace = true
f1-telemetry.workspace = true
intelli-core.workspace = true
ntex.workspace = true
tokio.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/config/local_tracing.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use telemetry::FirewallService;
use f1_telemetry::FirewallService;
use tokio::runtime::Builder;
use tracing::error;
use tracing_log::LogTracer;
Expand Down
4 changes: 2 additions & 2 deletions crates/api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use tracing::info;

use config::{initialize_tracing_subscriber, setup_panic_handler};
use db::Database;
use f1_telemetry::FirewallService;
use states::AppState;
use telemetry::FirewallService;

#[cfg(not(test))]
#[global_allocator]
Expand Down Expand Up @@ -71,7 +71,7 @@ async fn main() -> std::io::Result<()> {
})
.await?;

info!("Stoping service, cleaning up firewall rules");
info!("Stopping service, cleaning up firewall rules");
firewall_svc.close_all().await.unwrap();

Ok(())
Expand Down
8 changes: 3 additions & 5 deletions crates/api/src/states/app.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use db::Database;
use error::AppResult;

use f1_telemetry::{F1ChampionshipManager, F1State, FirewallService};
use intelli_core::{
repositories::{
ChampionshipRepository, DiscordRepository, DriverRepository, ServerRepository,
UserRepository,
},
services::{ChampionshipService, DriverService, EmailService, UserService},
};
use telemetry::{F1ServiceHandler, F1State, FirewallService};
use token_manager::TokenManager;

// F1ServiceHandler, FirewallService

#[derive(Clone)]
pub struct AppState {
pub user_svc: &'static UserService,
Expand All @@ -25,7 +23,7 @@ pub struct AppState {
#[allow(unused)]
pub driver_svc: &'static DriverService,
pub email_svc: EmailService,
pub f1_svc: F1ServiceHandler,
pub f1_svc: F1ChampionshipManager,
pub discord_repo: &'static DiscordRepository,
pub server_repo: ServerRepository,
}
Expand Down Expand Up @@ -59,7 +57,7 @@ impl AppState {

Ok(Self {
user_svc,
f1_svc: F1ServiceHandler::new(f1_state),
f1_svc: F1ChampionshipManager::new(f1_state),
user_repo,
token_mgr,
championship_svc,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "telemetry"
name = "f1-telemetry"
version = "0.1.0"
edition = "2021"

[lints]
workspace = true

[lib]
path = "src/telemetry.rs"
path = "src/championship.rs"
doctest = false

[build-dependencies]
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod f1;
mod firewall;
mod manager;
mod service;
mod handler;
mod live_service;
mod types;

use dashmap::DashMap;
use intelli_core::{
Expand All @@ -16,17 +16,17 @@ use tokio::sync::{
use tracing::{info, warn};

use error::{AppResult, F1ServiceError};
use manager::F1SessionDataManager;
use service::{F1Service, F1ServiceData};
use handler::F1TelemetryPacketHandler;
use live_service::{F1LiveTelemetryService, F1SessionBroadcaster};
use structs::ServiceStatus;

pub use firewall::FirewallService;
pub use manager::DriverInfo;
pub use handler::DriverInfo;

/// Manages F1 championship services, including caching, subscriptions, and service lifecycle.
#[derive(Clone)]
pub struct F1ServiceHandler {
services: &'static DashMap<i32, F1ServiceData>,
pub struct F1ChampionshipManager {
services: &'static DashMap<i32, F1SessionBroadcaster>,
f1_state: &'static F1State,
}

Expand All @@ -39,7 +39,7 @@ pub struct F1State {
pub championship_svc: &'static ChampionshipService,
}

impl F1ServiceHandler {
impl F1ChampionshipManager {
/// Creates a new F1ServiceHandler instance.
pub fn new(f1_state: &'static F1State) -> Self {
let services = Box::leak(Box::new(DashMap::with_capacity(10)));
Expand Down Expand Up @@ -102,9 +102,10 @@ impl F1ServiceHandler {

let (otx, orx) = oneshot::channel::<()>();
let (tx, _) = channel::<Bytes>(50);
let session_manager = F1SessionDataManager::new(tx.clone());
let service_data = F1ServiceData::new(session_manager.clone(), tx, otx);
let mut service = F1Service::new(session_manager, orx, self.services, self.f1_state).await;
let session_manager = F1TelemetryPacketHandler::new(tx.clone());
let service_data = F1SessionBroadcaster::new(session_manager.clone(), tx, otx);
let mut service =
F1LiveTelemetryService::new(session_manager, orx, self.services, self.f1_state).await;

service.initialize(port, championship_id, 0).await?;

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tokio::sync::{
};
use tracing::{error, warn};

use crate::f1::{
use crate::types::{
CarDamageData as F1CarDamageData, CarMotionData as F1CarMotionData,
CarStatusData as F1CarStatusData, CarTelemetryData as F1CarTelemetryData, EventCode,
EventDataDetails as F1EventDataDetails, FinalClassificationData as F1FinalClassificationData,
Expand Down Expand Up @@ -50,8 +50,13 @@ pub struct DriverInfo {
pub team_id: u8,
}

#[derive(Clone)]
pub struct F1TelemetryPacketHandler {
inner: Arc<F1TelemetryPacketHandlerInner>,
}

#[derive(Debug)]
pub struct F1SessionDataManagerInner {
pub struct F1TelemetryPacketHandlerInner {
driver_info: RwLock<AHashMap<usize, DriverInfo>>,
general: RwLock<F1GeneralInfo>,
telemetry: RwLock<F1TelemetryInfo>,
Expand All @@ -62,24 +67,19 @@ pub struct F1SessionDataManagerInner {
stop_sender: Mutex<Option<oneshot::Sender<()>>>,
}

#[derive(Clone)]
pub struct F1SessionDataManager {
inner: Arc<F1SessionDataManagerInner>,
}

// Implementations
impl Deref for F1SessionDataManager {
type Target = F1SessionDataManagerInner;
impl Deref for F1TelemetryPacketHandler {
type Target = F1TelemetryPacketHandlerInner;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl F1SessionDataManager {
/// Creates a new F1SessionDataManager instance
impl F1TelemetryPacketHandler {
/// Creates a new F1TelemetryPacketHandler instance
pub fn new(tx: Sender<Bytes>) -> Self {
let inner = Arc::new(F1SessionDataManagerInner {
let inner = Arc::new(F1TelemetryPacketHandlerInner {
driver_info: RwLock::new(AHashMap::new()),
general: RwLock::new(F1GeneralInfo::default()),
telemetry: RwLock::new(F1TelemetryInfo::default()),
Expand Down Expand Up @@ -298,7 +298,7 @@ impl F1SessionDataManager {

/// Sends general updates
#[inline]
fn send_general_updates(inner: &Arc<F1SessionDataManagerInner>, tx: &Sender<Bytes>) {
fn send_general_updates(inner: &Arc<F1TelemetryPacketHandlerInner>, tx: &Sender<Bytes>) {
if tx.receiver_count() == 0 {
return;
}
Expand All @@ -319,7 +319,7 @@ impl F1SessionDataManager {

/// Sends telemetry updates
#[inline]
fn send_telemetry_updates(inner: &Arc<F1SessionDataManagerInner>) {
fn send_telemetry_updates(inner: &Arc<F1TelemetryPacketHandlerInner>) {
let driver_info = inner.driver_info.read();
let telemetry = inner.telemetry.read();
let mut last_telemetry = inner.last_telemetry.write();
Expand Down Expand Up @@ -391,7 +391,7 @@ impl F1SessionDataManager {
}
}

impl Drop for F1SessionDataManager {
impl Drop for F1TelemetryPacketHandler {
fn drop(&mut self) {
if let Some(sender) = self.stop_sender.lock().take() {
let _ = sender.send(());
Expand Down
Loading

0 comments on commit c5e212d

Please sign in to comment.