Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/desktop/src-tauri/src/camera_legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ pub async fn create_camera_preview_ws() -> (Sender<FFmpegVideoFrame>, u16, Cance
width: frame.width(),
height: frame.height(),
stride: frame.stride(0) as u32,
frame_number: 0,
target_time_ns: 0,
created_at: Instant::now(),
})
.ok();
Expand Down
89 changes: 37 additions & 52 deletions apps/desktop/src-tauri/src/editor_window.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,15 @@
use std::{collections::HashMap, ops::Deref, path::PathBuf, sync::Arc, time::Instant};
use tauri::{AppHandle, Manager, Runtime, Window, ipc::CommandArg};
use tokio::sync::{RwLock, watch};
use tokio::sync::{Mutex, RwLock, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::debug;

use cap_rendering::RenderedFrame;

use crate::{
create_editor_instance_impl,
frame_ws::{WSFrame, create_watch_frame_ws},
frame_ws::{WSFrame, create_mpsc_frame_ws},
};

/// Removes per-row alignment padding from `frame`, returning tightly packed
/// pixel data together with the tight stride (`width * 4`; assumes 4-byte
/// pixels — TODO confirm against `RenderedFrame`'s pixel format).
///
/// Returns `Err` with a static message if any size computation overflows or
/// if `frame.data` is shorter than the declared dimensions imply.
fn strip_frame_padding(frame: RenderedFrame) -> Result<(Vec<u8>, u32), &'static str> {
    let expected_stride = frame
        .width
        .checked_mul(4)
        .ok_or("overflow computing expected_stride")?;

    if frame.padded_bytes_per_row == expected_stride {
        // Already tightly packed — hand the buffer back without copying.
        Ok((frame.data, expected_stride))
    } else {
        let capacity = expected_stride
            .checked_mul(frame.height)
            .ok_or("overflow computing buffer capacity")?;
        let mut stripped = Vec::with_capacity(capacity as usize);
        for row in 0..frame.height {
            let start = row
                .checked_mul(frame.padded_bytes_per_row)
                .ok_or("overflow computing row start")? as usize;
            // Fix: keep this addition consistent with the checked arithmetic
            // above (the original used a bare `+`, which can overflow usize
            // on 32-bit targets).
            let end = start
                .checked_add(expected_stride as usize)
                .ok_or("overflow computing row end")?;
            // Fix: return the Err the signature promises instead of
            // panicking when the buffer is shorter than width/height claim.
            let row_bytes = frame
                .data
                .get(start..end)
                .ok_or("frame data shorter than expected")?;
            stripped.extend_from_slice(row_bytes);
        }

        Ok((stripped, expected_stride))
    }
}
const FRAME_CHANNEL_BUFFER: usize = 8;

