Skip to content

Adds support for enabling and Disabling relays #572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 80 additions & 12 deletions lib/cog/relay/relays.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Cog.Relay.Relays do

alias Carrier.Messaging
alias Cog.Models.Bundle
alias Cog.Models.Relay
alias Cog.Repo
alias Cog.Relay.Util

Expand All @@ -33,6 +34,24 @@ defmodule Cog.Relay.Relays do
GenServer.call(__MODULE__, {:drop_bundle, bundle}, :infinity)
end

@doc "Enables the relay"
@spec enable_relay(%Relay{}) :: :ok
def enable_relay(%Relay{}=relay) do
GenServer.call(__MODULE__, {:enable_relay, relay}, :infinity)
end

@doc "Disables the relay"
@spec disable_relay(%Relay{}) :: :ok
def disable_relay(%Relay{}=relay) do
GenServer.call(__MODULE__, {:disable_relay, relay}, :infinity)
end

@doc "Drops the relay"
@spec drop_relay(%Relay{}) :: :ok
def drop_relay(%Relay{}=relay) do
GenServer.call(__MODULE__, {:drop_relay, relay}, :infinity)
end

@doc """
Returns the IDs of all Relays currently running `bundle_name`. If no
Relays are running the bundle, an empty list is returned.
Expand Down Expand Up @@ -74,6 +93,18 @@ defmodule Cog.Relay.Relays do
tracker = Tracker.drop_bundle(state.tracker, bundle)
{:reply, :ok, %{state | tracker: tracker}}
end
def handle_call({:enable_relay, relay}, _from, state) do
tracker = enable_relay(state.tracker, relay.id)
{:reply, :ok, %{state | tracker: tracker}}
end
def handle_call({:disable_relay, relay}, _from, state) do
tracker = disable_relay(state.tracker, relay.id)
{:reply, :ok, %{state | tracker: tracker}}
end
def handle_call({:drop_relay, relay}, _from, state) do
tracker = remove_relay(state.tracker, relay.id)
{:reply, :ok, %{state | tracker: tracker}}
end
def handle_call({:relays_running, bundle_name} , _from, state),
do: {:reply, Tracker.relays(state.tracker, bundle_name), state}

Expand All @@ -99,7 +130,7 @@ defmodule Cog.Relay.Relays do
|> Enum.partition(&Util.is_ok?/1)
|> Util.unwrap_partition_results

tracker = update_tracker(announcement, tracker, success_bundles)
tracker = update_tracker(announcement, tracker, success_bundles, internal)

case Map.fetch(announcement, "reply_to") do
:error ->
Expand All @@ -120,30 +151,34 @@ defmodule Cog.Relay.Relays do
defp receipt(announcement_id, failed_bundles),
do: %{"announcement_id" => announcement_id, "status" => "failed", "bundles" => failed_bundles}

defp update_tracker(announcement, tracker, success_bundles) do
defp update_tracker(announcement, tracker, success_bundles, internal) do
relay_id = Map.fetch!(announcement, "relay")

online_status = case Map.fetch!(announcement, "online") do
true -> :online
false -> :offline
end

enabled_status = if internal || relay_enabled?(relay_id) do
:enabled
else
:disabled
end

snapshot_status = case Map.fetch!(announcement, "snapshot") do
true -> :snapshot
false -> :incremental
end

bundle_names = Enum.map(success_bundles, &Map.get(&1, :name)) # Just for logging purposes
case {online_status, snapshot_status} do
case {online_status, enabled_status} do
{:offline, _} ->
Logger.info("Removed Relay #{relay_id} from active relay list")
Tracker.remove_relay(tracker, relay_id)
{:online, :incremental} ->
Logger.info("Incrementally adding bundles for Relay #{relay_id}: #{inspect bundle_names}")
Tracker.add_bundles_for_relay(tracker, relay_id, success_bundles)
{:online, :snapshot} ->
Logger.info("Setting bundles list for Relay #{relay_id}: #{inspect bundle_names}")
Tracker.set_bundles_for_relay(tracker, relay_id, success_bundles)
remove_relay(tracker, relay_id)
{:online, :disabled} ->
load_bundles(snapshot_status, tracker, relay_id, success_bundles)
|> disable_relay(relay_id)
{:online, :enabled} ->
load_bundles(snapshot_status, tracker, relay_id, success_bundles)
|> enable_relay(relay_id)
end
end

Expand Down Expand Up @@ -184,4 +219,37 @@ defmodule Cog.Relay.Relays do
end
end

defp load_bundles(:incremental, tracker, relay_id, success_bundles) do
bundle_names = Enum.map(success_bundles, &Map.get(&1, :name)) # Just for logging purposes
Logger.info("Incrementally adding bundles for Relay #{relay_id}: #{inspect bundle_names}")
Tracker.add_bundles_for_relay(tracker, relay_id, success_bundles)
end
defp load_bundles(:snapshot, tracker, relay_id, success_bundles) do
bundle_names = Enum.map(success_bundles, &Map.get(&1, :name)) # Just for logging purposes
Logger.info("Setting bundles list for Relay #{relay_id}: #{inspect bundle_names}")
Tracker.set_bundles_for_relay(tracker, relay_id, success_bundles)
end

defp enable_relay(tracker, relay_id) do
Logger.info("Enabled Relay #{relay_id}")
Tracker.enable_relay(tracker, relay_id)
end

defp disable_relay(tracker, relay_id) do
Logger.info("Disabled Relay #{relay_id}")
Tracker.disable_relay(tracker, relay_id)
end

defp remove_relay(tracker, relay_id) do
Logger.info("Removed Relay #{relay_id} from active relay list")
Tracker.remove_relay(tracker, relay_id)
end

defp relay_enabled?(relay_id) do
case Repo.get(Relay, relay_id) do
%Relay{enabled: true} -> true
_ -> false
end
end

end
54 changes: 51 additions & 3 deletions lib/cog/relay/tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ defmodule Cog.Relay.Tracker do

Tracks all the relays that have checked in with the bot, recording
which bundles they each serve.

Maintains a set of disabled relays. Relays that appear in the disabled
set will be filtered out when the list of relays for a bundle is requested.
Note: Relays must be explicitly disabled, otherwise they are assumed to be
available.
"""

