Skip to content

Commit a31e0d9

Browse files
committed
Handle allocation request retransmissions
1 parent c413d53 commit a31e0d9

File tree

2 files changed

+63
-18
lines changed

2 files changed

+63
-18
lines changed

lib/rel/allocation_handler.ex

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,40 @@ defmodule Rel.AllocationHandler do
1414
@type five_tuple() ::
1515
{:inet.ip_address(), :inet.port_number(), :inet.ip_address(), :inet.port_number(), :udp}
1616

17+
@typedoc """
18+
Allocation handler init args.
19+
20+
* `t_id` is the origin allocation request transaction id
21+
* `response` is the origin response for the origin alloaction request
22+
"""
23+
@type alloc_args() :: [
24+
five_tuple: five_tuple(),
25+
alloc_socket: :gen_udp.socket(),
26+
turn_socket: :gen_udp.socket(),
27+
username: binary(),
28+
time_to_expiry: integer(),
29+
t_id: integer(),
30+
response: binary()
31+
]
32+
1733
@permission_lifetime Application.compile_env!(:rel, :permission_lifetime)
1834
@channel_lifetime Application.compile_env!(:rel, :channel_lifetime)
1935

20-
@spec start_link(term()) :: GenServer.on_start()
21-
def start_link([five_tuple, alloc_socket | _rest] = args) do
36+
@spec start_link(alloc_args()) :: GenServer.on_start()
37+
def start_link(args) do
38+
alloc_socket = Keyword.fetch!(args, :alloc_socket)
39+
five_tuple = Keyword.fetch!(args, :five_tuple)
40+
t_id = Keyword.fetch!(args, :t_id)
41+
response = Keyword.fetch!(args, :response)
42+
2243
{:ok, {_alloc_ip, alloc_port}} = :inet.sockname(alloc_socket)
2344

45+
alloc_origin_state = %{alloc_port: alloc_port, t_id: t_id, response: response}
46+
2447
GenServer.start_link(
2548
__MODULE__,
2649
args,
27-
name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_port}}
50+
name: {:via, Registry, {Registry.Allocations, five_tuple, alloc_origin_state}}
2851
)
2952
end
3053

@@ -34,7 +57,13 @@ defmodule Rel.AllocationHandler do
3457
end
3558

