From 297d23d4277703fc02432edf50bcfb8bf9b51d1c Mon Sep 17 00:00:00 2001 From: Masataka Shiraki Date: Fri, 3 Dec 2021 17:24:45 +0900 Subject: [PATCH] fix: create_worker to async --- lib/nif.ex | 5 ++-- lib/worker.ex | 22 +++++++++++++++--- native/mediasoup_elixir/src/atoms.rs | 1 + native/mediasoup_elixir/src/lib.rs | 22 +++++++++++++++++- native/mediasoup_elixir/src/worker.rs | 33 +++++++++++++-------------- test/integration/test_worker.ex | 11 +++++++++ test/worker_test.exs | 4 ++++ 7 files changed, 74 insertions(+), 24 deletions(-) diff --git a/lib/nif.ex b/lib/nif.ex index d0c97ef8..bb1243e1 100644 --- a/lib/nif.ex +++ b/lib/nif.ex @@ -8,11 +8,10 @@ defmodule Mediasoup.Nif do alias Mediasoup.{Worker, Router} # construct worker - @spec create_worker() :: {:ok, reference()} | {:error, String.t()} + @spec create_worker() :: :ok | {:error, String.t()} def create_worker(), do: :erlang.nif_error(:nif_not_loaded) - @spec create_worker(Worker.Settings.t()) :: - {:ok, reference()} | {:error, String.t()} + @spec create_worker(Worker.Settings.t()) :: :ok | {:error, String.t()} def create_worker(_option), do: :erlang.nif_error(:nif_not_loaded) # worker diff --git a/lib/worker.ex b/lib/worker.ex index 49bdaeb0..4f7ce0d5 100644 --- a/lib/worker.ex +++ b/lib/worker.ex @@ -204,14 +204,30 @@ defmodule Mediasoup.Worker do end defp create_worker(settings) when is_nil(settings) do - Nif.create_worker() + case Nif.create_worker() do + :ok -> + receive do + {:create_worker, msg} -> msg + end + + error -> + error + end end defp create_worker(%Worker.Settings{} = settings) do - Nif.create_worker(settings) + case Nif.create_worker(settings) do + :ok -> + receive do + {:create_worker, msg} -> msg + end + + error -> + error + end end defp create_worker(option) do - Nif.create_worker(Worker.Settings.from_map(option)) + create_worker(Worker.Settings.from_map(option)) end end diff --git a/native/mediasoup_elixir/src/atoms.rs b/native/mediasoup_elixir/src/atoms.rs index 018b9627..64ecc6eb 100644 --- a/native/mediasoup_elixir/src/atoms.rs +++ b/native/mediasoup_elixir/src/atoms.rs @@ -27,4 +27,5 @@ rustler::atoms! { on_layers_change, audio, video, + create_worker, } diff --git a/native/mediasoup_elixir/src/lib.rs b/native/mediasoup_elixir/src/lib.rs index 622e4cdb..0d611359 100644 --- a/native/mediasoup_elixir/src/lib.rs +++ b/native/mediasoup_elixir/src/lib.rs @@ -48,13 +48,14 @@ use crate::worker::{ worker_dump, worker_event, worker_id, worker_update_settings, }; +use futures_lite::future; use mediasoup::consumer::Consumer; use mediasoup::pipe_transport::PipeTransport; use mediasoup::producer::Producer; use mediasoup::router::Router; use mediasoup::webrtc_transport::WebRtcTransport; use mediasoup::worker::Worker; -use rustler::{Env, LocalPid, OwnedEnv, Term}; +use rustler::{Atom, Encoder, Env, LocalPid, OwnedEnv, Term}; pub fn send_msg_from_other_thread(pid: LocalPid, value: T) where @@ -66,6 +67,25 @@ where }); } +pub fn send_async_nif_result(env: Env, msg: Atom, future: Fut) +where + T: Encoder, + E: Encoder, + Fut: future::Future> + Send + 'static, +{ + let pid = env.pid(); + let mut my_env = OwnedEnv::new(); + std::thread::spawn(move || { + let result = future::block_on(future); + match result { + Ok(worker) => { + my_env.send_and_clear(&pid, |env| (msg, (atoms::ok(), worker)).encode(env)) + } + Err(err) => my_env.send_and_clear(&pid, |env| (msg, (atoms::error(), err)).encode(env)), + } + }); +} + rustler::init! { "Elixir.Mediasoup.Nif", [ diff --git a/native/mediasoup_elixir/src/worker.rs b/native/mediasoup_elixir/src/worker.rs index a6ede90e..7f72c381 100644 --- a/native/mediasoup_elixir/src/worker.rs +++ b/native/mediasoup_elixir/src/worker.rs @@ -1,14 +1,14 @@ use crate::atoms; use crate::json_serde::JsonSerdeWrap; use crate::router::RouterOptionsStruct; -use crate::{send_msg_from_other_thread, RouterRef, WorkerRef}; +use crate::{send_async_nif_result, send_msg_from_other_thread, RouterRef, WorkerRef}; use futures_lite::future; use mediasoup::worker::{ WorkerDtlsFiles, WorkerDump, WorkerId, WorkerLogLevel, WorkerLogTag, WorkerSettings, WorkerUpdateSettings, }; use mediasoup::worker_manager::WorkerManager; -use rustler::{Error, NifResult, NifStruct, ResourceArc}; +use rustler::{Env, Error, NifResult, NifStruct, ResourceArc}; use std::path::PathBuf; #[rustler::nif] @@ -110,29 +110,28 @@ pub fn worker_event( Ok((atoms::ok(),)) } -fn create_worker_impl( - settings: WorkerSettings, -) -> NifResult<(rustler::Atom, ResourceArc)> { - let worker_manager = WorkerManager::new(); +fn create_worker_impl(env: Env, settings: WorkerSettings) -> NifResult { + send_async_nif_result(env, atoms::create_worker(), async move { + let worker_manager = WorkerManager::new(); + worker_manager + .create_worker(settings) + .await + .map(|worker| WorkerRef::resource(worker)) + .map_err(|error| format!("{}", error)) + }); - let worker = future::block_on(async move { - return worker_manager.create_worker(settings).await; - }) - .map_err(|error| Error::Term(Box::new(format!("{}", error))))?; - Ok((atoms::ok(), WorkerRef::resource(worker))) + Ok(atoms::ok()) } #[rustler::nif(name = "create_worker")] -pub fn create_worker_no_arg() -> NifResult<(rustler::Atom, ResourceArc)> { - create_worker_impl(WorkerSettings::default()) +pub fn create_worker_no_arg(env: Env) -> NifResult { + create_worker_impl(env, WorkerSettings::default()) } #[rustler::nif] -pub fn create_worker( - settings: WorkerSettingsStruct, -) -> NifResult<(rustler::Atom, ResourceArc)> { +pub fn create_worker(env: Env, settings: WorkerSettingsStruct) -> NifResult { let settings = settings.try_to_setting()?; - create_worker_impl(settings) + create_worker_impl(env, settings) } #[derive(NifStruct)] diff --git a/test/integration/test_worker.ex b/test/integration/test_worker.ex index 19d36c36..d1f0387e 100644 --- a/test/integration/test_worker.ex +++ b/test/integration/test_worker.ex @@ -128,4 +128,15 @@ defmodule IntegrateTest.WorkerTest do refute Worker.closed?(worker) end + + def create_many_worker() do + 1..1000 + |> Enum.map(fn _ -> + {:ok, worker} = Worker.start_link() + worker + end) + |> Enum.map(fn worker -> + Worker.close(worker) + end) + end end diff --git a/test/worker_test.exs b/test/worker_test.exs index b3aff5f3..33b0437f 100644 --- a/test/worker_test.exs +++ b/test/worker_test.exs @@ -36,4 +36,8 @@ defmodule WorkerTest do test "close_router" do IntegrateTest.WorkerTest.close_router() end + + test "create_many_worker" do + IntegrateTest.WorkerTest.create_many_worker() + end end