pub struct EditorInstance {
inner: Arc<cap_editor::EditorInstance>,
Expand All @@ -49,25 +24,30 @@ type PendingReceiver = tokio::sync::watch::Receiver<Option<PendingResult>>;
pub struct PendingEditorInstances(Arc<RwLock<HashMap<String, PendingReceiver>>>);

async fn do_prewarm(app: AppHandle, path: PathBuf) -> PendingResult {
let (frame_tx, frame_rx) = watch::channel(None);
let (frame_tx, frame_rx) = mpsc::channel(FRAME_CHANNEL_BUFFER);
let frame_rx = Arc::new(Mutex::new(frame_rx));

let (ws_port, ws_shutdown_token) = create_watch_frame_ws(frame_rx).await;
let (ws_port, ws_shutdown_token) = create_mpsc_frame_ws(frame_rx).await;
let inner = create_editor_instance_impl(
&app,
path,
Box::new(move |frame| {
let width = frame.width;
let height = frame.height;
if let Ok((data, stride)) = strip_frame_padding(frame)
&& let Err(e) = frame_tx.send(Some(WSFrame {
data,
width,
height,
stride,
created_at: Instant::now(),
}))
{
debug!("Frame receiver dropped during prewarm: {e}");
let stride = frame.padded_bytes_per_row;
let frame_number = frame.frame_number;
let target_time_ns = frame.target_time_ns;
let data = frame.data;
if let Err(e) = frame_tx.try_send(WSFrame {
data,
width,
height,
stride,
frame_number,
target_time_ns,
created_at: Instant::now(),
}) {
debug!("Frame channel full or closed during prewarm: {e}");
}
}),
)
Expand Down Expand Up @@ -211,25 +191,30 @@ impl EditorInstances {
}
}

let (frame_tx, frame_rx) = watch::channel(None);
let (frame_tx, frame_rx) = mpsc::channel(FRAME_CHANNEL_BUFFER);
let frame_rx = Arc::new(Mutex::new(frame_rx));

let (ws_port, ws_shutdown_token) = create_watch_frame_ws(frame_rx).await;
let (ws_port, ws_shutdown_token) = create_mpsc_frame_ws(frame_rx).await;
let instance = create_editor_instance_impl(
window.app_handle(),
path,
Box::new(move |frame| {
let width = frame.width;
let height = frame.height;
if let Ok((data, stride)) = strip_frame_padding(frame)
&& let Err(e) = frame_tx.send(Some(WSFrame {
data,
width,
height,
stride,
created_at: Instant::now(),
}))
{
debug!("Frame receiver dropped in get_or_create: {e}");
let stride = frame.padded_bytes_per_row;
let frame_number = frame.frame_number;
let target_time_ns = frame.target_time_ns;
let data = frame.data;
if let Err(e) = frame_tx.try_send(WSFrame {
data,
width,
height,
stride,
frame_number,
target_time_ns,
created_at: Instant::now(),
}) {
debug!("Frame channel full or closed in get_or_create: {e}");
}
}),
)
Expand Down
141 changes: 136 additions & 5 deletions apps/desktop/src-tauri/src/frame_ws.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::{broadcast, watch};
use tokio::sync::{Mutex, broadcast, mpsc, watch};
use tokio_util::sync::CancellationToken;

fn pack_frame_data(mut data: Vec<u8>, stride: u32, height: u32, width: u32) -> Vec<u8> {
/// Appends the frame's metadata trailer to `data` and returns the buffer.
///
/// Trailer layout (little-endian, 24 bytes total): `stride`, `height`,
/// `width`, `frame_number` as `u32`, followed by `target_time_ns` as `u64`.
fn pack_frame_data(
    mut data: Vec<u8>,
    stride: u32,
    height: u32,
    width: u32,
    frame_number: u32,
    target_time_ns: u64,
) -> Vec<u8> {
    // 4 u32 words + 1 u64 = 24 bytes; reserve once so the four extends
    // below never reallocate.
    const TRAILER_LEN: usize = 4 * 4 + 8;
    data.reserve_exact(TRAILER_LEN);
    for word in [stride, height, width, frame_number] {
        data.extend_from_slice(&word.to_le_bytes());
    }
    data.extend_from_slice(&target_time_ns.to_le_bytes());
    data
}

Expand All @@ -15,6 +26,8 @@ pub struct WSFrame {
pub width: u32,
pub height: u32,
pub stride: u32,
pub frame_number: u32,
pub target_time_ns: u64,
#[allow(dead_code)]
pub created_at: Instant,
}
Expand Down Expand Up @@ -49,7 +62,14 @@ pub async fn create_watch_frame_ws(
{
let frame_opt = camera_rx.borrow().clone();
if let Some(frame) = frame_opt {
let packed = pack_frame_data(frame.data, frame.stride, frame.height, frame.width);
let packed = pack_frame_data(
frame.data,
frame.stride,
frame.height,
frame.width,
frame.frame_number,
frame.target_time_ns,
);

if let Err(e) = socket.send(Message::Binary(packed)).await {
tracing::error!("Failed to send initial frame to socket: {:?}", e);
Expand Down Expand Up @@ -82,7 +102,14 @@ pub async fn create_watch_frame_ws(
}
let frame_opt = camera_rx.borrow().clone();
if let Some(frame) = frame_opt {
let packed = pack_frame_data(frame.data, frame.stride, frame.height, frame.width);
let packed = pack_frame_data(
frame.data,
frame.stride,
frame.height,
frame.width,
frame.frame_number,
frame.target_time_ns,
);

if let Err(e) = socket.send(Message::Binary(packed)).await {
tracing::error!("Failed to send frame to socket: {:?}", e);
Expand Down Expand Up @@ -121,6 +148,103 @@ pub async fn create_watch_frame_ws(
(port, cancel_token_child)
}

pub async fn create_mpsc_frame_ws(
frame_rx: Arc<Mutex<mpsc::Receiver<WSFrame>>>,
) -> (u16, CancellationToken) {
use axum::{
extract::{
State,
ws::{Message, WebSocket, WebSocketUpgrade},
},
response::IntoResponse,
routing::get,
};

type RouterState = Arc<Mutex<mpsc::Receiver<WSFrame>>>;

#[axum::debug_handler]
async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<RouterState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}

async fn handle_socket(mut socket: WebSocket, frame_rx: RouterState) {
tracing::info!("Socket connection established (mpsc)");
let now = std::time::Instant::now();

loop {
let frame_opt = {
let mut rx = frame_rx.lock().await;
tokio::select! {
biased;
msg = socket.recv() => {
match msg {
Some(Ok(Message::Close(_))) | None => {
tracing::info!("WebSocket closed");
break;
}
Some(Ok(_)) => {
continue;
}
Some(Err(e)) => {
tracing::error!("WebSocket error: {:?}", e);
break;
}
}
},
frame = rx.recv() => frame,
}
};
Comment on lines +178 to +199
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Potential deadlock: holding the mutex lock across the entire tokio::select! block could block other tasks trying to access the frame receiver. Is this intentional to ensure atomic processing of each frame, or could the lock scope be reduced?

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/desktop/src-tauri/src/frame_ws.rs
Line: 178:199

Comment:
**logic:** Potential deadlock: holding the mutex lock across the entire `tokio::select!` block could block other tasks trying to access the frame receiver. Is this intentional to ensure atomic processing of each frame, or could the lock scope be reduced?

How can I resolve this? If you propose a fix, please make it concise.


let Some(frame) = frame_opt else {
tracing::info!("Frame channel closed");
break;
};

let packed = pack_frame_data(
frame.data,
frame.stride,
frame.height,
frame.width,
frame.frame_number,
frame.target_time_ns,
);

if let Err(e) = socket.send(Message::Binary(packed)).await {
tracing::error!("Failed to send frame to socket: {:?}", e);
break;
}
}

let elapsed = now.elapsed();
tracing::info!("Websocket closing after {elapsed:.2?}");
}

let router = axum::Router::new()
.route("/", get(ws_handler))
.with_state(frame_rx);

let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
tracing::info!("WebSocket server (mpsc) listening on port {}", port);

let cancel_token = CancellationToken::new();
let cancel_token_child = cancel_token.child_token();
tokio::spawn(async move {
let server = axum::serve(listener, router.into_make_service());
tokio::select! {
_ = server => {},
_ = cancel_token.cancelled() => {
tracing::info!("WebSocket server shutting down");
}
}
});

(port, cancel_token_child)
}

pub async fn create_frame_ws(frame_tx: broadcast::Sender<WSFrame>) -> (u16, CancellationToken) {
use axum::{
extract::{
Expand Down Expand Up @@ -167,7 +291,14 @@ pub async fn create_frame_ws(frame_tx: broadcast::Sender<WSFrame>) -> (u16, Canc
incoming_frame = camera_rx.recv() => {
match incoming_frame {
Ok(frame) => {
let packed = pack_frame_data(frame.data, frame.stride, frame.height, frame.width);
let packed = pack_frame_data(
frame.data,
frame.stride,
frame.height,
frame.width,
frame.frame_number,
frame.target_time_ns,
);

if let Err(e) = socket.send(Message::Binary(packed)).await {
tracing::error!("Failed to send frame to socket: {:?}", e);
Expand Down
2 changes: 2 additions & 0 deletions apps/desktop/src-tauri/src/screenshot_editor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ impl ScreenshotEditorInstances {
width: frame.width,
height: frame.height,
stride: frame.padded_bytes_per_row,
frame_number: frame.frame_number,
target_time_ns: frame.target_time_ns,
created_at: Instant::now(),
}));
}
Expand Down
2 changes: 2 additions & 0 deletions apps/desktop/src/routes/editor/Editor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ function Inner() {
setEditorState,
previewResolutionBase,
dialog,
canvasControls,
} = useEditorContext();

const isExportMode = () => {
Expand Down Expand Up @@ -208,6 +209,7 @@ function Inner() {
const updateConfigAndRender = throttle(async (time: number) => {
const config = serializeProjectConfiguration(project);
await commands.updateProjectConfigInMemory(config);
canvasControls()?.resetFrameState();
renderFrame(time);
}, 1000 / FPS);
createEffect(
Expand Down
Loading