Skip to content

Commit

Permalink
Solution: Faster startup for non-global (quantum-elixir#376) (quantum…
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen authored and c-rack committed Nov 22, 2018
1 parent f658a7d commit 7c2fa0c
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 52 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Fixed
- Faster Startup duration for non-global

Diff for [unreleased]

## 2.3.3 - 2018-09-06
Expand Down
53 changes: 48 additions & 5 deletions lib/quantum/cluster_task_supervisor_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Quantum.ClusterTaskSupervisorRegistry do
struct!(
InitOpts,
opts
|> Map.take([:task_supervisor_reference, :group_name])
|> Map.take([:task_supervisor_reference, :group_name, :global])
|> Map.put_new(:group_name, Module.concat(name, Group))
),
name: name
Expand All @@ -23,7 +23,11 @@ defmodule Quantum.ClusterTaskSupervisorRegistry do

@doc false
@impl true
def init(%InitOpts{task_supervisor_reference: task_supervisor, group_name: group_name}) do
def init(%InitOpts{
task_supervisor_reference: task_supervisor,
group_name: group_name,
global: true
}) do
task_supervisor_pid = GenServer.whereis(task_supervisor)

monitor_ref = Process.monitor(task_supervisor_pid)
Expand All @@ -41,30 +45,69 @@ defmodule Quantum.ClusterTaskSupervisorRegistry do
%State{
group_name: group_name,
task_supervisor_pid: task_supervisor_pid,
monitor_ref: monitor_ref
monitor_ref: monitor_ref,
global: true
}}
end

def init(%InitOpts{
task_supervisor_reference: task_supervisor,
group_name: group_name,
global: false
}) do
task_supervisor_pid = GenServer.whereis(task_supervisor)

monitor_ref = Process.monitor(task_supervisor_pid)

{:ok,
%State{
group_name: group_name,
task_supervisor_pid: task_supervisor_pid,
monitor_ref: monitor_ref,
global: false
}}
end

@doc false
@impl true
def handle_call(:pids, _from, %State{group_name: group_name} = state) do
def handle_call(:pids, _from, %State{group_name: group_name, global: true} = state) do
{:reply, Swarm.members(group_name), state}
end

def handle_call(
:pids,
_from,
%State{task_supervisor_pid: task_supervisor_pid, global: false} = state
) do
{:reply, [task_supervisor_pid], state}
end

@doc false
@impl true
def handle_info(
{:DOWN, monitor_ref, :process, task_supervisor_pid, _reason},
%State{
group_name: group_name,
task_supervisor_pid: task_supervisor_pid,
monitor_ref: monitor_ref
monitor_ref: monitor_ref,
global: true
} = state
) do
Swarm.leave(group_name, task_supervisor_pid)
{:stop, :terminate, state}
end

def handle_info(
{:DOWN, monitor_ref, :process, task_supervisor_pid, _reason},
%State{
task_supervisor_pid: task_supervisor_pid,
monitor_ref: monitor_ref,
global: false
} = state
) do
{:stop, :terminate, state}
end

@doc false
# Retrieve pids running the linked gen server
def pids(server \\ __MODULE__) do
Expand Down
5 changes: 3 additions & 2 deletions lib/quantum/cluster_task_supervisor_registry/init_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ defmodule Quantum.ClusterTaskSupervisorRegistry.InitOpts do

@type t :: %__MODULE__{
task_supervisor_reference: GenServer.server(),
group_name: atom()
group_name: atom(),
global: boolean()
}

@enforce_keys [:task_supervisor_reference, :group_name]
@enforce_keys [:task_supervisor_reference, :group_name, :global]
defstruct @enforce_keys
end
5 changes: 3 additions & 2 deletions lib/quantum/cluster_task_supervisor_registry/start_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ defmodule Quantum.ClusterTaskSupervisorRegistry.StartOpts do
@type t :: %__MODULE__{
name: GenServer.server(),
task_supervisor_reference: GenServer.server(),
group_name: atom() | nil
group_name: atom() | nil,
global: boolean()
}

@enforce_keys [:name, :task_supervisor_reference]
@enforce_keys [:name, :task_supervisor_reference, :global]
defstruct @enforce_keys ++ [:group_name]
end
5 changes: 3 additions & 2 deletions lib/quantum/cluster_task_supervisor_registry/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ defmodule Quantum.ClusterTaskSupervisorRegistry.State do
@type t :: %__MODULE__{
group_name: atom(),
task_supervisor_pid: GenServer.server(),
monitor_ref: reference
monitor_ref: reference,
global: boolean()
}

@enforce_keys [:group_name, :task_supervisor_pid, :monitor_ref]
@enforce_keys [:group_name, :task_supervisor_pid, :monitor_ref, :global]
defstruct @enforce_keys
end
2 changes: 1 addition & 1 deletion lib/quantum/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule Quantum.Supervisor do
struct!(
ClusterTaskSupervisorRegistryStartOpts,
opts
|> Map.take([:task_supervisor_reference])
|> Map.take([:task_supervisor_reference, :global])
|> Map.put(:name, cluster_task_supervisor_registry_name)
)

