Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/cli/src/process/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pub struct Args {

#[arg(long)]
pub size: Option<u64>,

#[arg(long)]
pub stream: Option<tg::process::log::Stream>,
}

impl Cli {
Expand All @@ -38,6 +41,7 @@ impl Cli {
position: args.position.map(std::io::SeekFrom::Start),
remotes: args.remotes.remotes,
size: args.size,
stream: args.stream,
};
let mut log = process.log(&handle, arg).await.map_err(
|source| tg::error!(!source, id = %args.process, "failed to get the process log"),
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/test.nu
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ export def artifact [artifact] {
xattr_write $pair.key $pair.value $path
}
}
'symlink' => {
'symlink' => {
ln -s $artifact.path $path
}
}
Expand Down
15 changes: 14 additions & 1 deletion packages/client/src/process/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,22 @@ use {
pub mod get;
pub mod post;

#[derive(Clone, Copy, Debug, serde_with::DeserializeFromStr, serde_with::SerializeDisplay)]
#[derive(
Clone,
Copy,
Debug,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Eq,
PartialEq,
tangram_serialize::Serialize,
tangram_serialize::Deserialize,
)]
pub enum Stream {
#[tangram_serialize(id = 2)]
Stderr,

#[tangram_serialize(id = 1)]
Stdout,
}

Expand Down
3 changes: 3 additions & 0 deletions packages/client/src/process/log/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub struct Arg {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub size: Option<u64>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub stream: Option<tg::process::log::Stream>,
}

#[derive(Clone, Debug)]
Expand Down
1 change: 1 addition & 0 deletions packages/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,4 @@ tracing = { workspace = true }
unicode-width = { workspace = true }
uuid = { workspace = true }
xattr = { workspace = true }
zerocopy = { workspace = true }
1 change: 1 addition & 0 deletions packages/server/src/process/log.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod get;
pub mod post;
pub mod reader;
pub mod serialized;
4 changes: 1 addition & 3 deletions packages/server/src/process/log/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ impl Server {
let mut events = stream::select_all([log, status, interval]).boxed();

// Create the reader.
let mut reader = Reader::new(self, id)
.await
.map_err(|source| tg::error!(!source, "failed to create the log reader"))?;
let mut reader = Reader::new(self, id, arg.stream).await?;

// Seek the reader.
let seek = if let Some(position) = arg.position {
Expand Down
38 changes: 18 additions & 20 deletions packages/server/src/process/log/post.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use {
crate::{Context, Server},
bytes::Bytes,
tangram_client::prelude::*,
tangram_http::{Body, request::Ext as _, response::builder::Ext as _},
tangram_messenger::prelude::*,
tokio::io::AsyncWriteExt as _,
tangram_store::Store,
};

impl Server {
Expand Down Expand Up @@ -42,13 +41,28 @@ impl Server {
.ok_or_else(|| tg::error!("not found"))?
.data;

// Compute the timestamp.
let timestamp = time::OffsetDateTime::now_utc().unix_timestamp()
- data
.started_at
.ok_or_else(|| tg::error!("expected the process to be started"))?;

// Verify the process is local and started.
if data.status != tg::process::Status::Started {
return Err(tg::error!("failed to find the process"));
}

// Write to the log file.
self.post_process_log_to_file(id, arg.bytes.clone()).await?;
// Write to the store.
let arg = tangram_store::PutLogArg {
bytes: arg.bytes,
process: id.clone(),
stream: arg.stream,
timestamp,
};
self.store
.put_log(arg)
.await
.map_err(|source| tg::error!(!source, "failed to store the log"))?;

// Publish the message.
tokio::spawn({
Expand All @@ -67,22 +81,6 @@ impl Server {
Ok(())
}

async fn post_process_log_to_file(&self, id: &tg::process::Id, bytes: Bytes) -> tg::Result<()> {
let path = self.logs_path().join(format!("{id}"));
let mut file = tokio::fs::File::options()
.create(true)
.append(true)
.open(&path)
.await
.map_err(
|source| tg::error!(!source, path = %path.display(), "failed to open the log file"),
)?;
file.write_all(&bytes).await.map_err(
|source| tg::error!(!source, path = %path.display(), "failed to write to the log file"),
)?;
Ok(())
}

pub(crate) async fn handle_post_process_log_request(
&self,
request: http::Request<Body>,
Expand Down
Loading