Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions crates/fail/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@ macro_rules! fail_err {
}
};
}
#[macro_export]
macro_rules! fail_ret {
($name:literal) => {
#[cfg(debug_assertions)]
{
const NAME: &'static str = concat!(env!("CARGO_PKG_NAME"), "::", $name);

$crate::private::inventory::submit! {
$crate::Fail { name: NAME }
}

let should_fail = $crate::private::should_fail(NAME);

if should_fail {
eprintln!("Purposely returned at '{NAME}'");
return;
}
}
};
}

#[doc(hidden)]
pub mod private {
Expand Down
122 changes: 85 additions & 37 deletions crates/recording/src/sources/screen_capture/macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ use crate::{
};
use anyhow::{Context, anyhow};
use cidre::*;
use futures::{FutureExt, channel::mpsc, future::BoxFuture};
use std::sync::{
Arc,
atomic::{self, AtomicBool},
use futures::{FutureExt as _, channel::mpsc, future::BoxFuture};
use std::{
sync::{
Arc,
atomic::{self, AtomicBool, AtomicU32},
},
time::Duration,
};
use tokio::sync::broadcast;
use tokio_util::{
future::FutureExt as _,
sync::{CancellationToken, DropGuard},
};
use tracing::debug;

#[derive(Debug)]
Expand Down Expand Up @@ -149,8 +156,11 @@ impl ScreenCaptureConfig<CMSampleBufferCapture> {
ns::Error::with_domain(ns::ErrorDomain::os_status(), 69420, None)
);

let video_frame_counter: Arc<AtomicU32> = Arc::new(AtomicU32::new(0));

let builder = scap_screencapturekit::Capturer::builder(content_filter, settings)
.with_output_sample_buf_cb({
let video_frame_count = video_frame_counter.clone();
move |frame| {
let sample_buffer = frame.sample_buf();

Expand All @@ -166,14 +176,9 @@ impl ScreenCaptureConfig<CMSampleBufferCapture> {
return;
}

let check_skip_send = || {
cap_fail::fail_err!(
"media::sources::screen_capture::skip_send",
()
);
cap_fail::fail_ret!("screen_capture video frame skip");

Ok::<(), ()>(())
};
video_frame_count.fetch_add(1, atomic::Ordering::Relaxed);

let _ = video_tx.try_send(VideoFrame {
sample_buf: sample_buffer.retained(),
Expand All @@ -183,13 +188,7 @@ impl ScreenCaptureConfig<CMSampleBufferCapture> {
scap_screencapturekit::Frame::Audio(_) => {
use ffmpeg::ChannelLayout;

let res = || {
cap_fail::fail_err!("screen_capture audio skip", ());
Ok::<(), ()>(())
};
if res().is_err() {
return;
}
cap_fail::fail_ret!("screen_capture audio frame skip");

let Some(audio_tx) = &mut audio_tx else {
return;
Expand Down Expand Up @@ -218,17 +217,30 @@ impl ScreenCaptureConfig<CMSampleBufferCapture> {
}
}
})
.with_stop_with_err_cb(move |_, err| {
let _ = error_tx.send(err.retained());
.with_stop_with_err_cb({
let video_frame_count = video_frame_counter.clone();
move |_, err| {
debug!(
"Capturer stopping after creating {} video frames",
video_frame_count.load(atomic::Ordering::Relaxed)
);

let _ = error_tx.send(err.retained());
}
});

let cancel_token = CancellationToken::new();
let capturer = Capturer::new(Arc::new(builder.build()?));

Ok((
VideoSourceConfig(
ChannelVideoSourceConfig::new(self.video_info, video_rx),
capturer.clone(),
error_rx.resubscribe(),
),
VideoSourceConfig {
inner: ChannelVideoSourceConfig::new(self.video_info, video_rx),
capturer: capturer.clone(),
error_rx: error_rx.resubscribe(),
video_frame_counter: video_frame_counter.clone(),
cancel_token: cancel_token.clone(),
drop_guard: cancel_token.drop_guard(),
},
audio_rx.map(|rx| {
SystemAudioSourceConfig(
ChannelAudioSourceConfig::new(self.audio_info(), rx),
Expand Down Expand Up @@ -297,12 +309,21 @@ impl Capturer {
}
}

pub struct VideoSourceConfig(
ChannelVideoSourceConfig<VideoFrame>,
Capturer,
broadcast::Receiver<arc::R<ns::Error>>,
);
pub struct VideoSource(ChannelVideoSource<VideoFrame>, Capturer);
pub struct VideoSourceConfig {
inner: ChannelVideoSourceConfig<VideoFrame>,
capturer: Capturer,
error_rx: broadcast::Receiver<arc::R<ns::Error>>,
cancel_token: CancellationToken,
drop_guard: DropGuard,
video_frame_counter: Arc<AtomicU32>,
}
pub struct VideoSource {
inner: ChannelVideoSource<VideoFrame>,
capturer: Capturer,
cancel_token: CancellationToken,
video_frame_counter: Arc<AtomicU32>,
_drop_guard: DropGuard,
}

impl output_pipeline::VideoSource for VideoSource {
type Config = VideoSourceConfig;
Expand All @@ -317,21 +338,42 @@ impl output_pipeline::VideoSource for VideoSource {
Self: Sized,
{
ctx.tasks().spawn("screen-capture", async move {
if let Ok(err) = config.2.recv().await {
if let Ok(err) = config.error_rx.recv().await {
return Err(anyhow!("{err}"));
}

Ok(())
});

ChannelVideoSource::setup(config.0, video_tx, ctx)
ChannelVideoSource::setup(config.inner, video_tx, ctx)
.await
.map(|source| Self(source, config.1))
.map(|source| Self {
inner: source,
capturer: config.capturer,
cancel_token: config.cancel_token,
_drop_guard: config.drop_guard,
video_frame_counter: config.video_frame_counter,
})
}

fn start(&mut self) -> BoxFuture<'_, anyhow::Result<()>> {
async move {
self.1.start().await?;
self.capturer.start().await?;

tokio::spawn({
let video_frame_count = self.video_frame_counter.clone();
async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
debug!(
"Captured {} frames",
video_frame_count.load(atomic::Ordering::Relaxed)
);
}
}
.with_cancellation_token_owned(self.cancel_token.clone())
.in_current_span()
});

Ok(())
}
Expand All @@ -340,15 +382,21 @@ impl output_pipeline::VideoSource for VideoSource {

fn stop(&mut self) -> BoxFuture<'_, anyhow::Result<()>> {
async move {
self.1.stop().await?;
debug!(
"Capturer stopping after creating {} video frames",
self.video_frame_counter.load(atomic::Ordering::Relaxed)
);
self.capturer.stop().await?;

self.cancel_token.cancel();

Ok(())
}
.boxed()
}

fn video_info(&self) -> VideoInfo {
self.0.video_info()
self.inner.video_info()
}
}

Expand Down
52 changes: 43 additions & 9 deletions crates/recording/src/sources/screen_capture/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,17 @@ use scap_ffmpeg::*;
use scap_targets::{Display, DisplayId};
use std::{
collections::VecDeque,
sync::{
Arc,
atomic::{self, AtomicU32},
},
time::{Duration, Instant},
};
use tracing::{error, info, trace};
use tokio_util::{
future::FutureExt as _,
sync::{CancellationToken, DropGuard},
};
use tracing::*;

const WINDOW_DURATION: Duration = Duration::from_secs(3);
const LOG_INTERVAL: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -174,6 +182,8 @@ impl output_pipeline::VideoSource for VideoSource {
let (mut error_tx, mut error_rx) = mpsc::channel(1);
let (ctrl_tx, ctrl_rx) = std::sync::mpsc::sync_channel::<VideoControl>(1);

let tokio_rt = tokio::runtime::Handle::current();

ctx.tasks().spawn_thread("d3d-capture-thread", move || {
cap_mediafoundation_utils::thread_init();

Expand All @@ -199,17 +209,24 @@ impl output_pipeline::VideoSource for VideoSource {
}
};

let video_frame_counter: Arc<AtomicU32> = Arc::new(AtomicU32::new(0));
let cancel_token = CancellationToken::new();

let res = scap_direct3d::Capturer::new(
capture_item,
settings,
move |frame| {
let timestamp = frame.inner().SystemRelativeTime()?;
let timestamp = Timestamp::PerformanceCounter(
PerformanceCounterTimestamp::new(timestamp.Duration),
);
let _ = video_tx.try_send(VideoFrame { frame, timestamp });

Ok(())
{
let video_frame_counter = video_frame_counter.clone();
move |frame| {
video_frame_counter.fetch_add(1, atomic::Ordering::Relaxed);
let timestamp = frame.inner().SystemRelativeTime()?;
let timestamp = Timestamp::PerformanceCounter(
PerformanceCounterTimestamp::new(timestamp.Duration),
);
let _ = video_tx.try_send(VideoFrame { frame, timestamp });

Ok(())
}
},
{
let mut error_tx = error_tx.clone();
Expand Down Expand Up @@ -240,6 +257,21 @@ impl output_pipeline::VideoSource for VideoSource {
return;
};

tokio_rt.spawn(
async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
debug!(
"Captured {} frames",
video_frame_counter.load(atomic::Ordering::Relaxed)
);
}
}
.with_cancellation_token_owned(cancel_token.clone())
.in_current_span()
);
let drop_guard = cancel_token.drop_guard();

trace!("Starting D3D capturer");
let start_result = capturer.start().map_err(Into::into);
if let Err(ref e) = start_result {
Expand All @@ -258,6 +290,8 @@ impl output_pipeline::VideoSource for VideoSource {
if reply.send(capturer.stop().map_err(Into::into)).is_err() {
return;
}

drop(drop_guard)
});

ctx.tasks().spawn("d3d-capture", async move {
Expand Down
Loading