Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dr-bonez committed Dec 11, 2024
1 parent 282a451 commit 1c90ddb
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 99 deletions.
3 changes: 1 addition & 2 deletions core/startos/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,8 @@ pub fn check_password_against_db(db: &DatabaseModel, password: &str) -> Result<(
Ok(())
}

#[derive(Deserialize, Serialize, Parser, TS)]
#[derive(Deserialize, Serialize, TS)]
#[serde(rename_all = "camelCase")]
#[command(rename_all = "kebab-case")]
#[ts(export)]
pub struct LoginParams {
password: Option<PasswordType>,
Expand Down
4 changes: 2 additions & 2 deletions core/startos/src/bins/container_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use rpc_toolkit::CliApp;
use serde_json::Value;

use crate::service::cli::{ContainerCliContext, ContainerClientConfig};
use crate::util::logger::EmbassyLogger;
use crate::util::logger::LOGGER;
use crate::version::{Current, VersionT};

lazy_static::lazy_static! {
static ref VERSION_STRING: String = Current::default().semver().to_string();
}

pub fn main(args: impl IntoIterator<Item = OsString>) {
EmbassyLogger::init();
LOGGER.enable();
if let Err(e) = CliApp::new(
|cfg: ContainerClientConfig| Ok(ContainerCliContext::init(cfg)),
crate::service::effects::handler(),
Expand Down
4 changes: 2 additions & 2 deletions core/startos/src/bins/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::instrument;
use crate::net::web_server::WebServer;
use crate::prelude::*;
use crate::registry::context::{RegistryConfig, RegistryContext};
use crate::util::logger::EmbassyLogger;
use crate::util::logger::LOGGER;

#[instrument(skip_all)]
async fn inner_main(config: &RegistryConfig) -> Result<(), Error> {
Expand Down Expand Up @@ -63,7 +63,7 @@ async fn inner_main(config: &RegistryConfig) -> Result<(), Error> {
}

pub fn main(args: impl IntoIterator<Item = OsString>) {
EmbassyLogger::init();
LOGGER.enable();

let config = RegistryConfig::parse_from(args).load().unwrap();

Expand Down
5 changes: 3 additions & 2 deletions core/startos/src/bins/start_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ use serde_json::Value;

use crate::context::config::ClientConfig;
use crate::context::CliContext;
use crate::util::logger::EmbassyLogger;
use crate::util::logger::LOGGER;
use crate::version::{Current, VersionT};

lazy_static::lazy_static! {
static ref VERSION_STRING: String = Current::default().semver().to_string();
}

pub fn main(args: impl IntoIterator<Item = OsString>) {
EmbassyLogger::init();
LOGGER.enable();

if let Err(e) = CliApp::new(
|cfg: ClientConfig| Ok(CliContext::init(cfg.load()?)?),
crate::expanded_api(),
Expand Down
1 change: 1 addition & 0 deletions core/startos/src/bins/start_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ async fn setup_or_init(
tracing::info!("Loaded Disk");

if requires_reboot.0 {
tracing::info!("Rebooting...");
let mut reboot_phase = handle.add_phase("Rebooting".into(), Some(1));
reboot_phase.start();
return Ok(Err(Shutdown {
Expand Down
10 changes: 8 additions & 2 deletions core/startos/src/bins/startd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use clap::Parser;
use color_eyre::eyre::eyre;
use futures::{FutureExt, TryFutureExt};
use tokio::fs::OpenOptions;
use tokio::signal::unix::signal;
use tracing::instrument;

Expand All @@ -15,7 +16,8 @@ use crate::context::{DiagnosticContext, InitContext, RpcContext};
use crate::net::web_server::WebServer;
use crate::shutdown::Shutdown;
use crate::system::launch_metrics_task;
use crate::util::logger::EmbassyLogger;
use crate::util::io::append_file;
use crate::util::logger::LOGGER;
use crate::{Error, ErrorKind, ResultExt};

#[instrument(skip_all)]
Expand All @@ -27,13 +29,17 @@ async fn inner_main(
.await
.is_ok()
{
LOGGER.set_logfile(Some(
append_file("/run/startos/init.log").await?.into_std().await,
));
let (ctx, handle) = match super::start_init::main(server, &config).await? {
Err(s) => return Ok(Some(s)),
Ok(ctx) => ctx,
};
tokio::fs::write("/run/startos/initialized", "").await?;

server.serve_main(ctx.clone());
LOGGER.set_logfile(None);
handle.complete();

ctx
Expand Down Expand Up @@ -131,7 +137,7 @@ async fn inner_main(
}

pub fn main(args: impl IntoIterator<Item = OsString>) {
EmbassyLogger::init();
LOGGER.enable();

let config = ServerConfig::parse_from(args).load().unwrap();

Expand Down
153 changes: 78 additions & 75 deletions core/startos/src/net/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV6};
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task::Poll;
use std::time::Duration;

use clap::Parser;
use futures::future::pending;
use futures::{FutureExt, TryFutureExt};
use futures::{Stream, StreamExt, TryStreamExt};
use getifaddrs::if_nametoindex;
use helpers::NonDetachingJoinHandle;
use imbl_value::InternedString;
Expand All @@ -18,9 +18,8 @@ use rpc_toolkit::{from_fn_async, Context, HandlerArgs, HandlerExt, ParentHandler
use serde::{Deserialize, Serialize};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::watch;
use tokio_stream::StreamExt;
use ts_rs::TS;
use zbus::proxy::{PropertyStream, SignalStream};
use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream};
use zbus::zvariant::{
DeserializeDict, OwnedObjectPath, OwnedValue, Type as ZType, Value as ZValue,
};
Expand All @@ -31,6 +30,7 @@ use crate::db::model::public::{IpInfo, NetworkInterfaceInfo};
use crate::db::model::Database;
use crate::net::network_interface::active_connection::ActiveConnectionProxy;
use crate::prelude::*;
use crate::util::future::Until;
use crate::util::serde::{display_serializable, HandlerExtSerde};
use crate::util::sync::SyncMutex;

Expand Down Expand Up @@ -222,53 +222,31 @@ trait Device {
#[zbus(property)]
fn ip6_config(&self) -> Result<OwnedObjectPath, Error>;

#[zbus(property, name = "State")]
fn _state(&self) -> Result<u32, Error>;

#[zbus(signal)]
fn state_changed(&self) -> Result<(), Error>;
}

struct WatchPropertyStream<'a, T> {
stream: PropertyStream<'a, T>,
signals: Vec<SignalStream<'a>>,
trait StubStream<'a> {
fn stub(self) -> impl Stream<Item = Result<(), Error>> + 'a;
}
impl<'a, T> WatchPropertyStream<'a, T>
impl<'a, T> StubStream<'a> for PropertyStream<'a, T>
where
T: Unpin + TryFrom<OwnedValue>,
T: Unpin + TryFrom<OwnedValue> + std::fmt::Debug + 'a,
T::Error: Into<zbus::Error>,
{
fn new(stream: PropertyStream<'a, T>) -> Self {
Self {
stream,
signals: Vec::new(),
}
}

fn with_signal(mut self, stream: SignalStream<'a>) -> Self {
self.signals.push(stream);
self
fn stub(self) -> impl Stream<Item = Result<(), Error>> + 'a {
StreamExt::then(self, |d| async move {
PropertyChanged::get(&d).await.map(|_| ())
})
.map_err(Error::from)
}

async fn until_changed<Fut: Future<Output = Result<(), Error>>>(
&mut self,
fut: Fut,
) -> Result<(), Error> {
let next = self.stream.next();
let signal = if !self.signals.is_empty() {
futures::future::select_all(self.signals.iter_mut().map(|s| s.next().boxed())).boxed()
} else {
futures::future::pending().boxed()
};
tokio::select! {
changed = next => {
changed.ok_or_else(|| Error::new(eyre!("stream is empty"), ErrorKind::DBus))?.get().await?;
Ok(())
},
_ = signal => {
Ok(())
},
res = fut.and_then(|_| pending()) => {
res
}
}
}
impl<'a> StubStream<'a> for SignalStream<'a> {
fn stub(self) -> impl Stream<Item = Result<(), Error>> + 'a {
self.map(|_| Ok(()))
}
}

Expand All @@ -277,17 +255,30 @@ async fn watcher(write_to: watch::Sender<BTreeMap<InternedString, NetworkInterfa
loop {
let res: Result<(), Error> = async {
let connection = Connection::system().await?;

let netman_proxy = NetworkManagerProxy::new(&connection).await?;

let mut devices_sub =
WatchPropertyStream::new(netman_proxy.receive_devices_changed().await)
.with_signal(netman_proxy.receive_device_added().await?.into_inner())
.with_signal(netman_proxy.receive_device_removed().await?.into_inner());
let mut until = Until::new()
.with_stream(netman_proxy.receive_devices_changed().await.stub())
.with_stream(
netman_proxy
.receive_device_added()
.await?
.into_inner()
.stub(),
)
.with_stream(
netman_proxy
.receive_device_removed()
.await?
.into_inner()
.stub(),
);

loop {
let devices = netman_proxy.devices().await?;
devices_sub
.until_changed(async {
until
.run(async {
let mut ifaces = BTreeSet::new();
let mut jobs = Vec::new();
for device in devices {
Expand Down Expand Up @@ -352,6 +343,7 @@ async fn get_wan_ipv4(local_addr: Ipv4Addr) -> Result<Option<Ipv4Addr>, Error> {
.local_address(Some(IpAddr::V4(local_addr)))
.build()?
.get("http://ip4only.me/api/")
.timeout(Duration::from_secs(10))
.send()
.await?
.error_for_status()?
Expand All @@ -373,31 +365,40 @@ async fn watch_ip(
iface: InternedString,
write_to: &watch::Sender<BTreeMap<InternedString, NetworkInterfaceInfo>>,
) -> Result<(), Error> {
let mut con_sub =
WatchPropertyStream::new(device_proxy.receive_active_connection_changed().await)
.with_signal(device_proxy.receive_state_changed().await?.into_inner())
.with_signal(
active_connection_proxy
.receive_state_changed()
.await?
.into_inner(),
);
let mut ip4_config_sub =
WatchPropertyStream::new(device_proxy.receive_ip4_config_changed().await);
let mut ip6_config_sub =
WatchPropertyStream::new(device_proxy.receive_ip6_config_changed().await);
let mut until = Until::new()
.with_stream(
device_proxy
.receive_active_connection_changed()
.await
.stub(),
)
.with_stream(
device_proxy
.receive_state_changed()
.await?
.into_inner()
.stub(),
)
.with_stream(
active_connection_proxy
.receive_state_changed()
.await?
.into_inner()
.stub(),
)
.with_stream(device_proxy.receive_ip4_config_changed().await.stub())
.with_stream(device_proxy.receive_ip6_config_changed().await.stub());

loop {
let ip4_config = device_proxy.ip4_config().await?;
let ip6_config = device_proxy.ip6_config().await?;
ip4_config_sub
.until_changed(ip6_config_sub.until_changed(async {
until
.run(async {
let ip4_proxy = Ip4ConfigProxy::new(&connection, ip4_config).await?;
let mut address4_sub =
WatchPropertyStream::new(ip4_proxy.receive_address_data_changed().await);
let ip6_proxy = Ip6ConfigProxy::new(&connection, ip6_config).await?;
let mut address6_sub =
WatchPropertyStream::new(ip6_proxy.receive_address_data_changed().await);
let mut until = Until::new()
.with_stream(ip4_proxy.receive_address_data_changed().await.stub())
.with_stream(ip6_proxy.receive_address_data_changed().await.stub());

loop {
let addresses = ip4_proxy
Expand All @@ -406,8 +407,11 @@ async fn watch_ip(
.into_iter()
.chain(ip6_proxy.address_data().await?)
.collect_vec();
address4_sub
.until_changed(address6_sub.until_changed(async {
if iface == "enp1s0" {
dbg!(&addresses);
}
until
.run(async {
let scope_id = if_nametoindex(&*iface).with_kind(ErrorKind::Network)?;
let subnets: BTreeSet<IpNet> =
addresses.into_iter().map(TryInto::try_into).try_collect()?;
Expand Down Expand Up @@ -448,20 +452,20 @@ async fn watch_ip(
let public = m.get(&iface).map_or(None, |i| i.public);
m.insert(
iface.clone(),
NetworkInterfaceInfo {
dbg!(NetworkInterfaceInfo {
public,
ip_info: ip_info.clone(),
},
}),
)
.filter(|old| &old.ip_info == &ip_info)
.filter(|old| &dbg!(old).ip_info == &ip_info)
.is_none()
});

Ok::<_, Error>(())
}))
})
.await?;
}
}))
})
.await?;
}
}
Expand Down Expand Up @@ -675,7 +679,6 @@ impl NetworkInterfaceListener {
loop {
if self.needs_update {
let ip_info = self.ip_info.borrow().clone();
dbg!("changed", &ip_info);
self.listeners.update(&ip_info, public).await?;
self.needs_update = false;
}
Expand Down
9 changes: 9 additions & 0 deletions core/startos/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,12 @@ pub use tracing::instrument;
pub use crate::db::prelude::*;
pub use crate::ensure_code;
pub use crate::error::{Error, ErrorCollection, ErrorKind, ResultExt};

#[macro_export]
macro_rules! dbg {
($e:expr) => {{
let e = $e;
tracing::debug!("[{}:{}:{}] $e = {e:?}", file!(), line!(), column!());
e
}};
}
Loading

0 comments on commit 1c90ddb

Please sign in to comment.