Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove blocking for BEAM thread #98

Merged
merged 7 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 164 additions & 42 deletions lib/nif.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,77 @@ defmodule Mediasoup.Nif do

alias Mediasoup.{Worker, Router}

# construct worker
@spec create_worker() :: {:ok, reference()} | {:error, String.t()}
def create_worker(), do: :erlang.nif_error(:nif_not_loaded)
# async nif functions

@spec create_worker(Worker.Settings.t()) ::
{:ok, reference()} | {:error, String.t()}
def create_worker(_option), do: :erlang.nif_error(:nif_not_loaded)
## worker with async
defp create_worker_async(), do: :erlang.nif_error(:nif_not_loaded)
defp create_worker_async(_option), do: :erlang.nif_error(:nif_not_loaded)
defp worker_create_router_async(_worker, _option), do: :erlang.nif_error(:nif_not_loaded)
defp worker_dump_async(_worker), do: :erlang.nif_error(:nif_not_loaded)
defp worker_update_settings_async(_worker, _option), do: :erlang.nif_error(:nif_not_loaded)

## router with async
defp router_create_pipe_transport_async(
_reference,
_option
),
do: :erlang.nif_error(:nif_not_loaded)

defp router_create_webrtc_transport_async(_router, _option),
do: :erlang.nif_error(:nif_not_loaded)

defp router_dump_async(_router), do: :erlang.nif_error(:nif_not_loaded)

## pipe_transport with async
defp pipe_transport_get_stats_async(_transport), do: :erlang.nif_error(:nif_not_loaded)
defp pipe_transport_dump_async(_transport), do: :erlang.nif_error(:nif_not_loaded)
defp pipe_transport_consume_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
defp pipe_transport_connect_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
defp pipe_transport_produce_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)

## webrtc_transport with async
defp webrtc_transport_get_stats_async(_transport), do: :erlang.nif_error(:nif_not_loaded)
defp webrtc_transport_dump_async(_transport), do: :erlang.nif_error(:nif_not_loaded)
defp webrtc_transport_restart_ice_async(_transport), do: :erlang.nif_error(:nif_not_loaded)

defp webrtc_transport_set_max_incoming_bitrate_async(_transport, _bitrate),
do: :erlang.nif_error(:nif_not_loaded)

defp webrtc_transport_set_max_outgoing_bitrate_async(_transport, _bitrate),
do: :erlang.nif_error(:nif_not_loaded)

defp webrtc_transport_consume_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
defp webrtc_transport_connect_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
defp webrtc_transport_produce_async(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)

## consumer with async
defp consumer_get_stats_async(_consumer), do: :erlang.nif_error(:nif_not_loaded)
defp consumer_pause_async(_consumer), do: :erlang.nif_error(:nif_not_loaded)
defp consumer_resume_async(_consumer), do: :erlang.nif_error(:nif_not_loaded)

defp consumer_set_preferred_layers_async(_consumer, _referred_layers),
do: :erlang.nif_error(:nif_not_loaded)

defp consumer_set_priority_async(_consumer, _priority), do: :erlang.nif_error(:nif_not_loaded)
defp consumer_unset_priority_async(_consumer), do: :erlang.nif_error(:nif_not_loaded)
defp consumer_request_key_frame_async(_consumer), do: :erlang.nif_error(:nif_not_loaded)
defp consumer_dump_async(_consumer), do: :erlang.nif_error(:nif_not_loaded)

## producer with async
defp producer_pause_async(_producer), do: :erlang.nif_error(:nif_not_loaded)
defp producer_resume_async(_producer), do: :erlang.nif_error(:nif_not_loaded)
defp producer_get_stats_async(_producer), do: :erlang.nif_error(:nif_not_loaded)
defp producer_dump_async(_producer), do: :erlang.nif_error(:nif_not_loaded)

# construct worker
def create_worker(), do: create_worker_async() |> handle_async_nif_result()
def create_worker(option), do: create_worker_async(option) |> handle_async_nif_result()

# worker
@spec worker_create_router(reference, Router.create_option()) :: {:ok, reference()} | {:error}
def worker_create_router(_worker, _option), do: :erlang.nif_error(:nif_not_loaded)
def worker_create_router(worker, option),
do: worker_create_router_async(worker, option) |> handle_async_nif_result()

@spec worker_id(reference) :: String.t()
def worker_id(_worker), do: :erlang.nif_error(:nif_not_loaded)
@spec worker_close(reference) :: {:ok} | {:error}
Expand All @@ -27,9 +87,12 @@ defmodule Mediasoup.Nif do
@spec worker_closed(reference) :: boolean
def worker_closed(_worker), do: :erlang.nif_error(:nif_not_loaded)
@spec worker_update_settings(reference, Worker.update_option()) :: {:ok} | {:error}
def worker_update_settings(_worker, _option), do: :erlang.nif_error(:nif_not_loaded)
def worker_update_settings(worker, option),
do: worker_update_settings_async(worker, option) |> handle_async_nif_result()

