Skip to content

Commit 500d623

Browse files
authored
Merge pull request #1 from abmBispo/schedulers-improvement
Schedulers improvement
2 parents 1b76a6d + 944239e commit 500d623

File tree

6 files changed

+100
-73
lines changed

6 files changed

+100
-73
lines changed

lib/elixir_queue/job.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
defmodule ElixirQueue.Job do
2-
defstruct [:mod, :func, :args]
2+
defstruct [:mod, :func, :args, :retry_attempts]
33
end

lib/elixir_queue/queue.ex

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,28 @@ defmodule ElixirQueue.Queue do
1313
def init(_opts), do: {:ok, {}}
1414

1515
@doc ~S"""
16-
Get next job to be processed
16+
Enqueues job to be processed
1717
## Examples
1818
iex> ElixirQueue.Queue.clear()
1919
iex> ElixirQueue.Queue.perform_later(Enum, :reverse, [[1,2,3,4,5]])
2020
:ok
2121
"""
2222
@spec perform_later(atom, atom, list(any)) :: :ok
2323
def perform_later(mod, func, args \\ []) do
24-
job = %Job{mod: mod, func: func, args: args}
24+
job = %Job{mod: mod, func: func, args: args, retry_attempts: 0}
25+
GenServer.call(Queue, {:perform_later, job})
26+
end
27+
28+
@doc ~S"""
29+
Enqueues job to be processed
30+
## Examples
31+
iex> ElixirQueue.Queue.clear()
32+
iex> job = %ElixirQueue.Job{mod: Enum, func: :reverse, args: [[1,2,3,4,5]]}
33+
iex> ElixirQueue.Queue.perform_later(job)
34+
:ok
35+
"""
36+
@spec perform_later(ElixirQueue.Job.t()) :: :ok
37+
def perform_later(job = %Job{}) do
2538
GenServer.call(Queue, {:perform_later, job})
2639
end
2740

@@ -41,6 +54,7 @@ defmodule ElixirQueue.Queue do
4154
@spec fetch :: {:ok, any} | {:error, :empty}
4255
def fetch, do: GenServer.call(Queue, :fetch)
4356

57+
@spec clear :: :ok
4458
def clear, do: GenServer.call(Queue, :clear)
4559

4660
@impl true

lib/elixir_queue/worker.ex

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,45 @@
11
defmodule ElixirQueue.Worker do
2-
use GenServer, restart: :temporary
2+
use Agent, restart: :temporary
33
require Logger
44

5-
alias ElixirQueue.Job
5+
alias ElixirQueue.{
6+
Job,
7+
WorkerPool
8+
}
69

10+
@spec start_link(any) :: {:error, any} | {:ok, pid}
711
@doc """
812
Starts a new worker.
913
"""
10-
@spec start_link(any) :: {:error, any} | {:ok, pid}
11-
def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
12-
13-
@impl true
14-
@spec init(any) :: {:ok, Job.t()}
15-
def init(_opts), do: {:ok, %Job{}}
16-
17-
@impl true
18-
def handle_call({:start, job}, _from, _state),
19-
do: {:reply, :ok, job}
20-
21-
def handle_call({:perform, %Job{mod: mod, func: func, args: args}}, _from, state),
22-
do: {:reply, apply(mod, func, args), state}
14+
def start_link(_opts) do
15+
Agent.start_link(fn -> %Job{} end)
16+
end
2317

24-
def handle_call(:halt, _from, _state),
25-
do: {:reply, :ok, %Job{}}
18+
@spec get(pid()) :: Job.t()
19+
def get(worker), do: Agent.get(worker, fn state -> state end)
2620

27-
def handle_call(:idle?, _from, state),
28-
do: {:reply, state == %Job{}, state}
21+
@spec idle?(pid()) :: boolean
22+
def idle?(worker), do: Agent.get(worker, fn state -> state end) == %Job{}
2923

30-
@spec perform(pid(), Job.t()) :: any()
31-
def perform(worker, job) do
32-
GenServer.call(worker, {:start, job})
33-
result = GenServer.call(worker, {:perform, job})
34-
GenServer.call(worker, :halt)
24+
@spec perform(pid(), ElixirQueue.Job.t()) :: any()
25+
def perform(worker, job = %Job{mod: mod, func: func, args: args}) do
26+
start_job(worker, job)
27+
result = apply(mod, func, args)
28+
end_job(worker)
3529

