Skip to content

Commit

Permalink
Extract Node Selection into own Stage
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen committed Feb 26, 2020
1 parent 065c022 commit f04a95e
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 129 deletions.
2 changes: 1 addition & 1 deletion lib/quantum/execution_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ defmodule Quantum.ExecutionBroadcaster do

defp add_to_state(%State{execution_timeline: execution_timeline} = state, time, date, job) do
unless NaiveDateTime.compare(time, date) in [:lt, :eq] do
raise JobInPastError
raise Quantum.ExecutionBroadcaster.JobInPastError
end

%{state | execution_timeline: add_job_at_date(execution_timeline, date, job)}
Expand Down
123 changes: 40 additions & 83 deletions lib/quantum/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,29 @@ defmodule Quantum.Executor do
require Logger

alias Quantum.{
ExecutionBroadcaster.Event,
Job,
RunStrategy.NodeList,
NodeSelectorBroadcaster.Event,
TaskRegistry
}

alias __MODULE__.StartOpts

@doc false
@spec start_link(StartOpts.t(), Event.t()) :: {:ok, pid}
def start_link(opts, %Event{job: job}) do
def start_link(opts, %Event{job: job, node: node}) do
Task.start_link(fn ->
execute(opts, job)
execute(opts, job, node)
end)
end

@spec execute(StartOpts.t(), Job.t()) :: :ok
@spec execute(StartOpts.t(), Job.t(), Node.t()) :: :ok
# Execute task on all given nodes without checking for overlap
defp execute(
%StartOpts{
task_supervisor_reference: task_supervisor,
task_registry_reference: _task_registry,
debug_logging: debug_logging
},
%Job{overlap: true, run_strategy: run_strategy} = job
%StartOpts{task_supervisor_reference: task_supervisor, debug_logging: debug_logging},
%Job{overlap: true} = job,
node
) do
# Find Nodes to run on
# Check if Node is up and running
# Run Task
job
|> nodes(run_strategy, task_supervisor)
|> Enum.each(&run(&1, job, task_supervisor, debug_logging))
run(node, job, task_supervisor, debug_logging)

:ok
end
Expand All @@ -50,43 +41,35 @@ defmodule Quantum.Executor do
task_registry_reference: task_registry,
debug_logging: debug_logging
},
%Job{overlap: false, run_strategy: run_strategy, name: job_name} = job
%Job{overlap: false, name: job_name} = job,
node
) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Start execution of job #{inspect(job_name)}"
end)

# Find Nodes to run on
# Mark Running and only continue with item if it worked
# Check if Node is up and running
# Run Task
# Mark Task as finished
job
|> nodes(run_strategy, task_supervisor)
|> Enum.filter(&(TaskRegistry.mark_running(task_registry, job_name, &1) == :marked_running))
|> Enum.map(&run(&1, job, task_supervisor, debug_logging))
|> Enum.each(fn {node, %Task{ref: ref}} ->
receive do
{^ref, _} ->
TaskRegistry.mark_finished(task_registry, job_name, node)

{:DOWN, ^ref, _, _, _} ->
TaskRegistry.mark_finished(task_registry, job_name, node)
end
end)
case TaskRegistry.mark_running(task_registry, job_name, node) do
:marked_running ->
%Task{ref: ref} = run(node, job, task_supervisor, debug_logging)

:ok
end
receive do
{^ref, _} ->
TaskRegistry.mark_finished(task_registry, job_name, node)

{:DOWN, ^ref, _, _, _} ->
TaskRegistry.mark_finished(task_registry, job_name, node)

:ok
end

defp nodes(job, run_strategy, task_supervisor) do
run_strategy
|> NodeList.nodes(job)
|> Enum.filter(&check_node(&1, task_supervisor, job))
_ ->
:ok
end
end

# Ececute the given function on a given node via the task supervisor
@spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: {Node.t(), Task.t()}
@spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: Task.t()
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging) do
debug_logging &&
Logger.debug(fn ->
Expand All @@ -95,49 +78,23 @@ defmodule Quantum.Executor do
}"
end)

{
node,
Task.Supervisor.async_nolink({task_supervisor, node}, fn ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}"
end)
Task.Supervisor.async_nolink({task_supervisor, node}, fn ->
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}"
end)

result = execute_task(task)
result = execute_task(task)

debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job_name)}, which yielded result: #{
inspect(result)
}"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job_name)}, which yielded result: #{
inspect(result)
}"
end)

:ok
end)
}
end

@spec check_node(Node.t(), GenServer.server(), Job.t()) :: boolean
defp check_node(node, task_supervisor, %{name: job_name}) do
if running_node?(node, task_supervisor) do
true
else
Logger.warn(
"Node #{inspect(node)} is not running. Job #{inspect(job_name)} could not be executed."
)

false
end
end

# Check if the task supervisor runs on a given node
@spec running_node?(Node.t(), GenServer.server()) :: boolean
defp running_node?(node, _) when node == node(), do: true

defp running_node?(node, task_supervisor) do
node
|> :rpc.call(:erlang, :whereis, [task_supervisor])
|> is_pid()
:ok
end)
end

