Skip to content

Commit

Permalink
fix: create_worker to async
Browse files Browse the repository at this point in the history
  • Loading branch information
satoren committed Dec 3, 2021
1 parent b0799c7 commit 297d23d
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 24 deletions.
5 changes: 2 additions & 3 deletions lib/nif.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ defmodule Mediasoup.Nif do
alias Mediasoup.{Worker, Router}

# construct worker
@spec create_worker() :: {:ok, reference()} | {:error, String.t()}
@spec create_worker() :: :ok | {:error, String.t()}
def create_worker(), do: :erlang.nif_error(:nif_not_loaded)

@spec create_worker(Worker.Settings.t()) ::
{:ok, reference()} | {:error, String.t()}
@spec create_worker(Worker.Settings.t()) :: :ok | {:error, String.t()}
def create_worker(_option), do: :erlang.nif_error(:nif_not_loaded)

# worker
Expand Down
22 changes: 19 additions & 3 deletions lib/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,30 @@ defmodule Mediasoup.Worker do
end

defp create_worker(settings) when is_nil(settings) do
Nif.create_worker()
case Nif.create_worker() do
:ok ->
receive do
{:create_worker, msg} -> msg
end

error ->
error
end
end

defp create_worker(%Worker.Settings{} = settings) do
Nif.create_worker(settings)
case Nif.create_worker(settings) do
:ok ->
receive do
{:create_worker, msg} -> msg
end

error ->
error
end
end

defp create_worker(option) do
Nif.create_worker(Worker.Settings.from_map(option))
create_worker(Worker.Settings.from_map(option))
end
end
1 change: 1 addition & 0 deletions native/mediasoup_elixir/src/atoms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ rustler::atoms! {
on_layers_change,
audio,
video,
create_worker,
}
22 changes: 21 additions & 1 deletion native/mediasoup_elixir/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ use crate::worker::{
worker_dump, worker_event, worker_id, worker_update_settings,
};

use futures_lite::future;
use mediasoup::consumer::Consumer;
use mediasoup::pipe_transport::PipeTransport;
use mediasoup::producer::Producer;
use mediasoup::router::Router;
use mediasoup::webrtc_transport::WebRtcTransport;
use mediasoup::worker::Worker;
use rustler::{Env, LocalPid, OwnedEnv, Term};
use rustler::{Atom, Encoder, Env, LocalPid, OwnedEnv, Term};

pub fn send_msg_from_other_thread<T>(pid: LocalPid, value: T)
where
Expand All @@ -66,6 +67,25 @@ where
});
}

pub fn send_async_nif_result<T, E, Fut>(env: Env, msg: Atom, future: Fut)
where
T: Encoder,
E: Encoder,
Fut: future::Future<Output = Result<T, E>> + Send + 'static,
{
let pid = env.pid();
let mut my_env = OwnedEnv::new();
std::thread::spawn(move || {
let result = future::block_on(future);
match result {
Ok(worker) => {
my_env.send_and_clear(&pid, |env| (msg, (atoms::ok(), worker)).encode(env))
}
Err(err) => my_env.send_and_clear(&pid, |env| (msg, (atoms::error(), err)).encode(env)),
}
});
}

rustler::init! {
"Elixir.Mediasoup.Nif",
[
Expand Down
33 changes: 16 additions & 17 deletions native/mediasoup_elixir/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::atoms;
use crate::json_serde::JsonSerdeWrap;
use crate::router::RouterOptionsStruct;
use crate::{send_msg_from_other_thread, RouterRef, WorkerRef};
use crate::{send_async_nif_result, send_msg_from_other_thread, RouterRef, WorkerRef};
use futures_lite::future;
use mediasoup::worker::{
WorkerDtlsFiles, WorkerDump, WorkerId, WorkerLogLevel, WorkerLogTag, WorkerSettings,
WorkerUpdateSettings,
};
use mediasoup::worker_manager::WorkerManager;
use rustler::{Error, NifResult, NifStruct, ResourceArc};
use rustler::{Env, Error, NifResult, NifStruct, ResourceArc};
use std::path::PathBuf;

#[rustler::nif]
Expand Down Expand Up @@ -110,29 +110,28 @@ pub fn worker_event(
Ok((atoms::ok(),))
}

fn create_worker_impl(
settings: WorkerSettings,
) -> NifResult<(rustler::Atom, ResourceArc<WorkerRef>)> {
let worker_manager = WorkerManager::new();
fn create_worker_impl(env: Env, settings: WorkerSettings) -> NifResult<rustler::Atom> {
send_async_nif_result(env, atoms::create_worker(), async move {
let worker_manager = WorkerManager::new();
worker_manager
.create_worker(settings)
.await
.map(|worker| WorkerRef::resource(worker))
.map_err(|error| format!("{}", error))
});

let worker = future::block_on(async move {
return worker_manager.create_worker(settings).await;
})
.map_err(|error| Error::Term(Box::new(format!("{}", error))))?;
Ok((atoms::ok(), WorkerRef::resource(worker)))
Ok(atoms::ok())
}

#[rustler::nif(name = "create_worker")]
pub fn create_worker_no_arg() -> NifResult<(rustler::Atom, ResourceArc<WorkerRef>)> {
create_worker_impl(WorkerSettings::default())
pub fn create_worker_no_arg(env: Env) -> NifResult<rustler::Atom> {
create_worker_impl(env, WorkerSettings::default())
}

#[rustler::nif]
pub fn create_worker(
settings: WorkerSettingsStruct,
) -> NifResult<(rustler::Atom, ResourceArc<WorkerRef>)> {
pub fn create_worker(env: Env, settings: WorkerSettingsStruct) -> NifResult<rustler::Atom> {
let settings = settings.try_to_setting()?;
create_worker_impl(settings)
create_worker_impl(env, settings)
}

#[derive(NifStruct)]
Expand Down
11 changes: 11 additions & 0 deletions test/integration/test_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,15 @@ defmodule IntegrateTest.WorkerTest do

refute Worker.closed?(worker)
end

def create_many_worker() do
1..1000
|> Enum.map(fn _ ->
{:ok, worker} = Worker.start_link()
worker
end)
|> Enum.map(fn worker ->
Worker.close(worker)
end)
end
end
4 changes: 4 additions & 0 deletions test/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ defmodule WorkerTest do
test "close_router" do
IntegrateTest.WorkerTest.close_router()
end

test "create_many_worker" do
IntegrateTest.WorkerTest.create_many_worker()
end
end

0 comments on commit 297d23d

Please sign in to comment.