Skip to content

Commit

Permalink
Properly Implement & Test Swarm Problem Resolution (quantum-elixir#369)
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen authored and c-rack committed Sep 6, 2018
1 parent 6db6328 commit bcf08bd
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 25 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Fixed
- Fix & Test Swarm Handoff & Conflict Resolution

Diff for [unreleased]

## 2.3.2 - 2018-08-21
Expand Down
12 changes: 7 additions & 5 deletions dialyzer.ignore-warnings
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Reference':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Tuple':'__impl__'/1
lib/quantum/date_library.ex:38: Overloaded contract for 'Elixir.Quantum.DateLibrary':'to_utc!'/2 has overlapping domains; such contracts are currently unsupported and are simply ignored
lib/quantum/execution_broadcaster.ex:278: The pattern {'ok', Vdate@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:307: The pattern {'ok', Vdate@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:331: Function add_to_state/3 will never be called
lib/quantum/execution_broadcaster.ex:339: Function add_job_at_date/3 will never be called
lib/quantum/execution_broadcaster.ex:346: Function find_date_and_put_job/3 will never be called
lib/quantum/execution_broadcaster.ex:286: The pattern {'ok', Vdate@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:315: The pattern {'ok', Vdate@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:286: The pattern {'ok', _date@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:315: The pattern {'ok', _date@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:339: Function add_to_state/3 will never be called
lib/quantum/execution_broadcaster.ex:347: Function add_job_at_date/3 will never be called
lib/quantum/execution_broadcaster.ex:354: Function find_date_and_put_job/3 will never be called
lib/quantum/util.ex:22: Function 'gen_stage_v12?'/0 has no local return
lib/quantum/util.ex:31: The call 'Elixir.Version':'match?'(binary(),<<_:64>>) will never return since it differs in the 1st argument from the success typing arguments: (#{'__struct__':='Elixir.Version', 'build':='nil' | binary(), 'major':=binary() | non_neg_integer(), 'minor':='nil' | non_neg_integer(), 'patch':='nil' | non_neg_integer(), 'pre':=[binary() | non_neg_integer()]},binary() | #{'__struct__':='Elixir.Version.Requirement', 'compiled':=boolean(), 'matchspec':=[{atom() | tuple(),[any()],[any()]}] | ets:comp_match_spec(), 'source':=_})
20 changes: 14 additions & 6 deletions lib/quantum/execution_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ defmodule Quantum.ExecutionBroadcaster do
{:stop, :shutdown, %{state | timer: nil}}
end

# Swarm `:die` request received while the state's `timer` field is `nil`:
# stop with reason `:shutdown` and leave the state untouched.
def handle_info({:swarm, :die}, %State{timer: nil} = state),
  do: {:stop, :shutdown, state}

defp handle_event({:add, %{name: job_name} = job}, %State{debug_logging: debug_logging} = state) do
debug_logging &&
Logger.debug(fn ->
Expand Down Expand Up @@ -219,6 +223,8 @@ defmodule Quantum.ExecutionBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Handing of state to other cluster node"
end)

jobs = Enum.flat_map(jobs, fn {_time, jobs} -> jobs end)

{:reply, {:resume, {jobs, last_execution_date}}, [], state}
end

Expand All @@ -238,9 +244,10 @@ defmodule Quantum.ExecutionBroadcaster do
intermediate_state = %{state | time: earlier_last_execution_date}

new_state =
Enum.reduce(handoff_jobs, intermediate_state, fn job, acc_state ->
add_job_to_state(job, acc_state)
end)
handoff_jobs
|> Enum.reduce(intermediate_state, &add_job_to_state/2)
|> sort_state
|> reset_timer

{:noreply, [], new_state}
end
Expand All @@ -261,9 +268,10 @@ defmodule Quantum.ExecutionBroadcaster do
intermediate_state = %{state | time: earlier_last_execution_date}

new_state =
Enum.reduce(handoff_jobs, intermediate_state, fn job, acc_state ->
add_job_to_state(job, acc_state)
end)
handoff_jobs
|> Enum.reduce(intermediate_state, &add_job_to_state/2)
|> sort_state
|> reset_timer

{:noreply, [], new_state}
end
Expand Down
6 changes: 3 additions & 3 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ defmodule Quantum.JobBroadcaster do
end)

new_jobs = Enum.into(handoff_jobs, jobs)
{:noreply, %{state | jobs: new_jobs, buffer: buffer ++ handoff_buffer}}
{:noreply, [], %{state | jobs: new_jobs, buffer: buffer ++ handoff_buffer}}
end

def handle_cast(
Expand All @@ -229,7 +229,7 @@ defmodule Quantum.JobBroadcaster do
end)

new_jobs = Enum.into(handoff_jobs, jobs)
{:noreply, %{state | jobs: new_jobs, buffer: buffer ++ handoff_buffer}}
{:noreply, [], %{state | jobs: new_jobs, buffer: buffer ++ handoff_buffer}}
end

def handle_call(:jobs, _, %State{jobs: jobs} = state),
Expand All @@ -243,7 +243,7 @@ defmodule Quantum.JobBroadcaster do
"[#{inspect(Node.self())}][#{__MODULE__}] Handing of state to other cluster node"
end)

{:reply, {:resume, {jobs, buffer}}, state}
{:reply, {:resume, {jobs, buffer}}, [], state}
end

def handle_info({:swarm, :die}, state) do
Expand Down
147 changes: 140 additions & 7 deletions test/quantum/execution_broadcaster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ defmodule Quantum.ExecutionBroadcasterTest do
import ExUnit.CaptureLog
import Quantum.CaptureLogExtend

alias Quantum.{ExecutionBroadcaster, ExecutionBroadcaster.State, Job}
alias Quantum.{ExecutionBroadcaster, ExecutionBroadcaster.State, HandoffHelper, Job}
alias Quantum.Storage.Test, as: TestStorage
alias Quantum.{TestConsumer, TestProducer}

# Allow max 10% Latency
@max_timeout 1_100

# Timeout until first event arrives
@init_timeout 15_000

doctest ExecutionBroadcaster

defmodule TestScheduler do
Expand Down Expand Up @@ -85,7 +88,7 @@ defmodule Quantum.ExecutionBroadcasterTest do
use Quantum.Storage.Test

def last_execution_date(_),
do: NaiveDateTime.add(NaiveDateTime.utc_now(), -3_600, :second)
do: NaiveDateTime.add(NaiveDateTime.utc_now(), -50, :second)
end

capture_log(fn ->
Expand All @@ -109,12 +112,13 @@ defmodule Quantum.ExecutionBroadcasterTest do

diff_seconds = NaiveDateTime.diff(NaiveDateTime.utc_now(), date, :second)

assert diff_seconds >= 3_600 - 1
assert diff_seconds >= 50 - 1

assert_receive {:received, {:execute, ^job}}, @init_timeout

assert_receive {:received, {:execute, ^job}}, @max_timeout
# Executes quickly until the current time is reached
for _ <- 0..diff_seconds do
assert_receive {:received, {:execute, ^job}}, 100
for i <- 0..(diff_seconds - 2) do
assert_receive {:received, {:execute, ^job}}, 500, "Fast execution #{i} not received"
end

# Maybe a little time elapsed in the test?
Expand All @@ -123,7 +127,7 @@ defmodule Quantum.ExecutionBroadcasterTest do
end

# Goes back to normal pace
refute_receive {:received, {:execute, ^job}}, 100
refute_receive {:received, {:execute, ^job}}, 900
end)
end

Expand Down Expand Up @@ -285,4 +289,133 @@ defmodule Quantum.ExecutionBroadcasterTest do
end)
end
end

# Exercises the handoff flow between two ExecutionBroadcaster instances
# ("Old" and "New") that share one producer. The handoff itself is driven by
# HandoffHelper.initiate_handoff/2, whose implementation is not shown here.
describe "swarm/handoff" do
# Case 1: the old broadcaster already has a job when the handoff starts.
@tag manual_dispatch: true
test "works" do
# Trap exits so the `{:EXIT, old_broadcaster, :shutdown}` message asserted
# at the end can be received by the test process.
Process.flag(:trap_exit, true)

{:ok, producer} = TestProducer.start_link()

# Take the start function and arguments from the child spec and invoke them
# directly, instead of starting the broadcaster under a supervisor.
%{start: {ExecutionBroadcaster, f, a}} =
ExecutionBroadcaster.child_spec(
{Module.concat(__MODULE__, Old), producer, TestStorage, TestScheduler, true}
)

{:ok, old_broadcaster} = apply(ExecutionBroadcaster, f, a)

# The consumer is given `self()`; the `{:received, ...}` messages asserted
# below presumably originate from it - confirm against TestConsumer.
{:ok, _old_consumer} = TestConsumer.start_link(old_broadcaster, self())

# NOTE(review): `~e[*]e` looks like an extended (per-second) cron
# expression, which would match the ~1s @max_timeout - confirm.
job =
TestScheduler.new_job()
|> Job.set_schedule(~e[*]e)

TestProducer.send(producer, {:add, job})

# The old broadcaster executes the job before the handoff.
assert_receive {:received, {:execute, ^job}}, @max_timeout

# Second broadcaster, started the same way under a different name.
%{start: {ExecutionBroadcaster, f, a}} =
ExecutionBroadcaster.child_spec(
{Module.concat(__MODULE__, New), producer, TestStorage, TestScheduler, true}
)

{:ok, new_broadcaster} = apply(ExecutionBroadcaster, f, a)

{:ok, _new_consumer} = TestConsumer.start_link(new_broadcaster, self())

HandoffHelper.initiate_handoff(old_broadcaster, new_broadcaster)

# The old broadcaster must terminate with reason :shutdown ...
assert_receive {:EXIT, ^old_broadcaster, :shutdown}

# ... and the job must still be executed afterwards.
assert_receive {:received, {:execute, ^job}}, @max_timeout
end

# Case 2: the handoff happens before any job was added; a job added
# afterwards must still be executed.
@tag manual_dispatch: true
test "works empty" do
# Trap exits so the `{:EXIT, ...}` message asserted below can be received.
Process.flag(:trap_exit, true)

{:ok, producer} = TestProducer.start_link()

%{start: {ExecutionBroadcaster, f, a}} =
ExecutionBroadcaster.child_spec(
{Module.concat(__MODULE__, Old), producer, TestStorage, TestScheduler, true}
)

{:ok, old_broadcaster} = apply(ExecutionBroadcaster, f, a)

{:ok, _old_consumer} = TestConsumer.start_link(old_broadcaster, self())

%{start: {ExecutionBroadcaster, f, a}} =
ExecutionBroadcaster.child_spec(
{Module.concat(__MODULE__, New), producer, TestStorage, TestScheduler, true}
)

{:ok, new_broadcaster} = apply(ExecutionBroadcaster, f, a)

{:ok, _new_consumer} = TestConsumer.start_link(new_broadcaster, self())

HandoffHelper.initiate_handoff(old_broadcaster, new_broadcaster)

assert_receive {:EXIT, ^old_broadcaster, :shutdown}

# The job is only added after the old broadcaster has exited.
job =
TestScheduler.new_job()
|> Job.set_schedule(~e[*]e)

TestProducer.send(producer, {:add, job})

# Uses the longer @init_timeout because this is the first event to arrive.
assert_receive {:received, {:execute, ^job}}, @init_timeout
end
end

# Exercises conflict resolution between two ExecutionBroadcaster instances
# that each have their own producer and their own job. The resolution is
# driven by HandoffHelper.resolve_conflict/2 (implementation not shown here).
describe "swarm/resolve_conflict" do
@tag manual_dispatch: true
test "works" do
# Trap exits so the `{:EXIT, old_broadcaster, :shutdown}` message asserted
# below can be received by the test process.
Process.flag(:trap_exit, true)

{:ok, old_producer} = TestProducer.start_link()

# Take the start function and arguments from the child spec and invoke them
# directly, instead of starting the broadcaster under a supervisor.
%{start: {ExecutionBroadcaster, f, a}} =
ExecutionBroadcaster.child_spec(
{Module.concat(__MODULE__, Old), old_producer, TestStorage, TestScheduler, true}
)

{:ok, old_broadcaster} = apply(ExecutionBroadcaster, f, a)

{:ok, _old_consumer} = TestConsumer.start_link(old_broadcaster, self())

# Unlike the handoff tests, the second broadcaster gets a separate producer.
{:ok, new_producer} = TestProducer.start_link()

%{start: {ExecutionBroadcaster, f, a}} =
ExecutionBroadcaster.child_spec(
{Module.concat(__MODULE__, New), new_producer, TestStorage, TestScheduler, true}
)

{:ok, new_broadcaster} = apply(ExecutionBroadcaster, f, a)

{:ok, _new_consumer} = TestConsumer.start_link(new_broadcaster, self())

# One distinct job per producer, so each broadcaster holds a different job
# before the conflict is resolved.
old_job =
TestScheduler.new_job()
|> Job.set_schedule(~e[*]e)

TestProducer.send(old_producer, {:add, old_job})

new_job =
TestScheduler.new_job()
|> Job.set_schedule(~e[*]e)

TestProducer.send(new_producer, {:add, new_job})

# Both jobs are executed before the conflict resolution.
assert_receive {:received, {:execute, ^old_job}}, @max_timeout
assert_receive {:received, {:execute, ^new_job}}, @max_timeout

HandoffHelper.resolve_conflict(old_broadcaster, new_broadcaster)

# The old broadcaster must terminate with reason :shutdown ...
assert_receive {:EXIT, ^old_broadcaster, :shutdown}

# ... and executions of both jobs must still be received afterwards.
assert_receive {:received, {:execute, ^old_job}}, @max_timeout
assert_receive {:received, {:execute, ^new_job}}, @max_timeout
end
end
end
2 changes: 1 addition & 1 deletion test/quantum/executor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ defmodule Quantum.ExecutorTest do
capture_log(fn ->
Executor.start_link({task_supervisor, task_registry, debug_logging}, {:execute, job})

Process.sleep(50)
Process.sleep(150)
end)

assert :marked_running = TaskRegistry.mark_running(task_registry, job.name, Node.self())
Expand Down
74 changes: 73 additions & 1 deletion test/quantum/job_broadcaster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Quantum.JobBroadcasterTest do

use ExUnit.Case, async: true

alias Quantum.{Job, JobBroadcaster}
alias Quantum.{HandoffHelper, Job, JobBroadcaster}
alias Quantum.Storage.Test, as: TestStorage
alias Quantum.TestConsumer

Expand Down Expand Up @@ -302,4 +302,76 @@ defmodule Quantum.JobBroadcasterTest do
assert active_job == TestScheduler.find_job(broadcaster, active_job_name)
end
end

# Exercises the handoff flow between two JobBroadcaster instances: the old one
# starts with a single job, the new one starts with none. The handoff is
# driven by HandoffHelper.initiate_handoff/2 (implementation not shown here).
describe "swarm/handoff" do
test "works" do
# Trap exits so the `{:EXIT, old_job_broadcaster, :shutdown}` message
# asserted at the end can be received by the test process.
Process.flag(:trap_exit, true)

job = TestScheduler.new_job()
job_name = job.name

# Take the start function and arguments from the child spec and invoke them
# directly, instead of starting the broadcaster under a supervisor.
%{start: {JobBroadcaster, f, a}} =
JobBroadcaster.child_spec(
{Module.concat(__MODULE__, Old), [job], TestStorage, TestScheduler, true}
)

{:ok, old_job_broadcaster} = apply(JobBroadcaster, f, a)

{:ok, _old_consumer} = TestConsumer.start_link(old_job_broadcaster, self())

# The new broadcaster is started with an empty job list.
%{start: {JobBroadcaster, f, a}} =
JobBroadcaster.child_spec(
{Module.concat(__MODULE__, New), [], TestStorage, TestScheduler, true}
)

{:ok, new_job_broadcaster} = apply(JobBroadcaster, f, a)

{:ok, _new_consumer} = TestConsumer.start_link(new_job_broadcaster, self())

HandoffHelper.initiate_handoff(old_job_broadcaster, new_job_broadcaster)

# After the handoff the new broadcaster reports exactly the old one's job.
assert TestScheduler.jobs(new_job_broadcaster) == [{job_name, job}]

# The old broadcaster must terminate with reason :shutdown.
assert_receive {:EXIT, ^old_job_broadcaster, :shutdown}
end
end

# Exercises conflict resolution between two JobBroadcaster instances that each
# start with a different job. The resolution is driven by
# HandoffHelper.resolve_conflict/2 (implementation not shown here).
describe "swarm/resolve_conflict" do
test "works" do
# Trap exits so the `{:EXIT, old_job_broadcaster, :shutdown}` message
# asserted at the end can be received by the test process.
Process.flag(:trap_exit, true)

job_1 = TestScheduler.new_job()
job_1_name = job_1.name

job_2 = TestScheduler.new_job()
job_2_name = job_2.name

# Old broadcaster starts with job_1 only.
%{start: {JobBroadcaster, f, a}} =
JobBroadcaster.child_spec(
{Module.concat(__MODULE__, Old), [job_1], TestStorage, TestScheduler, true}
)

{:ok, old_job_broadcaster} = apply(JobBroadcaster, f, a)

{:ok, _old_consumer} = TestConsumer.start_link(old_job_broadcaster, self())

# New broadcaster starts with job_2 only.
%{start: {JobBroadcaster, f, a}} =
JobBroadcaster.child_spec(
{Module.concat(__MODULE__, New), [job_2], TestStorage, TestScheduler, true}
)

{:ok, new_job_broadcaster} = apply(JobBroadcaster, f, a)

{:ok, _new_consumer} = TestConsumer.start_link(new_job_broadcaster, self())

HandoffHelper.resolve_conflict(old_job_broadcaster, new_job_broadcaster)

resulting_jobs = TestScheduler.jobs(new_job_broadcaster)

# Membership checks rather than list equality: the order of the merged
# jobs is not asserted.
assert Enum.member?(resulting_jobs, {job_1_name, job_1})
assert Enum.member?(resulting_jobs, {job_2_name, job_2})

# The old broadcaster must terminate with reason :shutdown.
assert_receive {:EXIT, ^old_job_broadcaster, :shutdown}
end
end
end
Loading

0 comments on commit bcf08bd

Please sign in to comment.