@spec worker_dump(reference) :: map | {:error}
def worker_dump(_worker), do: :erlang.nif_error(:nif_not_loaded)
def worker_dump(worker),
do: worker_dump_async(worker) |> handle_async_nif_result() |> unwrap_ok()

# router
@spec router_id(reference) :: String.t()
Expand All @@ -40,14 +103,15 @@ defmodule Mediasoup.Nif do
def router_closed(_router), do: :erlang.nif_error(:nif_not_loaded)

def router_create_pipe_transport(
_reference,
_option
router,
option
),
do: :erlang.nif_error(:nif_not_loaded)
do: router_create_pipe_transport_async(router, option) |> handle_async_nif_result()

@spec router_create_webrtc_transport(reference, map) ::
{:ok, reference()} | {:error, String.t()}
def router_create_webrtc_transport(_router, _option), do: :erlang.nif_error(:nif_not_loaded)
def router_create_webrtc_transport(router, option),
do: router_create_webrtc_transport_async(router, option) |> handle_async_nif_result()

@spec router_can_consume(reference, String.t(), Router.rtpCapabilities()) :: boolean
def router_can_consume(_router, _producer_id, _rtp_capabilities),
Expand All @@ -59,7 +123,8 @@ defmodule Mediasoup.Nif do
@spec router_event(reference, pid, [atom()]) :: {:ok} | {:error}
def router_event(_router, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded)
@spec router_dump(reference) :: any
def router_dump(_router), do: :erlang.nif_error(:nif_not_loaded)
def router_dump(router),
do: router_dump_async(router) |> handle_async_nif_result() |> unwrap_ok()

# webrtc_transport
@spec webrtc_transport_id(reference) :: String.t()
Expand All @@ -70,49 +135,77 @@ defmodule Mediasoup.Nif do
def webrtc_transport_closed(_transport), do: :erlang.nif_error(:nif_not_loaded)