Expand Down
136 changes: 96 additions & 40 deletions test/quantum/cluster_task_supervisor_registry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,109 @@ defmodule Quantum.ClusterTaskSupervisorRegistryTest do
alias Quantum.ClusterTaskSupervisorRegistry
alias Quantum.ClusterTaskSupervisorRegistry.StartOpts

test "should register name", %{test: test} do
{:ok, task_supervisor_pid} = start_supervised({Task.Supervisor, name: test})

{:ok, registry_pid} =
start_supervised(
{ClusterTaskSupervisorRegistry,
%StartOpts{
name: Module.concat([__MODULE__, test, Registry]),
task_supervisor_reference: test,
group_name: Module.concat([__MODULE__, test, Group])
}}
)

Process.sleep(5_000)

registered_pids = ClusterTaskSupervisorRegistry.pids(registry_pid)
registered_nodes = ClusterTaskSupervisorRegistry.nodes(registry_pid)

assert Enum.count(registered_pids) == 1
assert Enum.member?(registered_pids, task_supervisor_pid)
assert Enum.count(registered_nodes) == 1
assert Enum.member?(registered_nodes, Node.self())
describe "global" do
test "should register name", %{test: test} do
{:ok, task_supervisor_pid} = start_supervised({Task.Supervisor, name: test})

{:ok, registry_pid} =
start_supervised(
{ClusterTaskSupervisorRegistry,
%StartOpts{
name: Module.concat([__MODULE__, test, Registry]),
task_supervisor_reference: test,
group_name: Module.concat([__MODULE__, test, Group]),
global: true
}}
)

Process.sleep(5_000)

registered_pids = ClusterTaskSupervisorRegistry.pids(registry_pid)
registered_nodes = ClusterTaskSupervisorRegistry.nodes(registry_pid)

assert Enum.count(registered_pids) == 1
assert Enum.member?(registered_pids, task_supervisor_pid)
assert Enum.count(registered_nodes) == 1
assert Enum.member?(registered_nodes, Node.self())
end

test "should quit when task_supervisor quits", %{test: test} do
test_pid = self()

spawn(fn ->
send(test_pid, Task.Supervisor.start_link(name: test))

send(
test_pid,
ClusterTaskSupervisorRegistry.start_link(%StartOpts{
name: Module.concat([__MODULE__, test, Registry]),
task_supervisor_reference: test,
group_name: Module.concat([__MODULE__, test, Group]),
global: true
})
)
end)

assert_receive {:ok, task_supervisor_pid}, 10_000
assert_receive {:ok, registry_pid}, 10_000

ref = Process.monitor(registry_pid)

Process.exit(task_supervisor_pid, :kill)

assert_receive {:DOWN, ^ref, :process, ^registry_pid, :terminate}
end
end

test "should quit when task_supervisor quits", %{test: test} do
test_pid = self()
describe "local" do
test "should register name", %{test: test} do
{:ok, task_supervisor_pid} = start_supervised({Task.Supervisor, name: test})

{:ok, registry_pid} =
start_supervised(
{ClusterTaskSupervisorRegistry,
%StartOpts{
name: Module.concat([__MODULE__, test, Registry]),
task_supervisor_reference: test,
group_name: Module.concat([__MODULE__, test, Group]),
global: false
}}
)

registered_pids = ClusterTaskSupervisorRegistry.pids(registry_pid)
registered_nodes = ClusterTaskSupervisorRegistry.nodes(registry_pid)

assert Enum.count(registered_pids) == 1
assert Enum.member?(registered_pids, task_supervisor_pid)
assert Enum.count(registered_nodes) == 1
assert Enum.member?(registered_nodes, Node.self())
end

test "should quit when task_supervisor quits", %{test: test} do
test_pid = self()

spawn(fn ->
send(test_pid, Task.Supervisor.start_link(name: test))
spawn(fn ->
send(test_pid, Task.Supervisor.start_link(name: test))

send(
test_pid,
ClusterTaskSupervisorRegistry.start_link(%StartOpts{
name: Module.concat([__MODULE__, test, Registry]),
task_supervisor_reference: test,
group_name: Module.concat([__MODULE__, test, Group])
})
)
end)
send(
test_pid,
ClusterTaskSupervisorRegistry.start_link(%StartOpts{
name: Module.concat([__MODULE__, test, Registry]),
task_supervisor_reference: test,
group_name: Module.concat([__MODULE__, test, Group]),
global: false
})
)
end)

assert_receive {:ok, task_supervisor_pid}, 10_000
assert_receive {:ok, registry_pid}, 10_000
assert_receive {:ok, task_supervisor_pid}, 10_000
assert_receive {:ok, registry_pid}, 10_000

ref = Process.monitor(registry_pid)
ref = Process.monitor(registry_pid)

Process.exit(task_supervisor_pid, :kill)
Process.exit(task_supervisor_pid, :kill)

assert_receive {:DOWN, ^ref, :process, ^registry_pid, :terminate}
assert_receive {:DOWN, ^ref, :process, ^registry_pid, :terminate}
end
end
end

0 comments on commit 7c2fa0c

Please sign in to comment.