Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/html/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ <h4>Pipeline Visualization:</h4>
response += ', Disable Mavlink: ' + video_and_stream.stream_information.extended_configuration?.disable_mavlink ?? false
response += ', Disable Zenoh: ' + video_and_stream.stream_information.extended_configuration?.disable_zenoh ?? false
response += ', Disable Thumbnails: ' + video_and_stream.stream_information.extended_configuration?.disable_thumbnails ?? false
response += ', Disable Recording: ' + video_and_stream.stream_information.extended_configuration?.disable_recording ?? false
return response
},
requestData: async function() {
Expand Down Expand Up @@ -399,6 +400,7 @@ <h4>Pipeline Visualization:</h4>
"disable_mavlink": Boolean(stream.extended_configuration.disable_mavlink),
"disable_zenoh": Boolean(stream.extended_configuration.disable_zenoh),
"disable_thumbnails": Boolean(stream.extended_configuration.disable_thumbnails),
"disable_recording": Boolean(stream.extended_configuration.disable_recording),
},
}
}
Expand Down Expand Up @@ -733,6 +735,7 @@ <h4>Pipeline Visualization:</h4>
this.stream_setting.extended_configuration.disable_mavlink = Boolean(this.stream.video_and_stream.stream_information.extended_configuration?.disable_mavlink)
this.stream_setting.extended_configuration.disable_zenoh = Boolean(this.stream.video_and_stream.stream_information.extended_configuration?.disable_zenoh)
this.stream_setting.extended_configuration.disable_thumbnails = Boolean(this.stream.video_and_stream.stream_information.extended_configuration?.disable_thumbnails)
this.stream_setting.extended_configuration.disable_recording = Boolean(this.stream.video_and_stream.stream_information.extended_configuration?.disable_recording)
},
deep: true
},
Expand Down Expand Up @@ -793,6 +796,7 @@ <h4>Pipeline Visualization:</h4>
disable_mavlink: undefined,
disable_zenoh: undefined,
disable_thumbnails: undefined,
disable_recording: undefined,
},
},
stream_options: {
Expand Down Expand Up @@ -886,6 +890,13 @@ <h4>Pipeline Visualization:</h4>
v-model="stream_setting.extended_configuration.disable_thumbnails"
>
</div>
<div>
<label>Disable Recording: </label>
<input
type="checkbox"
v-model="stream_setting.extended_configuration.disable_recording"
>
</div>

<p>
<label>Endpoints: </label>
Expand Down
30 changes: 30 additions & 0 deletions src/lib/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use webrtc::signalling_protocol::PeerId;

use crate::{
mavlink::mavlink_camera::MavlinkCamera,
stream::sink::create_file_sink,
video::{
types::{FrameInterval, VideoEncodeType, VideoSourceType},
video_source::cameras_available,
Expand Down Expand Up @@ -499,6 +500,35 @@ impl StreamState {
}
}

if !video_and_stream_information
.stream_information
.extended_configuration
.as_ref()
.map(|e| e.disable_recording)
.unwrap_or_default()
&& crate::cli::manager::enable_zenoh()
{
let sink_id = Arc::new(Manager::generate_uuid(None));
match create_file_sink(sink_id.clone(), &video_and_stream_information) {
Ok(sink) => {
if let Some(pipeline) = stream.pipeline.as_mut() {
if let Err(reason) = pipeline.add_sink(sink).await {
return Err(anyhow!(
"Failed to add Sink of type File to the Pipeline. Reason: {reason}"
));
}
} else {
return Err(anyhow!("No Pipeline available to add File sink"));
}
}
Err(reason) => {
return Err(anyhow!(
"Failed to create Sink of type File. Reason: {reason}"
));
}
}
}

// Start the pipeline. This will automatically start sinks with linked proxy-isolated pipelines
stream
.pipeline
Expand Down
2 changes: 1 addition & 1 deletion src/lib/stream/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl PipelineState {
// Request a new src pad for the used Tee
// Note: Here we choose if the sink will receive a Video or RTP packages
let tee = match sink {
Sink::Image(_) | Sink::Zenoh(_) => &self.video_tee,
Sink::Image(_) | Sink::Zenoh(_) | Sink::File(_) => &self.video_tee,
Sink::Udp(_) | Sink::Rtsp(_) | Sink::WebRTC(_) => &self.rtp_tee,
};

Expand Down
212 changes: 212 additions & 0 deletions src/lib/stream/sink/file_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use std::sync::Arc;

use anyhow::{anyhow, Result};
use gst::prelude::*;
use tracing::*;

use crate::{
stream::{pipeline::runner::PipelineRunner, types::CaptureConfiguration},
video_stream::types::VideoAndStreamInformation,
};

use super::{link_sink_to_tee, unlink_sink_from_tee, SinkInterface};

#[derive(Debug)]
pub struct FileSink {
sink_id: Arc<uuid::Uuid>,
pipeline: gst::Pipeline,
queue: gst::Element,
proxysink: gst::Element,
_proxysrc: gst::Element,
_matroskamux: gst::Element,
_filesink: gst::Element,
tee_src_pad: Option<gst::Pad>,
pipeline_runner: PipelineRunner,
}

impl SinkInterface for FileSink {
#[instrument(level = "debug", skip(self, pipeline))]
fn link(
&mut self,
pipeline: &gst::Pipeline,
pipeline_id: &Arc<uuid::Uuid>,
tee_src_pad: gst::Pad,
) -> Result<()> {
if self.tee_src_pad.is_some() {
return Err(anyhow!(
"Tee's src pad from Sink {:?} has already been configured",
self.get_id()
));
}
self.tee_src_pad.replace(tee_src_pad);
let Some(tee_src_pad) = &self.tee_src_pad else {
unreachable!()
};

let elements = &[&self.queue, &self.proxysink];
link_sink_to_tee(tee_src_pad, pipeline, elements)?;

Ok(())
}

#[instrument(level = "debug", skip(self, pipeline))]
fn unlink(&self, pipeline: &gst::Pipeline, pipeline_id: &Arc<uuid::Uuid>) -> Result<()> {
let Some(tee_src_pad) = &self.tee_src_pad else {
warn!("Tried to unlink Sink from a pipeline without a Tee src pad.");
return Ok(());
};

let elements = &[&self.queue, &self.proxysink];
unlink_sink_from_tee(tee_src_pad, pipeline, elements)?;

if let Err(error) = self.pipeline.set_state(::gst::State::Null) {
warn!("Failed setting sink Pipeline state to Null: {error:?}");
}

Ok(())
}

#[instrument(level = "debug", skip(self))]
fn get_id(&self) -> Arc<uuid::Uuid> {
self.sink_id.clone()
}

#[instrument(level = "trace", skip(self))]
fn get_sdp(&self) -> Result<gst_sdp::SDPMessage> {
Err(anyhow!(
"Not available. Reason: Image Sink doesn't provide endpoints"
))
}

#[instrument(level = "debug", skip(self))]
fn start(&self) -> Result<()> {
self.pipeline_runner.start()
}

#[instrument(level = "debug", skip(self))]
fn eos(&self) {
let pipeline_weak = self.pipeline.downgrade();
if let Err(error) = std::thread::Builder::new()
.name("EOS".to_string())
.spawn(move || {
let pipeline = pipeline_weak.upgrade().unwrap();
if let Err(error) = pipeline.post_message(gst::message::Eos::new()) {
error!("Failed posting Eos message into Sink bus. Reason: {error:?}");
}
})
.expect("Failed spawning EOS thread")
.join()
{
error!(
"EOS Thread Panicked with: {:?}",
error.downcast_ref::<String>()
);
}
}

fn pipeline(&self) -> Option<&gst::Pipeline> {
Some(&self.pipeline)
}
}

impl FileSink {
#[instrument(level = "debug", skip_all)]
pub fn try_new(
sink_id: Arc<uuid::Uuid>,
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<Self> {
let queue = gst::ElementFactory::make("queue")
.property_from_str("leaky", "downstream") // Throw away any data
.property("silent", true)
.property("flush-on-eos", true)
.property("max-size-buffers", 0u32) // Disable buffers
.build()?;

// Create a pair of proxies. The proxysink will be used in the source's pipeline,
// while the proxysrc will be used in this sink's pipeline
let proxysink = gst::ElementFactory::make("proxysink").build()?;
let _proxysrc = gst::ElementFactory::make("proxysrc")
.property("proxysink", &proxysink)
.build()?;

// Configure proxysrc's queue, skips if fails
match _proxysrc.downcast_ref::<gst::Bin>() {
Some(bin) => {
let elements = bin.children();
match elements
.iter()
.find(|element| element.name().starts_with("queue"))
{
Some(element) => {
element.set_property_from_str("leaky", "downstream"); // Throw away any data
element.set_property("silent", true);
element.set_property("flush-on-eos", true);
element.set_property("max-size-buffers", 0u32); // Disable buffers
}
None => {
warn!("Failed to customize proxysrc's queue: Failed to find queue in proxysrc");
}
}
}
None => {
warn!("Failed to customize proxysrc's queue: Failed to downcast element to bin")
}
}

let encoding = match &video_and_stream_information
.stream_information
.configuration
{
CaptureConfiguration::Video(video_configuraiton) => video_configuraiton.encode.clone(),
CaptureConfiguration::Redirect(_) => {
return Err(anyhow!(
"Redirect CaptureConfiguration means the stream was not initialized yet"
));
}
};

let _matroskamux = gst::ElementFactory::make("matroskamux").build()?;

let location = format!("/test_record.mkv"); // TODO: DEFINE THE PATH AND NAME SOMEHOW
let _filesink = gst::ElementFactory::make("filesink")
.property("location", location)
.property("sync", false)
.build()?;

// Create the pipeline
let pipeline = gst::Pipeline::builder()
.name(format!("pipeline-sink-{sink_id}"))
.build();

// Add Sink elements to the Sink's Pipeline
let elements = &vec![&_proxysrc, &_matroskamux, &_filesink];
if let Err(add_err) = pipeline.add_many(elements) {
return Err(anyhow!(
"Failed adding FileSink's elements to Sink Pipeline: {add_err:?}"
));
}

// Link Sink's elements
if let Err(link_err) = gst::Element::link_many(elements) {
if let Err(remove_err) = pipeline.remove_many(elements) {
warn!("Failed removing elements from FileSink Pipeline: {remove_err:?}")
};
return Err(anyhow!("Failed linking FileSink's elements: {link_err:?}"));
}

let pipeline_runner =
PipelineRunner::try_new(&pipeline, &sink_id, true, video_and_stream_information)?;

Ok(Self {
sink_id: sink_id.clone(),
pipeline,
queue,
proxysink,
_proxysrc,
_matroskamux,
_filesink,
tee_src_pad: Default::default(),
pipeline_runner,
})
}
}
16 changes: 15 additions & 1 deletion src/lib/stream/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod file_sink;
pub mod image_sink;
pub mod rtsp_sink;
pub mod types;
Expand All @@ -11,7 +12,7 @@ use gst::prelude::*;
use std::{ops::Deref, sync::Arc};
use tracing::*;

use crate::video_stream::types::VideoAndStreamInformation;
use crate::{stream::sink::file_sink::FileSink, video_stream::types::VideoAndStreamInformation};

use image_sink::ImageSink;
use rtsp_sink::RtspSink;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub enum Sink {
WebRTC(WebRTCSink),
Image(ImageSink),
Zenoh(ZenohSink),
File(FileSink),
}

impl std::fmt::Display for Sink {
Expand All @@ -72,6 +74,7 @@ impl std::fmt::Display for Sink {
Sink::WebRTC(_) => write!(f, "WebRTCSink sink_id={sink_id}"),
Sink::Image(_) => write!(f, "ImageSink sink_id={sink_id}"),
Sink::Zenoh(_) => write!(f, "ZenohSink sink_id={sink_id}"),
Sink::File(_) => write!(f, "FileSink sink_id={sink_id}"),
}
}
}
Expand Down Expand Up @@ -121,6 +124,17 @@ pub async fn create_zenoh_sink(
))
}

#[instrument(level = "debug")]
pub fn create_file_sink(
id: Arc<uuid::Uuid>,
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<Sink> {
Ok(Sink::File(FileSink::try_new(
id,
video_and_stream_information,
)?))
}

#[instrument(level = "debug", skip_all)]
pub fn link_sink_to_tee(
tee_src_pad: &gst::Pad,
Expand Down
7 changes: 7 additions & 0 deletions src/lib/stream/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,20 @@ pub enum CaptureConfiguration {
Redirect(RedirectCaptureConfiguration),
}

pub enum TriggerKind {
Manual,
Arm,
Always,
}

#[derive(Apiv2Schema, Clone, Debug, PartialEq, Deserialize, Serialize, Default)]
#[serde(default)]
pub struct ExtendedConfiguration {
pub thermal: bool,
pub disable_mavlink: bool,
pub disable_zenoh: bool,
pub disable_thumbnails: bool,
pub disable_recording: bool,
}

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, Apiv2Schema)]
Expand Down
Loading