3630
unless Mix.env() == :test,
3731
do: Logger.info("JOB DONE SUCCESSFULLY #{inspect(job)} ====> RESULT: #{inspect(result)}")
3832

39-
{:ok, result}
33+
{worker, job, result}
34+
end
35+
36+
defp start_job(worker, job) do
37+
Agent.update(worker, fn _ -> job end)
38+
WorkerPool.backup_worker(worker, job)
4039
end
4140

42-
@spec idle?(pid()) :: any
43-
def idle?(worker), do: GenServer.call(worker, :idle?)
41+
defp end_job(worker) do
42+
WorkerPool.clean_worker_backup(worker)
43+
Agent.update(worker, fn _ -> %Job{} end)
44+
end
4445
end

lib/elixir_queue/worker_pool.ex

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ defmodule ElixirQueue.WorkerPool do
55
alias ElixirQueue.{
66
WorkerSupervisor,
77
WorkerPool,
8-
Worker
8+
Worker,
9+
Queue
910
}
1011

1112
def start_link(opts) do
@@ -17,60 +18,65 @@ defmodule ElixirQueue.WorkerPool do
1718
@spec init(any) :: {:ok, %{failed_jobs: [], pids: [], successful_jobs: []}}
1819
def init(_opts) do
1920
pids =
20-
Fail proff for _ <- 1..Application.fetch_env!(:elixir_queue, :workers) do
21+
for _ <- 1..Application.fetch_env!(:elixir_queue, :workers) do
2122
{:ok, pid} = DynamicSupervisor.start_child(WorkerSupervisor, Worker)
22-
ref = Process.monitor(pid)
23-
{pid, ref}
23+
Process.monitor(pid)
24+
pid
2425
end
2526

27+
:ets.new(:worker_backup, [:set, :protected, :named_table])
28+
2629
{:ok, %{pids: pids, successful_jobs: [], failed_jobs: []}}
2730
end
2831

29-
# Server side functions
3032
@impl true
31-
def handle_call(:workers, _from, state), do: {:reply, Enum.map(state.pids, &elem(&1, 0)), state}
33+
def handle_call(:workers, _from, state) do
34+
{:reply, state.pids, state}
35+
end
3236

3337
def handle_call(:failed_jobs, _from, state), do: {:reply, state.failed_jobs, state}
3438

3539
def handle_call(:successful_jobs, _from, state), do: {:reply, state.successful_jobs, state}
3640

37-
def handle_call({:add_successful_job, worker, job, result}, _from, state),
38-
do: {
39-
:reply,
40-
:ok,
41-
Map.put(state, :successful_jobs, [{worker, job, result} | state.successful_jobs])
42-
}
41+
def handle_call({:add_successful_job, worker, job, result}, _from, state) do
42+
{:reply, :ok,
43+
Map.put(state, :successful_jobs, [{worker, job, result} | state.successful_jobs])}
44+
end
4345

44-
def handle_call({:add_failed_job, worker, job, err}, _from, state),
45-
do: {:reply, :ok, Map.put(state, :failed_jobs, [{worker, job, err} | state.failed_jobs])}
46+
def handle_call({:backup_worker, worker, job}, _from, state) do
47+
:ets.insert(:worker_backup, {worker, job})
48+
{:reply, :ok, state}
49+
end
50+
51+
def handle_call({:clean_worker_backup, worker}, _from, state) do
52+
:ets.delete(:worker_backup, worker)
53+
{:reply, :ok, state}
54+
end
4655

4756
@impl true
48-
def handle_info({:DOWN, _ref, :process, dead_worker, reason}, state) do
49-
{:ok, worker} = DynamicSupervisor.start_child(WorkerSupervisor, Worker)
50-
worker_reference = Process.monitor(worker)
57+
def handle_info({:DOWN, _, :process, dead_worker, reason}, state) do
58+
{:ok, pid} = DynamicSupervisor.start_child(WorkerSupervisor, Worker)
59+
Process.monitor(pid)
5160
pids = Enum.filter(state.pids, &(&1 != dead_worker))
5261

