diff --git a/core/startos/src/auth.rs b/core/startos/src/auth.rs index a6b624b70..9085709ab 100644 --- a/core/startos/src/auth.rs +++ b/core/startos/src/auth.rs @@ -187,9 +187,8 @@ pub fn check_password_against_db(db: &DatabaseModel, password: &str) -> Result<( Ok(()) } -#[derive(Deserialize, Serialize, Parser, TS)] +#[derive(Deserialize, Serialize, TS)] #[serde(rename_all = "camelCase")] -#[command(rename_all = "kebab-case")] #[ts(export)] pub struct LoginParams { password: Option, diff --git a/core/startos/src/bins/container_cli.rs b/core/startos/src/bins/container_cli.rs index b0da1cb00..118133f55 100644 --- a/core/startos/src/bins/container_cli.rs +++ b/core/startos/src/bins/container_cli.rs @@ -4,7 +4,7 @@ use rpc_toolkit::CliApp; use serde_json::Value; use crate::service::cli::{ContainerCliContext, ContainerClientConfig}; -use crate::util::logger::EmbassyLogger; +use crate::util::logger::LOGGER; use crate::version::{Current, VersionT}; lazy_static::lazy_static! { @@ -12,7 +12,7 @@ lazy_static::lazy_static! { } pub fn main(args: impl IntoIterator) { - EmbassyLogger::init(); + LOGGER.enable(); if let Err(e) = CliApp::new( |cfg: ContainerClientConfig| Ok(ContainerCliContext::init(cfg)), crate::service::effects::handler(), diff --git a/core/startos/src/bins/registry.rs b/core/startos/src/bins/registry.rs index 132e0984a..a455167fc 100644 --- a/core/startos/src/bins/registry.rs +++ b/core/startos/src/bins/registry.rs @@ -8,7 +8,7 @@ use tracing::instrument; use crate::net::web_server::WebServer; use crate::prelude::*; use crate::registry::context::{RegistryConfig, RegistryContext}; -use crate::util::logger::EmbassyLogger; +use crate::util::logger::LOGGER; #[instrument(skip_all)] async fn inner_main(config: &RegistryConfig) -> Result<(), Error> { @@ -63,7 +63,7 @@ async fn inner_main(config: &RegistryConfig) -> Result<(), Error> { } pub fn main(args: impl IntoIterator) { - EmbassyLogger::init(); + LOGGER.enable(); let config = RegistryConfig::parse_from(args).load().unwrap(); diff --git a/core/startos/src/bins/start_cli.rs b/core/startos/src/bins/start_cli.rs index 2e92e0cc0..bda5e00d3 100644 --- a/core/startos/src/bins/start_cli.rs +++ b/core/startos/src/bins/start_cli.rs @@ -5,7 +5,7 @@ use serde_json::Value; use crate::context::config::ClientConfig; use crate::context::CliContext; -use crate::util::logger::EmbassyLogger; +use crate::util::logger::LOGGER; use crate::version::{Current, VersionT}; lazy_static::lazy_static! { @@ -13,7 +13,8 @@ lazy_static::lazy_static! { } pub fn main(args: impl IntoIterator) { - EmbassyLogger::init(); + LOGGER.enable(); + if let Err(e) = CliApp::new( |cfg: ClientConfig| Ok(CliContext::init(cfg.load()?)?), crate::expanded_api(), diff --git a/core/startos/src/bins/start_init.rs b/core/startos/src/bins/start_init.rs index 394d42c8d..fc65feae9 100644 --- a/core/startos/src/bins/start_init.rs +++ b/core/startos/src/bins/start_init.rs @@ -178,6 +178,7 @@ async fn setup_or_init( tracing::info!("Loaded Disk"); if requires_reboot.0 { + tracing::info!("Rebooting..."); let mut reboot_phase = handle.add_phase("Rebooting".into(), Some(1)); reboot_phase.start(); return Ok(Err(Shutdown { diff --git a/core/startos/src/bins/startd.rs b/core/startos/src/bins/startd.rs index d383f3091..e93857c38 100644 --- a/core/startos/src/bins/startd.rs +++ b/core/startos/src/bins/startd.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use clap::Parser; use color_eyre::eyre::eyre; use futures::{FutureExt, TryFutureExt}; +use tokio::fs::OpenOptions; use tokio::signal::unix::signal; use tracing::instrument; @@ -15,7 +16,8 @@ use crate::context::{DiagnosticContext, InitContext, RpcContext}; use crate::net::web_server::WebServer; use crate::shutdown::Shutdown; use crate::system::launch_metrics_task; -use crate::util::logger::EmbassyLogger; +use crate::util::io::append_file; +use crate::util::logger::LOGGER; use crate::{Error, ErrorKind, ResultExt}; #[instrument(skip_all)] @@ -27,6 +29,9 @@ async fn inner_main( .await .is_ok() { + LOGGER.set_logfile(Some( + append_file("/run/startos/init.log").await?.into_std().await, + )); let (ctx, handle) = match super::start_init::main(server, &config).await? { Err(s) => return Ok(Some(s)), Ok(ctx) => ctx, @@ -34,6 +39,7 @@ async fn inner_main( tokio::fs::write("/run/startos/initialized", "").await?; server.serve_main(ctx.clone()); + LOGGER.set_logfile(None); handle.complete(); ctx @@ -131,7 +137,7 @@ async fn inner_main( } pub fn main(args: impl IntoIterator) { - EmbassyLogger::init(); + LOGGER.enable(); let config = ServerConfig::parse_from(args).load().unwrap(); diff --git a/core/startos/src/net/network_interface.rs b/core/startos/src/net/network_interface.rs index 6071875f3..dfb1f34b2 100644 --- a/core/startos/src/net/network_interface.rs +++ b/core/startos/src/net/network_interface.rs @@ -4,10 +4,10 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV6}; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::task::Poll; +use std::time::Duration; use clap::Parser; -use futures::future::pending; -use futures::{FutureExt, TryFutureExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use getifaddrs::if_nametoindex; use helpers::NonDetachingJoinHandle; use imbl_value::InternedString; @@ -18,9 +18,8 @@ use rpc_toolkit::{from_fn_async, Context, HandlerArgs, HandlerExt, ParentHandler use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::watch; -use tokio_stream::StreamExt; use ts_rs::TS; -use zbus::proxy::{PropertyStream, SignalStream}; +use zbus::proxy::{PropertyChanged, PropertyStream, SignalStream}; use zbus::zvariant::{ DeserializeDict, OwnedObjectPath, OwnedValue, Type as ZType, Value as ZValue, }; @@ -31,6 +30,7 @@ use crate::db::model::public::{IpInfo, NetworkInterfaceInfo}; use crate::db::model::Database; use crate::net::network_interface::active_connection::ActiveConnectionProxy; use crate::prelude::*; +use crate::util::future::Until; use crate::util::serde::{display_serializable, HandlerExtSerde}; use crate::util::sync::SyncMutex; @@ -222,53 +222,31 @@ trait Device { #[zbus(property)] fn ip6_config(&self) -> Result; + #[zbus(property, name = "State")] + fn _state(&self) -> Result; + #[zbus(signal)] fn state_changed(&self) -> Result<(), Error>; } -struct WatchPropertyStream<'a, T> { - stream: PropertyStream<'a, T>, - signals: Vec>, +trait StubStream<'a> { + fn stub(self) -> impl Stream> + 'a; } -impl<'a, T> WatchPropertyStream<'a, T> +impl<'a, T> StubStream<'a> for PropertyStream<'a, T> where - T: Unpin + TryFrom, + T: Unpin + TryFrom + std::fmt::Debug + 'a, T::Error: Into, { - fn new(stream: PropertyStream<'a, T>) -> Self { - Self { - stream, - signals: Vec::new(), - } - } - - fn with_signal(mut self, stream: SignalStream<'a>) -> Self { - self.signals.push(stream); - self + fn stub(self) -> impl Stream> + 'a { + StreamExt::then(self, |d| async move { + PropertyChanged::get(&d).await.map(|_| ()) + }) + .map_err(Error::from) } - - async fn until_changed>>( - &mut self, - fut: Fut, - ) -> Result<(), Error> { - let next = self.stream.next(); - let signal = if !self.signals.is_empty() { - futures::future::select_all(self.signals.iter_mut().map(|s| s.next().boxed())).boxed() - } else { - futures::future::pending().boxed() - }; - tokio::select! { - changed = next => { - changed.ok_or_else(|| Error::new(eyre!("stream is empty"), ErrorKind::DBus))?.get().await?; - Ok(()) - }, - _ = signal => { - Ok(()) - }, - res = fut.and_then(|_| pending()) => { - res - } - } +} +impl<'a> StubStream<'a> for SignalStream<'a> { + fn stub(self) -> impl Stream> + 'a { + self.map(|_| Ok(())) } } @@ -277,17 +255,30 @@ async fn watcher(write_to: watch::Sender = async { let connection = Connection::system().await?; + let netman_proxy = NetworkManagerProxy::new(&connection).await?; - let mut devices_sub = - WatchPropertyStream::new(netman_proxy.receive_devices_changed().await) - .with_signal(netman_proxy.receive_device_added().await?.into_inner()) - .with_signal(netman_proxy.receive_device_removed().await?.into_inner()); + let mut until = Until::new() + .with_stream(netman_proxy.receive_devices_changed().await.stub()) + .with_stream( + netman_proxy + .receive_device_added() + .await? + .into_inner() + .stub(), + ) + .with_stream( + netman_proxy + .receive_device_removed() + .await? + .into_inner() + .stub(), + ); loop { let devices = netman_proxy.devices().await?; - devices_sub - .until_changed(async { + until + .run(async { let mut ifaces = BTreeSet::new(); let mut jobs = Vec::new(); for device in devices { @@ -352,6 +343,7 @@ async fn get_wan_ipv4(local_addr: Ipv4Addr) -> Result, Error> { .local_address(Some(IpAddr::V4(local_addr))) .build()? .get("http://ip4only.me/api/") + .timeout(Duration::from_secs(10)) .send() .await? .error_for_status()? @@ -373,31 +365,40 @@ async fn watch_ip( iface: InternedString, write_to: &watch::Sender>, ) -> Result<(), Error> { - let mut con_sub = - WatchPropertyStream::new(device_proxy.receive_active_connection_changed().await) - .with_signal(device_proxy.receive_state_changed().await?.into_inner()) - .with_signal( - active_connection_proxy - .receive_state_changed() - .await? - .into_inner(), - ); - let mut ip4_config_sub = - WatchPropertyStream::new(device_proxy.receive_ip4_config_changed().await); - let mut ip6_config_sub = - WatchPropertyStream::new(device_proxy.receive_ip6_config_changed().await); + let mut until = Until::new() + .with_stream( + device_proxy + .receive_active_connection_changed() + .await + .stub(), + ) + .with_stream( + device_proxy + .receive_state_changed() + .await? + .into_inner() + .stub(), + ) + .with_stream( + active_connection_proxy + .receive_state_changed() + .await? + .into_inner() + .stub(), + ) + .with_stream(device_proxy.receive_ip4_config_changed().await.stub()) + .with_stream(device_proxy.receive_ip6_config_changed().await.stub()); loop { let ip4_config = device_proxy.ip4_config().await?; let ip6_config = device_proxy.ip6_config().await?; - ip4_config_sub - .until_changed(ip6_config_sub.until_changed(async { + until + .run(async { let ip4_proxy = Ip4ConfigProxy::new(&connection, ip4_config).await?; - let mut address4_sub = - WatchPropertyStream::new(ip4_proxy.receive_address_data_changed().await); let ip6_proxy = Ip6ConfigProxy::new(&connection, ip6_config).await?; - let mut address6_sub = - WatchPropertyStream::new(ip6_proxy.receive_address_data_changed().await); + let mut until = Until::new() + .with_stream(ip4_proxy.receive_address_data_changed().await.stub()) + .with_stream(ip6_proxy.receive_address_data_changed().await.stub()); loop { let addresses = ip4_proxy @@ -406,8 +407,11 @@ async fn watch_ip( .into_iter() .chain(ip6_proxy.address_data().await?) .collect_vec(); - address4_sub - .until_changed(address6_sub.until_changed(async { + if iface == "enp1s0" { + dbg!(&addresses); + } + until + .run(async { let scope_id = if_nametoindex(&*iface).with_kind(ErrorKind::Network)?; let subnets: BTreeSet = addresses.into_iter().map(TryInto::try_into).try_collect()?; @@ -448,20 +452,20 @@ async fn watch_ip( let public = m.get(&iface).map_or(None, |i| i.public); m.insert( iface.clone(), - NetworkInterfaceInfo { + dbg!(NetworkInterfaceInfo { public, ip_info: ip_info.clone(), - }, + }), ) - .filter(|old| &old.ip_info == &ip_info) + .filter(|old| &dbg!(old).ip_info == &ip_info) .is_none() }); Ok::<_, Error>(()) - })) + }) .await?; } - })) + }) .await?; } } @@ -675,7 +679,6 @@ impl NetworkInterfaceListener { loop { if self.needs_update { let ip_info = self.ip_info.borrow().clone(); - dbg!("changed", &ip_info); self.listeners.update(&ip_info, public).await?; self.needs_update = false; } diff --git a/core/startos/src/prelude.rs b/core/startos/src/prelude.rs index dddc1ecda..a6a78a58d 100644 --- a/core/startos/src/prelude.rs +++ b/core/startos/src/prelude.rs @@ -6,3 +6,12 @@ pub use tracing::instrument; pub use crate::db::prelude::*; pub use crate::ensure_code; pub use crate::error::{Error, ErrorCollection, ErrorKind, ResultExt}; + +#[macro_export] +macro_rules! dbg { + ($e:expr) => {{ + let e = $e; + tracing::debug!("[{}:{}:{}] $e = {e:?}", file!(), line!(), column!()); + e + }}; +} diff --git a/core/startos/src/util/future.rs b/core/startos/src/util/future.rs index f40e847bf..c690f9754 100644 --- a/core/startos/src/util/future.rs +++ b/core/startos/src/util/future.rs @@ -1,11 +1,13 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use futures::future::abortable; -use futures::stream::{AbortHandle, Abortable}; -use futures::Future; +use futures::future::{abortable, pending, BoxFuture, FusedFuture}; +use futures::stream::{AbortHandle, Abortable, BoxStream}; +use futures::{Future, FutureExt, Stream, StreamExt}; use tokio::sync::watch; +use crate::prelude::*; + #[pin_project::pin_project(PinnedDrop)] pub struct DropSignaling { #[pin] @@ -102,6 +104,60 @@ impl CancellationHandle { } } +#[derive(Default)] +pub struct Until<'a> { + streams: Vec>>, + async_fns: Vec BoxFuture<'a, Result<(), Error>> + Send + 'a>>, +} +impl<'a> Until<'a> { + pub fn new() -> Self { + Self::default() + } + + pub fn with_stream( + mut self, + stream: impl Stream> + Send + 'a, + ) -> Self { + self.streams.push(stream.boxed()); + self + } + + pub fn with_async_fn(mut self, mut f: F) -> Self + where + F: FnMut() -> Fut + Send + 'a, + Fut: Future> + FusedFuture + Send + 'a, + { + self.async_fns.push(Box::new(move || f().boxed())); + self + } + + pub async fn run> + Send>( + &mut self, + fut: Fut, + ) -> Result<(), Error> { + let (res, _, _) = futures::future::select_all( + self.streams + .iter_mut() + .map(|s| { + async { + s.next().await.transpose()?.ok_or_else(|| { + Error::new(eyre!("stream is empty"), ErrorKind::Cancelled) + }) + } + .boxed() + }) + .chain(self.async_fns.iter_mut().map(|f| f())) + .chain([async { + fut.await?; + pending().await + } + .boxed()]), + ) + .await; + res + } +} + #[tokio::test] async fn test_cancellable() { use std::sync::Arc; diff --git a/core/startos/src/util/io.rs b/core/startos/src/util/io.rs index 8e270c912..9018b3344 100644 --- a/core/startos/src/util/io.rs +++ b/core/startos/src/util/io.rs @@ -15,7 +15,7 @@ use futures::future::{BoxFuture, Fuse}; use futures::{AsyncSeek, FutureExt, Stream, TryStreamExt}; use helpers::NonDetachingJoinHandle; use nix::unistd::{Gid, Uid}; -use tokio::fs::File; +use tokio::fs::{File, OpenOptions}; use tokio::io::{ duplex, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, DuplexStream, ReadBuf, WriteHalf, }; @@ -935,6 +935,21 @@ pub async fn create_file(path: impl AsRef) -> Result { .with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("create {path:?}"))) } +pub async fn append_file(path: impl AsRef) -> Result { + let path = path.as_ref(); + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent) + .await + .with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("mkdir -p {parent:?}")))?; + } + OpenOptions::new() + .create(true) + .append(true) + .open(path) + .await + .with_ctx(|_| (ErrorKind::Filesystem, lazy_format!("create {path:?}"))) +} + pub async fn delete_file(path: impl AsRef) -> Result<(), Error> { let path = path.as_ref(); tokio::fs::remove_file(path) diff --git a/core/startos/src/util/logger.rs b/core/startos/src/util/logger.rs index c464b328d..816721d5b 100644 --- a/core/startos/src/util/logger.rs +++ b/core/startos/src/util/logger.rs @@ -1,13 +1,62 @@ -use std::io; +use std::fs::File; +use std::io::{self, Write}; +use std::sync::{Arc, Mutex, MutexGuard}; +use lazy_static::lazy_static; use tracing::Subscriber; +use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::util::SubscriberInitExt; +lazy_static! { + pub static ref LOGGER: StartOSLogger = StartOSLogger::init(); +} + #[derive(Clone)] -pub struct EmbassyLogger {} +pub struct StartOSLogger { + logfile: LogFile, +} + +#[derive(Clone, Default)] +struct LogFile(Arc>>); +impl<'a> MakeWriter<'a> for LogFile { + type Writer = Box; + fn make_writer(&'a self) -> Self::Writer { + let f = self.0.lock().unwrap(); + if f.is_some() { + struct TeeWriter<'a>(MutexGuard<'a, Option>); + impl<'a> Write for TeeWriter<'a> { + fn write(&mut self, buf: &[u8]) -> io::Result { + let n = if let Some(f) = &mut *self.0 { + f.write(buf)? + } else { + buf.len() + }; + io::stderr().write_all(&buf[..n])?; + Ok(n) + } + fn flush(&mut self) -> io::Result<()> { + if let Some(f) = &mut *self.0 { + f.flush()?; + } + Ok(()) + } + } + Box::new(TeeWriter(f)) + } else { + drop(f); + Box::new(io::stderr()) + } + } +} + +impl StartOSLogger { + pub fn enable(&self) {} + + pub fn set_logfile(&self, logfile: Option) { + *self.logfile.0.lock().unwrap() = logfile; + } -impl EmbassyLogger { - fn base_subscriber() -> impl Subscriber { + fn base_subscriber(logfile: LogFile) -> impl Subscriber { use tracing_error::ErrorLayer; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; @@ -24,7 +73,7 @@ impl EmbassyLogger { .add_directive("tokio=trace".parse().unwrap()) .add_directive("runtime=trace".parse().unwrap()); let fmt_layer = fmt::layer() - .with_writer(io::stderr) + .with_writer(logfile) .with_line_number(true) .with_file(true) .with_target(true); @@ -39,11 +88,12 @@ impl EmbassyLogger { sub } - pub fn init() -> Self { - Self::base_subscriber().init(); + fn init() -> Self { + let logfile = LogFile::default(); + Self::base_subscriber(logfile.clone()).init(); color_eyre::install().unwrap_or_else(|_| tracing::warn!("tracing too many times")); - EmbassyLogger {} + StartOSLogger { logfile } } } diff --git a/debian/postinst b/debian/postinst index 3714df8d4..176bdb6b2 100755 --- a/debian/postinst +++ b/debian/postinst @@ -86,6 +86,8 @@ sed -i '/^\s*#\?\s*issue_discards\s*=\s*/c\issue_discards = 1' /etc/lvm/lvm.conf sed -i '/\(^\|#\)\s*unqualified-search-registries\s*=\s*/c\unqualified-search-registries = ["docker.io"]' /etc/containers/registries.conf sed -i 's/\(#\|\^\)\s*\([^=]\+\)=\(suspend\|hibernate\)\s*$/\2=ignore/g' /etc/systemd/logind.conf sed -i '/\(^\|#\)MulticastDNS=/c\MulticastDNS=no' /etc/systemd/resolved.conf +sed -i 's/\[Service\]/[Service]\nEnvironment=SYSTEMD_LOG_LEVEL=debug/' /lib/systemd/system/systemd-timesyncd.service +sed -i '/\(^\|#\)RootDistanceMaxSec=/c\RootDistanceMaxSec=10' /etc/systemd/timesyncd.conf mkdir -p /etc/nginx/ssl diff --git a/patch-db b/patch-db index 99076d349..2600a784a 160000 --- a/patch-db +++ b/patch-db @@ -1 +1 @@ -Subproject commit 99076d349c6768000483ea8d47216d273586552e +Subproject commit 2600a784a9899a6f8e0c9abe0bf4c4ce48cb85a9 diff --git a/sdk/base/lib/dependencies/setupDependencies.ts b/sdk/base/lib/dependencies/setupDependencies.ts index 6b15ef0d1..710ec96ed 100644 --- a/sdk/base/lib/dependencies/setupDependencies.ts +++ b/sdk/base/lib/dependencies/setupDependencies.ts @@ -51,7 +51,7 @@ export function setupDependencies( dependencies: Object.entries(dependencyType).map( ([id, { versionRange, ...x }, ,]) => ({ - // id, + id, ...x, versionRange: versionRange.toString(), }) as T.DependencyRequirement,