Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

Commit

Permalink
Bug 1429847 - Allow promoting CpuPool threads for audio remoting sepa…
Browse files Browse the repository at this point in the history
…rately from the client creation. r=kinetik

Differential Revision: https://phabricator.services.mozilla.com/D34887
  • Loading branch information
padenot committed Jun 21, 2019
1 parent fb67307 commit 50faef2
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions media/audioipc/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@ libc = "0.2"
log = "0.4"
tokio-core = "0.1"
tokio-uds = "0.1.7"
audio_thread_priority = "0.13.0"
lazy_static = "1.2.0"
cfg-if = "0.1.0"
77 changes: 56 additions & 21 deletions media/audioipc/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// accompanying file LICENSE for details

use assert_not_in_callback;
use audio_thread_priority::promote_current_thread_to_real_time;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles};
use audioipc::{core, rpc};
Expand All @@ -13,15 +14,20 @@ use cubeb_backend::{
Stream, StreamParams, StreamParamsRef,
};
use futures::Future;
use futures_cpupool::{self, CpuPool};
use futures_cpupool::CpuPool;
use std::ffi::{CStr, CString};
use std::os::raw::c_void;
use std::sync::mpsc;
use std::thread;
use std::{fmt, io, mem, ptr};
use stream;
use tokio_core::reactor::{Handle, Remote};
use {ClientStream, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
use {ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD};
cfg_if! {
if #[cfg(target_os = "linux")] {
use {G_THREAD_POOL};
}
}

struct CubebClient;

Expand Down Expand Up @@ -79,6 +85,50 @@ fn open_server_stream() -> Result<audioipc::MessageStream> {
}
}

fn register_thread(callback: Option<extern "C" fn(*const ::std::os::raw::c_char)>) {
match promote_current_thread_to_real_time(0, 48000) {
Ok(_) => {
debug!("Audio thread promoted to real-time.");
}
Err(_) => {
error!("Could not promote thread to real-time.");
}
}
if let Some(func) = callback {
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
}
}

fn create_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(move || register_thread(init_params.thread_create_callback))
.pool_size(init_params.pool_size)
.stack_size(init_params.stack_size)
.create()
}

cfg_if! {
if #[cfg(target_os = "linux")] {
fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
let mut guard = G_THREAD_POOL.lock().unwrap();
if guard.is_some() {
// Sandbox is on, and the thread pool was created earlier, before the lockdown.
guard.take().unwrap()
} else {
// Sandbox is off, let's create the pool now, promoting the threads will work.
create_thread_pool(init_params)
}
}
} else {
fn get_thread_pool(init_params: CpuPoolInitParams) -> CpuPool {
create_thread_pool(init_params)
}
}
}