@spec webrtc_transport_consume(reference, any) :: {:ok, reference()} | {:error, String.t()}
def webrtc_transport_consume(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_consume(transport, option),
do: webrtc_transport_consume_async(transport, option) |> handle_async_nif_result()

@spec webrtc_transport_connect(reference, any) :: {:ok} | {:error, String.t()}
def webrtc_transport_connect(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_connect(transport, option),
do: webrtc_transport_connect_async(transport, option) |> handle_async_nif_result()

@spec webrtc_transport_produce(reference, any) :: {:ok, reference()} | {:error, String.t()}
def webrtc_transport_produce(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_produce(transport, option),
do: webrtc_transport_produce_async(transport, option) |> handle_async_nif_result()

def webrtc_transport_ice_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_ice_candidates(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_ice_role(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_sctp_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded)

def webrtc_transport_set_max_incoming_bitrate(_transport, _bitrate),
do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_set_max_incoming_bitrate(transport, bitrate),
do:
webrtc_transport_set_max_incoming_bitrate_async(transport, bitrate)
|> handle_async_nif_result()

def webrtc_transport_set_max_outgoing_bitrate(_transport, _bitrate),
do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_set_max_outgoing_bitrate(transport, bitrate),
do:
webrtc_transport_set_max_outgoing_bitrate_async(transport, bitrate)
|> handle_async_nif_result()

def webrtc_transport_ice_state(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_restart_ice(_transport), do: :erlang.nif_error(:nif_not_loaded)

def webrtc_transport_restart_ice(transport),
do: webrtc_transport_restart_ice_async(transport) |> handle_async_nif_result()

def webrtc_transport_ice_selected_tuple(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_dtls_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_dtls_state(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_sctp_state(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_get_stats(_transport), do: :erlang.nif_error(:nif_not_loaded)

def webrtc_transport_get_stats(transport),
do: webrtc_transport_get_stats_async(transport) |> handle_async_nif_result() |> unwrap_ok()

@spec webrtc_transport_event(reference, pid, [atom()]) :: {:ok} | {:error}
def webrtc_transport_event(_transport, _pid, _event_types),
do: :erlang.nif_error(:nif_not_loaded)

def webrtc_transport_dump(_transport), do: :erlang.nif_error(:nif_not_loaded)
def webrtc_transport_dump(transport),
do: webrtc_transport_dump_async(transport) |> handle_async_nif_result() |> unwrap_ok()

# pipe_transport
def pipe_transport_id(_transport), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_close(_transport), do: :erlang.nif_error(:nif_not_loaded)
@spec pipe_transport_closed(reference) :: boolean
def pipe_transport_closed(_transport), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_consume(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_connect(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_produce(_transport, _option), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_get_stats(_transport), do: :erlang.nif_error(:nif_not_loaded)

def pipe_transport_consume(transport, option),
do: pipe_transport_consume_async(transport, option) |> handle_async_nif_result()

def pipe_transport_connect(transport, option),
do: pipe_transport_connect_async(transport, option) |> handle_async_nif_result()

def pipe_transport_produce(transport, option),
do: pipe_transport_produce_async(transport, option) |> handle_async_nif_result()

def pipe_transport_get_stats(transport),
do: pipe_transport_get_stats_async(transport) |> handle_async_nif_result() |> unwrap_ok()

def pipe_transport_tuple(_transport), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_sctp_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_sctp_state(_transport), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_srtp_parameters(_transport), do: :erlang.nif_error(:nif_not_loaded)
def pipe_transport_dump(_transport), do: :erlang.nif_error(:nif_not_loaded)

def pipe_transport_dump(transport),
do: pipe_transport_dump_async(transport) |> handle_async_nif_result() |> unwrap_ok()

def pipe_transport_event(_transport, _pid, _event_types),
do: :erlang.nif_error(:nif_not_loaded)
Expand Down Expand Up @@ -143,18 +236,28 @@ defmodule Mediasoup.Nif do
def consumer_score(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_preferred_layers(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_current_layers(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_get_stats(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_pause(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_resume(_consumer), do: :erlang.nif_error(:nif_not_loaded)

def consumer_set_preferred_layers(_consumer, _referred_layers),
do: :erlang.nif_error(:nif_not_loaded)
def consumer_get_stats(consumer),
do: consumer_get_stats_async(consumer) |> handle_async_nif_result() |> unwrap_ok()

def consumer_pause(consumer), do: consumer_pause_async(consumer) |> handle_async_nif_result()
def consumer_resume(consumer), do: consumer_resume_async(consumer) |> handle_async_nif_result()

def consumer_set_preferred_layers(consumer, referred_layers),
do:
consumer_set_preferred_layers_async(consumer, referred_layers) |> handle_async_nif_result()

def consumer_set_priority(consumer, priority),
do: consumer_set_priority_async(consumer, priority) |> handle_async_nif_result()

def consumer_set_priority(_consumer, _priority), do: :erlang.nif_error(:nif_not_loaded)
def consumer_unset_priority(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_request_key_frame(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_unset_priority(consumer),
do: consumer_unset_priority_async(consumer) |> handle_async_nif_result()

def consumer_dump(_consumer), do: :erlang.nif_error(:nif_not_loaded)
def consumer_request_key_frame(consumer),
do: consumer_request_key_frame_async(consumer) |> handle_async_nif_result()

def consumer_dump(consumer),
do: consumer_dump_async(consumer) |> handle_async_nif_result() |> unwrap_ok()

# producer
@spec producer_id(reference) :: String.t()
Expand All @@ -168,9 +271,9 @@ defmodule Mediasoup.Nif do
@spec producer_close(reference) :: {:ok} | {:error}
def producer_close(_consumer), do: :erlang.nif_error(:nif_not_loaded)
@spec producer_pause(reference) :: {:ok} | {:error}
def producer_pause(_producer), do: :erlang.nif_error(:nif_not_loaded)
def producer_pause(producer), do: producer_pause_async(producer) |> handle_async_nif_result()
@spec producer_resume(reference) :: {:ok} | {:error}
def producer_resume(_producer), do: :erlang.nif_error(:nif_not_loaded)
def producer_resume(producer), do: producer_resume_async(producer) |> handle_async_nif_result()

@spec producer_closed(reference) :: boolean()
def producer_closed(_producer), do: :erlang.nif_error(:nif_not_loaded)
Expand All @@ -180,9 +283,28 @@ defmodule Mediasoup.Nif do
@spec producer_score(reference) :: list() | {:error}
def producer_score(_producer), do: :erlang.nif_error(:nif_not_loaded)
@spec producer_get_stats(reference) :: list() | {:error}
def producer_get_stats(_producer), do: :erlang.nif_error(:nif_not_loaded)
def producer_get_stats(producer),
do: producer_get_stats_async(producer) |> handle_async_nif_result() |> unwrap_ok()

@spec producer_event(reference, pid, [atom()]) :: {:ok} | {:error}
def producer_event(_producer, _pid, _event_types), do: :erlang.nif_error(:nif_not_loaded)
def producer_dump(_producer), do: :erlang.nif_error(:nif_not_loaded)

def producer_dump(producer),
do: producer_dump_async(producer) |> handle_async_nif_result() |> unwrap_ok()

defp handle_async_nif_result(result) do
case result do
{:ok, result_key} ->
receive do
{^result_key, {:ok, {}}} -> {:ok}
{^result_key, msg} -> msg
end

error ->
error
end
end

defp unwrap_ok({:ok, result}), do: result
defp unwrap_ok(result), do: result
end
1 change: 1 addition & 0 deletions native/mediasoup_elixir/src/atoms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ rustler::atoms! {
on_layers_change,
audio,
video,
mediasoup_async_nif_result,
}
Loading