Skip to content

Commit

Permalink
fix: create_worker to async
Browse files Browse the repository at this point in the history
  • Loading branch information
satoren committed Dec 3, 2021
1 parent 620bf69 commit e68d817
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 21 deletions.
21 changes: 16 additions & 5 deletions lib/nif.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ defmodule Mediasoup.Nif do
alias Mediasoup.{Worker, Router}

# construct worker
@spec create_worker() :: {:ok, reference()} | {:error, String.t()}
def create_worker(), do: :erlang.nif_error(:nif_not_loaded)
defp create_worker_async(), do: :erlang.nif_error(:nif_not_loaded)
defp create_worker_async(_option), do: :erlang.nif_error(:nif_not_loaded)

@spec create_worker(Worker.Settings.t()) ::
{:ok, reference()} | {:error, String.t()}
def create_worker(_option), do: :erlang.nif_error(:nif_not_loaded)
def create_worker(), do: create_worker_async() |> handle_async_nif_result()
def create_worker(option), do: create_worker_async(option) |> handle_async_nif_result()

# worker
@spec worker_create_router(reference, Router.create_option()) :: {:ok, reference()} | {:error}
Expand Down Expand Up @@ -185,4 +184,16 @@ defmodule Mediasoup.Nif do
@spec producer_event(reference, pid, [atom()]) :: {:ok} | {:error}
def producer_event(_producer, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded)
def producer_dump(_producer), do: :erlang.nif_error(:nif_not_loaded)

def handle_async_nif_result(result) do
case result do
{:ok, result_key} ->
receive do
{^result_key, msg} -> msg
end

error ->
error
end
end
end
1 change: 1 addition & 0 deletions native/mediasoup_elixir/src/atoms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ rustler::atoms! {
on_layers_change,
audio,
video,
create_worker,
}
22 changes: 21 additions & 1 deletion native/mediasoup_elixir/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ use crate::worker::{
worker_dump, worker_event, worker_id, worker_update_settings,
};

use futures_lite::future;
use mediasoup::consumer::Consumer;
use mediasoup::pipe_transport::PipeTransport;
use mediasoup::producer::Producer;
use mediasoup::router::Router;
use mediasoup::webrtc_transport::WebRtcTransport;
use mediasoup::worker::Worker;
use rustler::{Env, LocalPid, OwnedEnv, Term};
use rustler::{Atom, Encoder, Env, LocalPid, OwnedEnv, Term};

pub fn send_msg_from_other_thread<T>(pid: LocalPid, value: T)
where
Expand All @@ -66,6 +67,25 @@ where
});
}

pub fn send_async_nif_result<T, E, Fut>(env: Env, msg: Atom, future: Fut)
where
T: Encoder,
E: Encoder,
Fut: future::Future<Output = Result<T, E>> + Send + 'static,
{
let pid = env.pid();
let mut my_env = OwnedEnv::new();
std::thread::spawn(move || {
let result = future::block_on(future);
match result {
Ok(worker) => {
my_env.send_and_clear(&pid, |env| (msg, (atoms::ok(), worker)).encode(env))
}
Err(err) => my_env.send_and_clear(&pid, |env| (msg, (atoms::error(), err)).encode(env)),
}
});
}

rustler::init! {
"Elixir.Mediasoup.Nif",
[
Expand Down
35 changes: 20 additions & 15 deletions native/mediasoup_elixir/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::atoms;
use crate::json_serde::JsonSerdeWrap;
use crate::router::RouterOptionsStruct;
use crate::{send_msg_from_other_thread, RouterRef, WorkerRef};
use crate::{send_async_nif_result, send_msg_from_other_thread, RouterRef, WorkerRef};
use futures_lite::future;
use mediasoup::worker::{
WorkerDtlsFiles, WorkerDump, WorkerId, WorkerLogLevel, WorkerLogTag, WorkerSettings,
WorkerUpdateSettings,
};
use mediasoup::worker_manager::WorkerManager;
use rustler::{Error, NifResult, NifStruct, ResourceArc};
use rustler::{Env, Error, NifResult, NifStruct, ResourceArc};
use std::path::PathBuf;

#[rustler::nif]
Expand Down Expand Up @@ -111,28 +111,33 @@ pub fn worker_event(
}

fn create_worker_impl(
env: Env,
settings: WorkerSettings,
) -> NifResult<(rustler::Atom, ResourceArc<WorkerRef>)> {
let worker_manager = WorkerManager::new();
) -> NifResult<(rustler::Atom, rustler::Atom)> {
send_async_nif_result(env, atoms::create_worker(), async move {
let worker_manager = WorkerManager::new();
worker_manager
.create_worker(settings)
.await
.map(WorkerRef::resource)
.map_err(|error| format!("{}", error))
});

let worker = future::block_on(async move {
return worker_manager.create_worker(settings).await;
})
.map_err(|error| Error::Term(Box::new(format!("{}", error))))?;
Ok((atoms::ok(), WorkerRef::resource(worker)))
Ok((atoms::ok(), atoms::create_worker()))
}

#[rustler::nif(name = "create_worker")]
pub fn create_worker_no_arg() -> NifResult<(rustler::Atom, ResourceArc<WorkerRef>)> {
create_worker_impl(WorkerSettings::default())
#[rustler::nif(name = "create_worker_async")]
pub fn create_worker_no_arg(env: Env) -> NifResult<(rustler::Atom, rustler::Atom)> {
create_worker_impl(env, WorkerSettings::default())
}

#[rustler::nif]
#[rustler::nif(name = "create_worker_async")]
pub fn create_worker(
env: Env,
settings: WorkerSettingsStruct,
) -> NifResult<(rustler::Atom, ResourceArc<WorkerRef>)> {
) -> NifResult<(rustler::Atom, rustler::Atom)> {
let settings = settings.try_to_setting()?;
create_worker_impl(settings)
create_worker_impl(env, settings)
}

#[derive(NifStruct)]
Expand Down
11 changes: 11 additions & 0 deletions test/integration/test_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,15 @@ defmodule IntegrateTest.WorkerTest do

refute Worker.closed?(worker)
end

def create_many_worker() do
1..200
|> Enum.map(fn _ ->
{:ok, worker} = Worker.start_link()
worker
end)
|> Enum.map(fn worker ->
Worker.close(worker)
end)
end
end
4 changes: 4 additions & 0 deletions test/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ defmodule WorkerTest do
test "close_router" do
IntegrateTest.WorkerTest.close_router()
end

test "create_many_worker" do
IntegrateTest.WorkerTest.create_many_worker()
end
end

0 comments on commit e68d817

Please sign in to comment.