impl ContextOps for ClientContext {
fn init(_context_name: Option<&CStr>) -> Result<Context> {
fn bind_and_send_client(
Expand All @@ -100,20 +150,10 @@ impl ContextOps for ClientContext {

let params = CPUPOOL_INIT_PARAMS.with(|p| p.replace(None).unwrap());

let thread_create_callback = params.thread_create_callback;

let register_thread = move || {
if let Some(func) = thread_create_callback {
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
}
};

let core = t!(core::spawn_thread("AudioIPC Client RPC", move || {
let handle = core::handle();

register_thread();
register_thread(params.thread_create_callback);

open_server_stream()
.ok()
Expand All @@ -129,22 +169,17 @@ impl ContextOps for ClientContext {

let rpc = t!(rx_rpc.recv());

let cpupool = futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size(params.pool_size)
.stack_size(params.stack_size)
.create();

// Don't let errors bubble from here. Later calls against this context
// will return errors the caller expects to handle.
let _ = send_recv!(rpc, ClientConnect(std::process::id()) => ClientConnected);

let pool = get_thread_pool(params);

let ctx = Box::new(ClientContext {
_ops: &CLIENT_OPS as *const _,
rpc: rpc,
core: core,
cpu_pool: cpupool,
cpu_pool: pool,
});
Ok(unsafe { Context::from_ptr(Box::into_raw(ctx) as *mut _) })
}
Expand Down
73 changes: 73 additions & 0 deletions media/audioipc/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ extern crate libc;
extern crate log;
extern crate tokio_core;
extern crate tokio_uds;
extern crate audio_thread_priority;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate cfg_if;

#[macro_use]
mod send_recv;
Expand All @@ -25,11 +30,27 @@ use context::ClientContext;
use cubeb_backend::{capi, ffi};
use std::os::raw::{c_char, c_int};
use stream::ClientStream;
use std::sync::{Mutex};
use futures_cpupool::CpuPool;
use audio_thread_priority::RtPriorityHandle;
cfg_if! {
if #[cfg(target_os = "linux")] {
use std::sync::{Arc, Condvar};
use std::ffi::CString;
use std::thread;
use audio_thread_priority::promote_current_thread_to_real_time;
}
}

type InitParamsTls = std::cell::RefCell<Option<CpuPoolInitParams>>;

thread_local!(static IN_CALLBACK: std::cell::RefCell<bool> = std::cell::RefCell::new(false));
thread_local!(static CPUPOOL_INIT_PARAMS: InitParamsTls = std::cell::RefCell::new(None));
thread_local!(static G_PRIORITY_HANDLES: std::cell::RefCell<Vec<RtPriorityHandle>> = std::cell::RefCell::new(vec![]));

lazy_static! {
static ref G_THREAD_POOL: Mutex<Option<CpuPool>> = Mutex::new(None);
}

// This must match the definition of AudioIpcInitParams in
// dom/media/CubebUtils.cpp in Gecko.
Expand Down Expand Up @@ -84,6 +105,58 @@ where

static mut G_SERVER_FD: Option<PlatformHandle> = None;

cfg_if! {
if #[cfg(target_os = "linux")] {
#[no_mangle]
pub unsafe extern "C" fn audioipc_init_threads(init_params: *const AudioIpcInitParams) {
let thread_create_callback = (*init_params).thread_create_callback;

// It is critical that this function waits until the various threads are created, promoted to
// real-time, and _then_ return, because the sandbox lockdown happens right after returning
// from here.
let pair = Arc::new((Mutex::new((*init_params).pool_size), Condvar::new()));
let pair2 = pair.clone();

let register_thread = move || {
if let Some(func) = thread_create_callback {
match promote_current_thread_to_real_time(0, 48000) {
Ok(handle) => {
G_PRIORITY_HANDLES.with(|handles| {
(handles.borrow_mut()).push(handle);
});
}
Err(_) => {
error!("Could not promote audio threads to real-time during initialization.");
}
}
let thr = thread::current();
let name = CString::new(thr.name().unwrap()).unwrap();
func(name.as_ptr());
let &(ref lock, ref cvar) = &*pair2;
let mut count = lock.lock().unwrap();
*count -= 1;
cvar.notify_one();
}
};

let mut pool = G_THREAD_POOL.lock().unwrap();

*pool = Some(futures_cpupool::Builder::new()
.name_prefix("AudioIPC")
.after_start(register_thread)
.pool_size((*init_params).pool_size)
.stack_size((*init_params).stack_size)
.create());

let &(ref lock, ref cvar) = &*pair;
let mut count = lock.lock().unwrap();
while *count != 0 {
count = cvar.wait(count).unwrap();
}
}
}
}

#[no_mangle]
/// Entry point from C code.
pub unsafe extern "C" fn audioipc_client_init(
Expand Down
1 change: 1 addition & 0 deletions media/audioipc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ slab = "0.3.0"
futures = "0.1.18"
tokio-core = "0.1"
tokio-uds = "0.1.7"
audio_thread_priority = "0.13.0"

[dependencies.error-chain]
version = "0.11.0"
Expand Down
8 changes: 8 additions & 0 deletions media/audioipc/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ extern crate libc;
extern crate slab;
extern crate tokio_core;
extern crate tokio_uds;
extern crate audio_thread_priority;

use audioipc::core;
use audioipc::platformhandle_passing::framed_with_platformhandles;
Expand All @@ -28,6 +29,7 @@ use futures::Future;
use std::error::Error;
use std::os::raw::c_void;
use std::ptr;
use audio_thread_priority::promote_current_thread_to_real_time;

mod server;

Expand Down Expand Up @@ -57,6 +59,12 @@ fn run() -> Result<ServerWrapper> {

let callback_thread = try!(
core::spawn_thread("AudioIPC Callback RPC", || {
match promote_current_thread_to_real_time(0, 48000) {
Ok(_) => { }
Err(_) => {
debug!("Failed to promote audio callback thread to real-time.");
}
}
trace!("Starting up cubeb audio callback event loop thread...");
Ok(())
}).or_else(|e| {
Expand Down

0 comments on commit 50faef2

Please sign in to comment.