@type t :: %__MODULE__{map: %{String.t => MapSet.t}}
defstruct [map: %{}]
@type t :: %__MODULE__{map: %{String.t => MapSet.t},
disabled: MapSet.t}
defstruct [map: %{}, disabled: MapSet.new]

@doc """
Create a new, empty Tracker
Expand All @@ -21,6 +27,40 @@ defmodule Cog.Relay.Tracker do
def new(),
do: %__MODULE__{}

@doc """
Enables a relay if it exists in the disabled set by removing it from the
disabled set. When the list of relays for a bundle is requested, disabled
bundles are filtered out.

Note: If a relay is assigned no bundles it is unknown to the tracker. When
enabling or disabling make sure to load bundles first or this will just be
a noop.
"""
@spec enable_relay(t, String.t) :: t
def enable_relay(tracker, relay_id) do
disabled = MapSet.delete(tracker.disabled, relay_id)
%{tracker | disabled: disabled}
end

@doc """
Disables a relay if it exists in the tracker by adding it to the disabled
set. When the list of relays for a bundle is requested, disabled bundles
are filtered out.

Note: If a relay is assigned no bundles it is unknown to the tracker. When
enabling or disabling make sure to load bundles first or this will just be
a noop.
"""
@spec disable_relay(t, String.t) :: t
def disable_relay(tracker, relay_id) do
if in_tracker?(tracker, relay_id) do
disabled = MapSet.put(tracker.disabled, relay_id)
%{tracker | disabled: disabled}
else
tracker
end
end

@doc """
Removes all record of `relay` from the tracker. If `relay` is the
last one serving a given bundle, that bundle is removed from the
Expand All @@ -36,7 +76,9 @@ defmodule Cog.Relay.Tracker do
Map.put(acc, bundle, remaining)
end
end)
%{tracker | map: updated}

disabled = MapSet.delete(tracker.disabled, relay)
%{tracker | map: updated, disabled: disabled}
end

@doc """
Expand Down Expand Up @@ -81,7 +123,13 @@ defmodule Cog.Relay.Tracker do
def relays(tracker, bundle_name) do
tracker.map
|> Map.get(bundle_name, MapSet.new)
|> MapSet.difference(tracker.disabled)
|> MapSet.to_list
end

defp in_tracker?(tracker, relay_id) do
Map.values(tracker.map)
|> Enum.reduce(&MapSet.union(&1, &2))
|> MapSet.member?(relay_id)
end
end
94 changes: 94 additions & 0 deletions lib/cog/repository/relays.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
defmodule Cog.Repository.Relays do
@moduledoc """
Behavioral API for interacting with relays. Prefer these
functions over direct manipulation with `Cog.Repo`.
"""

alias Cog.Repo
alias Cog.Models.Relay
alias Cog.Relay.Relays

@doc """
Creates a new relay given a map of attributes
"""
@spec new(Map.t) :: {:ok, %Relay{}} | {:error, Ecto.Changeset.t}
def new(attrs) do
%Relay{}
|> Relay.changeset(attrs)
|> Repo.insert
end

@doc """
Retrieves all relays.
"""
@spec all :: [%Relay{}]
def all,
do: Repo.all(Relay) |> Repo.preload([groups: :bundles])

@doc """
Retrieves a relay based on the id. The given id must be a
valid UUID.
"""
@spec by_id(String.t) :: {:ok, %Relay{}} | {:error, Ecto.Changeset.t} | {:error, Atom.t}
def by_id(id) do
with :ok <- valid_uuid(id) do
case Repo.get(Relay, id) |> Repo.preload([groups: :bundles]) do
%Relay{} = relay ->
{:ok, relay}
nil ->
{:error, :not_found}
end
end
end

@doc """
Removes a relay from the db and from the internal tracker
"""
@spec delete(String.t) :: {:ok, %Relay{}} | {:error, Ecto.Changeset.t} | {:error, Atom.t}
def delete(id) do
try do
with {:ok, relay} <- by_id(id),
{:ok, deleted_relay} <- Repo.delete(relay),
:ok <- Relays.drop_relay(relay),
do: {:ok, deleted_relay}
rescue
Ecto.StaleModelError ->
{:error, :not_found}
end
end

@doc """
Updates a relay. Toggles the relay status if the update
changes the status.
"""
@spec update(String.t, Map.t) :: {:ok, %Relay{}} | {:error, Ecto.Changeset.t}
def update(id, attrs) do
with {:ok, relay} <- by_id(id) do
changeset = Relay.changeset(relay, attrs)
case Repo.update(changeset) do
{:ok, relay} ->
# If the enabled flag has changed we need to enable/disable the relay
if Map.has_key?(changeset.changes, :enabled) do
update_relay_status(relay)
end
{:ok, relay}
{:error, changeset} ->
{:error, changeset}
end
end
end

defp valid_uuid(id) do
if Cog.UUID.is_uuid?(id) do
:ok
else
{:error, :bad_id}
end
end

defp update_relay_status(%Relay{enabled: true}=relay),
do: Relays.enable_relay(relay)
defp update_relay_status(%Relay{enabled: false}=relay),
do: Relays.disable_relay(relay)

end
Loading