53-
reason |> IO.inspect(label: "reason")
62+
{^dead_worker, backuped_job} =
63+
:ets.lookup(:worker_backup, dead_worker)
64+
|> List.first()
5465

55-
unless Mix.env() == :test,
56-
do: Logger.error("Unexpected worker error:
57-
Worker #{inspect(dead_worker)} received EXIT SIGNAL.
58-
It have been replaced by #{inspect(worker)} worker.
59-
All the job progress was lost and job failed.
60-
By default job returned to the end of queue and will be performed again later.
61-
")
66+
backuped_job = Map.put(backuped_job, :retry_attempts, backuped_job.retry_attempts + 1)
6267

63-
{:noreply, Map.put(state, :pids, [{worker, worker_reference} | pids])}
64-
end
68+
state =
69+
state
70+
|> Map.put(:pids, [pid | pids])
71+
|> Map.put(:failed_jobs, [{dead_worker, backuped_job, reason} | state.failed_jobs])
6572

66-
def handle_info(_msg, state),
67-
do: {:noreply, state}
73+
if backuped_job.retry_attempts < Application.fetch_env!(:elixir_queue, :retries),
74+
do: Queue.perform_later(backuped_job)
6875

69-
# Client side functions #
76+
{:noreply, state}
77+
end
7078

71-
@doc """
72-
Returns _workers_ `PID`s kept in the state.
73-
"""
79+
# Client side functions
7480
@spec workers :: list()
7581
def workers, do: GenServer.call(__MODULE__, :workers)
7682

@@ -83,13 +89,17 @@ Fail proff for _ <- 1..Application.fetch_env!(:elixir_queue, :workers) do
8389
@spec successful_jobs :: list()
8490
def successful_jobs, do: GenServer.call(__MODULE__, :successful_jobs)
8591

86-
@spec add_successful_job(pid(), ElixirQueue.Job.t(), any) :: :ok
87-
def add_successful_job(worker, job, result),
92+
@spec add_successful_job({pid(), ElixirQueue.Job.t(), any}) :: :ok
93+
def add_successful_job({worker, job, result}),
8894
do: GenServer.call(__MODULE__, {:add_successful_job, worker, job, result})
8995

90-
@spec add_failed_job(pid(), ElixirQueue.Job.t(), any) :: :ok
91-
def add_failed_job(worker, job, err),
92-
do: GenServer.call(__MODULE__, {:add_failed_job, worker, job, err})
96+
@spec backup_worker(pid(), ElixirQueue.Job.t()) :: true
97+
def backup_worker(worker, job),
98+
do: GenServer.call(__MODULE__, {:backup_worker, worker, job})
99+
100+
@spec clean_worker_backup(pid()) :: true
101+
def clean_worker_backup(worker),
102+
do: GenServer.call(__MODULE__, {:clean_worker_backup, worker})
93103

94104
@spec idle_worker :: pid()
95105
def idle_worker do
@@ -102,10 +112,9 @@ Fail proff for _ <- 1..Application.fetch_env!(:elixir_queue, :workers) do
102112
@spec perform(ElixirQueue.Job.t()) :: no_return()
103113
def perform(job) do
104114
worker = WorkerPool.idle_worker()
105-
106115
Task.start(fn ->
107-
{:ok, result} = Worker.perform(worker, job)
108-
WorkerPool.add_successful_job(worker, job, result)
116+
Worker.perform(worker, job)
117+
|> WorkerPool.add_successful_job()
109118
end)
110119
end
111120
end

lib/fake_functions/fake.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ defmodule ElixirQueue.Fake do
2525
@spec populate :: :ok
2626
def populate do
2727
for _ <- 0..999, do: Queue.perform_later(Fake, :task, [3])
28-
:ok
28+
:ok
2929
end
3030

3131
@spec spec :: %{

mix.exs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ defmodule ElixirQueue.MixProject do
1515
def application do
1616
[
1717
extra_applications: [:logger],
18-
env: [workers: System.schedulers_online()],
18+
env: [
19+
retries: 3,
20+
workers: System.schedulers_online()
21+
],
1922
mod: {ElixirQueue.Application, []}
2023
]
2124
end

0 commit comments

Comments
 (0)