# Run function
Expand Down
6 changes: 3 additions & 3 deletions lib/quantum/executor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Quantum.ExecutorSupervisor do
struct!(
InitOpts,
Map.take(opts, [
:execution_broadcaster_reference,
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
Expand All @@ -29,7 +29,7 @@ defmodule Quantum.ExecutorSupervisor do
@doc false
def init(
%InitOpts{
execution_broadcaster_reference: execution_broadcaster
node_selector_broadcaster_reference: node_selector_broadcaster
} = opts
) do
executor_opts =
Expand All @@ -45,7 +45,7 @@ defmodule Quantum.ExecutorSupervisor do
ConsumerSupervisor.init(
[{Quantum.Executor, executor_opts}],
strategy: :one_for_one,
subscribe_to: [{execution_broadcaster, max_demand: 50}]
subscribe_to: [{node_selector_broadcaster, max_demand: 50}]
)
end
end
4 changes: 2 additions & 2 deletions lib/quantum/executor_supervisor/init_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ defmodule Quantum.ExecutorSupervisor.InitOpts do
# Init Options for Quantum.ExecutorSupervisor

@type t :: %__MODULE__{
execution_broadcaster_reference: GenServer.server(),
node_selector_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean
}

@enforce_keys [
:execution_broadcaster_reference,
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
Expand Down
4 changes: 2 additions & 2 deletions lib/quantum/executor_supervisor/start_opts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ defmodule Quantum.ExecutorSupervisor.StartOpts do

@type t :: %__MODULE__{
name: GenServer.server(),
execution_broadcaster_reference: GenServer.server(),
node_selector_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server(),
task_registry_reference: GenServer.server(),
debug_logging: boolean()
}

@enforce_keys [
:name,
:execution_broadcaster_reference,
:node_selector_broadcaster_reference,
:task_supervisor_reference,
:task_registry_reference,
:debug_logging
Expand Down
86 changes: 86 additions & 0 deletions lib/quantum/node_selector_broadcaster.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Quantum.NodeSelectorBroadcaster do
@moduledoc false

# Receives Added / Removed Jobs, Broadcasts Executions of Jobs

use GenStage

require Logger

alias Quantum.ExecutionBroadcaster.Event, as: ExecuteEvent
alias Quantum.Job
alias Quantum.RunStrategy.NodeList

alias __MODULE__.{Event, InitOpts, StartOpts, State}

@type event :: {:add, Job.t()} | {:execute, Job.t()}

@doc """
Start Stage
"""
@spec start_link(StartOpts.t()) :: GenServer.on_start()
def start_link(%StartOpts{name: name} = opts) do
GenStage.start_link(
__MODULE__,
struct!(
InitOpts,
Map.take(opts, [
:execution_broadcaster_reference,
:task_supervisor_reference
])
),
name: name
)
end

@doc false
def init(%InitOpts{
execution_broadcaster_reference: execution_broadcaster,
task_supervisor_reference: task_supervisor_reference
}) do
{:producer_consumer,
%State{
task_supervisor_reference: task_supervisor_reference
}, subscribe_to: [execution_broadcaster]}
end

def handle_events(events, _, %{task_supervisor_reference: task_supervisor_reference} = state) do
{:noreply,
Enum.flat_map(events, fn %ExecuteEvent{job: job} ->
job
|> select_nodes(task_supervisor_reference)
|> Enum.map(fn node ->
%Event{job: job, node: node}
end)
end), state}
end

defp select_nodes(%Job{run_strategy: run_strategy} = job, task_supervisor) do
run_strategy
|> NodeList.nodes(job)
|> Enum.filter(&check_node(&1, task_supervisor, job))
end

@spec check_node(Node.t(), GenServer.server(), Job.t()) :: boolean
defp check_node(node, task_supervisor, %{name: job_name}) do
if running_node?(node, task_supervisor) do
true
else
Logger.warn(
"Node #{inspect(node)} is not running. Job #{inspect(job_name)} could not be executed."
)

false
end
end

# Check if the task supervisor runs on a given node
@spec running_node?(Node.t(), GenServer.server()) :: boolean
defp running_node?(node, _) when node == node(), do: true

defp running_node?(node, task_supervisor) do
node
|> :rpc.call(:erlang, :whereis, [task_supervisor])
|> is_pid()
end
end
16 changes: 16 additions & 0 deletions lib/quantum/node_selector_broadcaster/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Quantum.NodeSelectorBroadcaster.Event do
@moduledoc false

# Execute Event

alias Quantum.Job

@type t :: %__MODULE__{
job: Job.t(),
node: Node.t()
}

@enforce_keys [:job, :node]

defstruct @enforce_keys
end
16 changes: 16 additions & 0 deletions lib/quantum/node_selector_broadcaster/init_opts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Quantum.NodeSelectorBroadcaster.InitOpts do
@moduledoc false

# Init Options for Quantum.NodeSelectorBroadcaster

@type t :: %__MODULE__{
execution_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server()
}

@enforce_keys [
:execution_broadcaster_reference,
:task_supervisor_reference
]
defstruct @enforce_keys
end
18 changes: 18 additions & 0 deletions lib/quantum/node_selector_broadcaster/start_opts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Quantum.NodeSelectorBroadcaster.StartOpts do
@moduledoc false

# Start Options for Quantum.NodeSelectorBroadcaster

@type t :: %__MODULE__{
name: GenServer.server(),
execution_broadcaster_reference: GenServer.server(),
task_supervisor_reference: GenServer.server()
}

@enforce_keys [
:name,
:execution_broadcaster_reference,
:task_supervisor_reference
]
defstruct @enforce_keys
end
15 changes: 15 additions & 0 deletions lib/quantum/node_selector_broadcaster/state.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
defmodule Quantum.NodeSelectorBroadcaster.State do
@moduledoc false

# Internal State

@type t :: %__MODULE__{
task_supervisor_reference: GenServer.server()
}

@enforce_keys [
:task_supervisor_reference
]

defstruct @enforce_keys
end
Loading

0 comments on commit f04a95e

Please sign in to comment.