Skip to content

Commit

Permalink
Make Debug Logging Configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim Digel authored and maennchen committed Mar 28, 2018
1 parent a05e3ba commit 5c2eb20
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 130 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ config :logger,
level: :debug
```

If you want do use the logger in debug-level without the messages from quantum:

```elixir
config :your_app, YourApp.Scheduler,
debug_logging: false
```

If you encounter any problems with `quantum`, please search if there is already an
[open issue](https://github.com/quantum-elixir/quantum-core/issues) addressing the problem.

Expand Down
3 changes: 2 additions & 1 deletion lib/quantum.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ defmodule Quantum do
schedule: nil,
overlap: true,
timezone: :utc,
run_strategy: {Random, :cluster}
run_strategy: {Random, :cluster},
debug_logging: true
]

@doc """
Expand Down
87 changes: 48 additions & 39 deletions lib/quantum/execution_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,38 @@ defmodule Quantum.ExecutionBroadcaster do
* `job_broadcaster` - The name of the stage to listen to
"""
@spec start_link(GenServer.server(), GenServer.server()) :: GenServer.on_start()
def start_link(name, job_broadcaster) do
@spec start_link(GenServer.server(), GenServer.server(), boolean()) :: GenServer.on_start()
def start_link(name, job_broadcaster, debug_logging) do
__MODULE__
|> GenStage.start_link(job_broadcaster, name: name)
|> GenStage.start_link({job_broadcaster, debug_logging}, name: name)
|> Util.start_or_link()
end

@doc false
@spec child_spec({GenServer.server(), GenServer.server()}) :: Supervisor.child_spec()
def child_spec({name, job_broadcaster}) do
%{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster]}}
@spec child_spec({GenServer.server(), GenServer.server(), boolean()}) :: Supervisor.child_spec()
def child_spec({name, job_broadcaster, debug_logging}) do
%{super([]) | start: {__MODULE__, :start_link, [name, job_broadcaster, debug_logging]}}
end

@doc false
def init(job_broadcaster) do
state = %{jobs: [], time: NaiveDateTime.utc_now(), timer: nil}
def init({job_broadcaster, debug_logging}) do
state = %{jobs: [], time: NaiveDateTime.utc_now(), timer: nil, debug_logging: debug_logging}
{:producer_consumer, state, subscribe_to: [job_broadcaster]}
end

def handle_events(events, _, state) do
def handle_events(events, _, %{debug_logging: debug_logging} = state) do
reboot_add_events =
events
|> Enum.filter(&add_reboot_event?/1)
|> Enum.map(fn {:add, job} -> {:execute, job} end)

for {_, %{name: job_name}} <- reboot_add_events do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for single reboot execution: #{
inspect(job_name)
}"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for single reboot execution: #{
inspect(job_name)
}"
end)
end

state =
Expand All @@ -68,7 +69,10 @@ defmodule Quantum.ExecutionBroadcaster do
{:noreply, reboot_add_events, state}
end

def handle_info(:execute, %{jobs: [{time_to_execute, jobs_to_execute} | tail]} = state) do
def handle_info(
:execute,
%{jobs: [{time_to_execute, jobs_to_execute} | tail], debug_logging: debug_logging} = state
) do
state =
state
|> Map.put(:timer, nil)
Expand All @@ -79,11 +83,12 @@ defmodule Quantum.ExecutionBroadcaster do
jobs_to_execute
|> (fn jobs ->
for %{name: job_name} <- jobs do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for execution #{
inspect(job_name)
}"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for execution #{
inspect(job_name)
}"
end)
end

jobs
Expand All @@ -95,18 +100,20 @@ defmodule Quantum.ExecutionBroadcaster do
{:noreply, Enum.map(jobs_to_execute, fn job -> {:execute, job} end), state}
end

defp handle_event({:add, %{name: job_name} = job}, state) do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)
defp handle_event({:add, %{name: job_name} = job}, %{debug_logging: debug_logging} = state) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Adding job #{inspect(job_name)}"
end)

add_job_to_state(job, state)
end

defp handle_event({:remove, name}, %{jobs: jobs} = state) do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Removing job #{inspect(name)}"
end)
defp handle_event({:remove, name}, %{jobs: jobs, debug_logging: debug_logging} = state) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Removing job #{inspect(name)}"
end)

jobs =
jobs
Expand Down Expand Up @@ -219,7 +226,7 @@ defmodule Quantum.ExecutionBroadcaster do
Map.put(state, :timer, nil)
end

defp reset_timer(%{timer: nil, jobs: jobs} = state) do
defp reset_timer(%{timer: nil, jobs: jobs, debug_logging: debug_logging} = state) do
run_date = next_run_date(jobs)

timer =
Expand All @@ -231,20 +238,22 @@ defmodule Quantum.ExecutionBroadcaster do
|> DateTime.to_unix(:millisecond)
|> Kernel.-(System.time_offset(:millisecond))

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Continuing Execution Broadcasting at #{
inspect(monotonic_time)
} (#{NaiveDateTime.to_iso8601(run_date)})"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Continuing Execution Broadcasting at #{
inspect(monotonic_time)
} (#{NaiveDateTime.to_iso8601(run_date)})"
end)

Process.send_after(self(), :execute, monotonic_time, abs: true)

_ ->
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Continuing Execution Broadcasting ASAP (#{
NaiveDateTime.to_iso8601(run_date)
})"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Continuing Execution Broadcasting ASAP (#{
NaiveDateTime.to_iso8601(run_date)
})"
end)

send(self(), :execute)
nil
Expand Down
55 changes: 31 additions & 24 deletions lib/quantum/executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ defmodule Quantum.Executor do
* `message` - The Message to Execute (`{:execute, %Job{}}`)
"""
@spec start_link({GenServer.server(), GenServer.server()}, {:execute, Job.t()}) :: {:ok, pid}
def start_link({task_supervisor, task_registry}, {:execute, job}) do
@spec start_link({GenServer.server(), GenServer.server(), boolean()}, {:execute, Job.t()}) ::
{:ok, pid}
def start_link({task_supervisor, task_registry, debug_logging}, {:execute, job}) do
Task.start_link(fn ->
execute(task_supervisor, task_registry, job)
execute(task_supervisor, task_registry, debug_logging, job)
end)
end

@spec execute(GenServer.server(), GenServer.server(), Job.t()) :: :ok
@spec execute(GenServer.server(), GenServer.server(), boolean(), Job.t()) :: :ok
# Execute task on all given nodes without checking for overlap
defp execute(
task_supervisor,
_task_registry,
debug_logging,
%Job{overlap: true, run_strategy: run_strategy} = job
) do
# Find Nodes to run on
Expand All @@ -41,7 +43,7 @@ defmodule Quantum.Executor do
run_strategy
|> NodeList.nodes(job)
|> Enum.filter(&check_node(&1, task_supervisor, job))
|> Enum.each(&run(&1, job, task_supervisor))
|> Enum.each(&run(&1, job, task_supervisor, debug_logging))

:ok
end
Expand All @@ -50,11 +52,13 @@ defmodule Quantum.Executor do
defp execute(
task_supervisor,
task_registry,
debug_logging,
%Job{overlap: false, run_strategy: run_strategy, name: job_name} = job
) do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Start execution of job #{inspect(job_name)}"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Start execution of job #{inspect(job_name)}"
end)

# Find Nodes to run on
# Mark Running and only continue with item if it worked
Expand All @@ -65,7 +69,7 @@ defmodule Quantum.Executor do
|> NodeList.nodes(job)
|> Enum.filter(&(TaskRegistry.mark_running(task_registry, job_name, &1) == :marked_running))
|> Enum.filter(&check_node(&1, task_supervisor, job))
|> Enum.map(&run(&1, job, task_supervisor))
|> Enum.map(&run(&1, job, task_supervisor, debug_logging))
|> Enum.each(fn {node, %Task{ref: ref}} ->
receive do
{^ref, _} ->
Expand All @@ -80,28 +84,31 @@ defmodule Quantum.Executor do
end

# Ececute the given function on a given node via the task supervisor
@spec run(Node.t(), Job.t(), GenServer.server()) :: {Node.t(), Task.t()}
defp run(node, %{name: job_name, task: task}, task_supervisor) do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{
inspect(node)
}"
end)
@spec run(Node.t(), Job.t(), GenServer.server(), boolean()) :: {Node.t(), Task.t()}
defp run(node, %{name: job_name, task: task}, task_supervisor, debug_logging) do
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job_name)} started on node #{
inspect(node)
}"
end)

{
node,
Task.Supervisor.async_nolink({task_supervisor, node}, fn ->
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execute started for job #{inspect(job_name)}"
end)

result = execute_task(task)

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job_name)}, which yielded result: #{
inspect(result)
}"
end)
debug_logging &&
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job_name)}, which yielded result: #{
inspect(result)
}"
end)

:ok
end)
Expand Down
28 changes: 17 additions & 11 deletions lib/quantum/executor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@ defmodule Quantum.ExecutorSupervisor do

alias Quantum.Util

@spec start_link(GenServer.server(), GenServer.server(), GenServer.server(), GenServer.server()) ::
GenServer.on_start()
def start_link(name, execution_broadcaster, task_supervisor, task_registry) do
@spec start_link(
GenServer.server(),
GenServer.server(),
GenServer.server(),
GenServer.server(),
boolean()
) :: GenServer.on_start()
def start_link(name, execution_broadcaster, task_supervisor, task_registry, debug_logging) do
__MODULE__
|> ConsumerSupervisor.start_link(
{execution_broadcaster, task_supervisor, task_registry},
{execution_broadcaster, task_supervisor, task_registry, debug_logging},
name: name
)
|> Util.start_or_link()
Expand All @@ -21,17 +26,17 @@ defmodule Quantum.ExecutorSupervisor do
# credo:disable-for-next-line Credo.Check.Design.TagTODO
# TODO: Remove when gen_stage:0.12 support is dropped
if Util.gen_stage_v12?() do
def init({execution_broadcaster, task_supervisor, task_registry}) do
def init({execution_broadcaster, task_supervisor, task_registry, debug_logging}) do
ConsumerSupervisor.init(
{Quantum.Executor, {task_supervisor, task_registry}},
{Quantum.Executor, {task_supervisor, task_registry, debug_logging}},
strategy: :one_for_one,
subscribe_to: [{execution_broadcaster, max_demand: 50}]
)
end
else
def init({execution_broadcaster, task_supervisor, task_registry}) do
def init({execution_broadcaster, task_supervisor, task_registry, debug_logging}) do
ConsumerSupervisor.init(
[{Quantum.Executor, {task_supervisor, task_registry}}],
[{Quantum.Executor, {task_supervisor, task_registry, debug_logging}}],
strategy: :one_for_one,
subscribe_to: [{execution_broadcaster, max_demand: 50}]
)
Expand All @@ -43,15 +48,16 @@ defmodule Quantum.ExecutorSupervisor do
GenServer.server(),
GenServer.server(),
GenServer.server(),
GenServer.server()
GenServer.server(),
boolean()
}) :: Supervisor.child_spec()
def child_spec({name, execution_broadcaster, task_supervisor, task_registry}) do
def child_spec({name, execution_broadcaster, task_supervisor, task_registry, debug_logging}) do
%{
super([])
| start: {
__MODULE__,
:start_link,
[name, execution_broadcaster, task_supervisor, task_registry]
[name, execution_broadcaster, task_supervisor, task_registry, debug_logging]
}
}
end
Expand Down
Loading

0 comments on commit 5c2eb20

Please sign in to comment.