3659
@impl true
37-
def init([five_tuple, socket, turn_socket, username, time_to_expiry]) do
60+
def init(args) do
61+
five_tuple = Keyword.fetch!(args, :five_tuple)
62+
alloc_socket = Keyword.fetch!(args, :alloc_socket)
63+
turn_socket = Keyword.fetch!(args, :turn_socket)
64+
username = Keyword.fetch!(args, :username)
65+
time_to_expiry = Keyword.fetch!(args, :time_to_expiry)
66+
3867
{c_ip, c_port, s_ip, s_port, _transport} = five_tuple
3968
alloc_id = "(#{:inet.ntoa(c_ip)}:#{c_port}, #{:inet.ntoa(s_ip)}:#{s_port}, UDP)"
4069
Logger.metadata(alloc: alloc_id)
@@ -48,7 +77,7 @@ defmodule Rel.AllocationHandler do
4877
%{
4978
alloc_id: alloc_id,
5079
turn_socket: turn_socket,
51-
socket: socket,
80+
socket: alloc_socket,
5281
five_tuple: five_tuple,
5382
username: username,
5483
expiry_timestamp: System.os_time(:second) + time_to_expiry,

lib/rel/listener.ex

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,13 @@ defmodule Rel.Listener do
124124
Logger.info("Received new allocation request")
125125
{c_ip, c_port, _, _, _} = five_tuple
126126

127+
handle_error = fn reason, socket, c_ip, c_port, msg ->
128+
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
129+
Logger.warning(log_msg)
130+
:ok = :gen_udp.send(socket, c_ip, c_port, response)
131+
end
132+
127133
with {:ok, key} <- Auth.authenticate(msg),
128-
:ok <- is_not_retransmited?(msg, key, []),
129134
:ok <- refute_allocation(five_tuple),
130135
:ok <- check_requested_transport(msg),
131136
:ok <- check_dont_fragment(msg),
@@ -169,24 +174,39 @@ defmodule Rel.Listener do
169174
{:ok, alloc_pid} =
170175
DynamicSupervisor.start_child(
171176
Rel.AllocationSupervisor,
172-
{Rel.AllocationHandler, [five_tuple, alloc_socket, socket, username, lifetime]}
177+
{Rel.AllocationHandler,
178+
[
179+
five_tuple: five_tuple,
180+
alloc_socket: alloc_socket,
181+
turn_socket: socket,
182+
username: username,
183+
time_to_expiry: lifetime,
184+
t_id: msg.transaction_id,
185+
response: response
186+
]}
173187
)
174188

175189
:ok = :gen_udp.controlling_process(alloc_socket, alloc_pid)
176190

177191
:ok = :gen_udp.send(socket, c_ip, c_port, response)
178192
else
193+
{:error, :allocation_exists, %{t_id: origin_t_id, response: origin_response}}
194+
when origin_t_id == msg.transaction_id ->
195+
Logger.info("Allocation request retransmission")
196+
:ok = :gen_udp.send(socket, c_ip, c_port, origin_response)
197+
198+
{:error, :allocation_exists, _alloc_origin_state} ->
199+
handle_error.(:allocation_exists, socket, c_ip, c_port, msg)
200+
179201
{:error, reason} ->
180-
{response, log_msg} = Utils.build_error(reason, msg.transaction_id, msg.type.method)
181-
Logger.warning(log_msg)
182-
:ok = :gen_udp.send(socket, c_ip, c_port, response)
202+
handle_error.(reason, socket, c_ip, c_port, msg)
183203
end
184204
end
185205

186206
defp handle_message(socket, five_tuple, msg) do
187207
# TODO: are Registry entries removed fast enough?
188208
case fetch_allocation(five_tuple) do
189-
{:ok, alloc} ->
209+
{:ok, alloc, _alloc_origin_state} ->
190210
AllocationHandler.process_message(alloc, msg)
191211

192212
{:error, :allocation_not_found = reason} ->
@@ -209,22 +229,17 @@ defmodule Rel.Listener do
209229
end
210230
end
211231

212-
defp is_not_retransmited?(_msg, _key, _allocation_requests) do
213-
# TODO: handle retransmitions, RFC 5766 6.2
214-
:ok
215-
end
216-
217232
defp fetch_allocation(five_tuple) do
218233
case Registry.lookup(Registry.Allocations, five_tuple) do
219-
[{allocation, _value}] -> {:ok, allocation}
234+
[{alloc, alloc_origin_state}] -> {:ok, alloc, alloc_origin_state}
220235
[] -> {:error, :allocation_not_found}
221236
end
222237
end
223238

224239
defp refute_allocation(five_tuple) do
225240
case fetch_allocation(five_tuple) do
226241
{:error, :allocation_not_found} -> :ok
227-
{:ok, _alloc} -> {:error, :allocation_exists}
242+
{:ok, _alloc, alloc_origin_state} -> {:error, :allocation_exists, alloc_origin_state}
228243
end
229244
end
230245

@@ -295,6 +310,7 @@ defmodule Rel.Listener do
295310
used_alloc_ports =
296311
Registry.Allocations
297312
|> Registry.select([{{:_, :_, :"$3"}, [], [:"$3"]}])
313+
|> Enum.map(fn alloc_origin_state -> Map.fetch!(alloc_origin_state, :alloc_port) end)
298314
|> MapSet.new()
299315

300316
available_alloc_ports = MapSet.difference(@default_alloc_ports, used_alloc_ports)

0 commit comments

Comments
 (0)