Skip to content

Commit

Permalink
Apply mix format
Browse files Browse the repository at this point in the history
  • Loading branch information
bitwalker committed Apr 4, 2018
1 parent e3e6269 commit cfb71aa
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 132 deletions.
6 changes: 6 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
inputs: [
"lib/**/*.ex",
"test/**/*.{ex, exs}",
],
]
27 changes: 16 additions & 11 deletions lib/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,25 @@ defmodule Cluster.App do
defp get_child_specs() do
import Supervisor.Spec, warn: false
specs = Application.get_env(:libcluster, :topologies, [])

for {topology, spec} <- specs do
strategy = Keyword.fetch!(spec, :strategy)
config = Keyword.get(spec, :config, [])
connect_mfa = Keyword.get(spec, :connect, {:net_kernel, :connect, []})
strategy = Keyword.fetch!(spec, :strategy)
config = Keyword.get(spec, :config, [])
connect_mfa = Keyword.get(spec, :connect, {:net_kernel, :connect, []})
disconnect_mfa = Keyword.get(spec, :disconnect, {:net_kernel, :disconnect, []})
list_nodes_mfa = Keyword.get(spec, :list_nodes, {:erlang, :nodes, [:connected]})
opts = Keyword.get(spec, :child_spec, [])
worker_args = [[
topology: topology,
connect: connect_mfa,
disconnect: disconnect_mfa,
list_nodes: list_nodes_mfa,
config: config
]]
opts = Keyword.get(spec, :child_spec, [])

worker_args = [
[
topology: topology,
connect: connect_mfa,
disconnect: disconnect_mfa,
list_nodes: list_nodes_mfa,
config: config
]
]

worker(strategy, worker_args, opts)
end
end
Expand Down
7 changes: 4 additions & 3 deletions lib/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ defmodule Cluster.Logger do
@moduledoc false
require Logger

def debug(t, msg) do
def debug(t, msg) do
if Application.get_env(:libcluster, :debug, false) do
log(:debug, t, msg)
end
end
def info(t, msg), do: log(:info, t, msg)
def warn(t, msg), do: log(:warn, t, msg)

def info(t, msg), do: log(:info, t, msg)
def warn(t, msg), do: log(:warn, t, msg)
def error(t, msg), do: log(:error, t, msg)

defp log(level, t, msg), do: Logger.log(level, "[libcluster:#{t}] #{msg}")
Expand Down
4 changes: 2 additions & 2 deletions lib/strategy/dns_poll.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ defmodule Cluster.Strategy.DNSPoll do
`<sname>@<ip-address>`. If your setup matches those assumptions, this strategy will periodically poll DNS and connect
all nodes it finds.
options:
## Options
* `poll_interval` - How often to poll in milliseconds (optional; default: 5_000)
* `query` - DNS query to use (required; e.g. "my-app.example.com")
* `node_sname` - The short name of the nodes you wish to connect to (required; e.g. "my-app")
example config:
## Usage
config :libcluster,
topologies: [
Expand Down
6 changes: 4 additions & 2 deletions lib/strategy/epmd.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ defmodule Cluster.Strategy.Epmd do

def start_link(opts) do
topology = Keyword.fetch!(opts, :topology)
config = Keyword.get(opts, :config, [])
connect = Keyword.fetch!(opts, :connect)
config = Keyword.get(opts, :config, [])
connect = Keyword.fetch!(opts, :connect)
list_nodes = Keyword.fetch!(opts, :list_nodes)

case Keyword.get(config, :hosts, []) do
[] ->
:ignore

nodes when is_list(nodes) ->
Cluster.Strategy.connect_nodes(topology, connect, list_nodes, nodes)
:ignore
Expand Down
6 changes: 5 additions & 1 deletion lib/strategy/erlang_hosts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule Cluster.Strategy.ErlangHosts do
topology = Keyword.fetch!(opts, :topology)
Cluster.Logger.warn(topology, "couldn't find .hosts.erlang file - not joining cluster")
:ignore

file ->
GenServer.start_link(__MODULE__, {opts, file})
end
Expand All @@ -39,6 +40,7 @@ defmodule Cluster.Strategy.ErlangHosts do
end

def handle_info(:timeout, state), do: handle_info(:connect, state)

def handle_info(:connect, state) do
new_state = connect_hosts(state)
{:noreply, new_state, configured_timeout(new_state)}
Expand All @@ -50,7 +52,7 @@ defmodule Cluster.Strategy.ErlangHosts do

defp connect_hosts(%{opts: opts, hosts_file: hosts_file} = state) do
topology = Keyword.fetch!(opts, :topology)
connect = Keyword.fetch!(opts, :connect)
connect = Keyword.fetch!(opts, :connect)
list_nodes = Keyword.fetch!(opts, :list_nodes)

nodes =
Expand All @@ -64,9 +66,11 @@ defmodule Cluster.Strategy.ErlangHosts do
end

defp gather_node_names([], acc), do: acc

defp gather_node_names([{{:ok, names}, host} | rest], acc) do
names = Enum.map(names, fn {name, _} -> String.to_atom("#{name}@#{host}") end)
gather_node_names(rest, names ++ acc)
end

defp gather_node_names([_ | rest], acc), do: gather_node_names(rest, acc)
end
58 changes: 37 additions & 21 deletions lib/strategy/gossip.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ defmodule Cluster.Strategy.Gossip do
alias Cluster.Strategy.State

@default_port 45892
@default_addr {0,0,0,0}
@default_multicast_addr {230,1,1,251}
@default_addr {0, 0, 0, 0}
@default_multicast_addr {230, 1, 1, 251}

def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
Expand All @@ -49,27 +49,35 @@ defmodule Cluster.Strategy.Gossip do
list_nodes: Keyword.fetch!(opts, :list_nodes),
config: Keyword.fetch!(opts, :config)
}

port = Keyword.get(state.config, :port, @default_port)
ip = Keyword.get(state.config, :if_addr, @default_addr) |> sanitize_ip
ttl = Keyword.get(state.config, :multicast_ttl, 1)
multicast_addr = Keyword.get(state.config, :multicast_addr, @default_multicast_addr) |> sanitize_ip
{:ok, socket} = :gen_udp.open(port, [
:binary,
active: true,
ip: ip,
reuseaddr: true,
broadcast: true,
multicast_ttl: ttl,
multicast_loop: true,
add_membership: {multicast_addr, {0,0,0,0}}
])
ip = Keyword.get(state.config, :if_addr, @default_addr) |> sanitize_ip
ttl = Keyword.get(state.config, :multicast_ttl, 1)

multicast_addr =
Keyword.get(state.config, :multicast_addr, @default_multicast_addr) |> sanitize_ip

{:ok, socket} =
:gen_udp.open(port, [
:binary,
active: true,
ip: ip,
reuseaddr: true,
broadcast: true,
multicast_ttl: ttl,
multicast_loop: true,
add_membership: {multicast_addr, {0, 0, 0, 0}}
])

state = %{state | :meta => {multicast_addr, port, socket}}
{:ok, state, 0}
end

defp sanitize_ip(input) do
case input do
{_a,_b,_c,_d} = ip -> ip
{_a, _b, _c, _d} = ip ->
ip

ip when is_binary(ip) ->
{:ok, addr} = :inet.parse_ipv4_address(~c"#{ip}")
addr
Expand All @@ -78,8 +86,9 @@ defmodule Cluster.Strategy.Gossip do

# Send stuttered heartbeats
def handle_info(:timeout, state), do: handle_info(:heartbeat, state)

def handle_info(:heartbeat, %State{meta: {multicast_addr, port, socket}} = state) do
debug state.topology, "heartbeat"
debug(state.topology, "heartbeat")
:gen_udp.send(socket, multicast_addr, port, heartbeat(node()))
Process.send_after(self(), :heartbeat, :rand.uniform(5_000))
{:noreply, state}
Expand All @@ -91,7 +100,7 @@ defmodule Cluster.Strategy.Gossip do
{:noreply, state}
end

def terminate(_type, _reason, %State{meta: {_,_,socket}}) do
def terminate(_type, _reason, %State{meta: {_, _, socket}}) do
:gen_udp.close(socket)
:ok
end
Expand All @@ -105,20 +114,27 @@ defmodule Cluster.Strategy.Gossip do
# is connected to us, and if not, we connect to it.
# If the connection fails, it's likely because the cookie
# is different, and thus a node we can ignore
@spec handle_heartbeat(State.t, binary) :: :ok
defp handle_heartbeat(%State{connect: connect, list_nodes: list_nodes} = state, <<"heartbeat::", rest::binary>>) do
@spec handle_heartbeat(State.t(), binary) :: :ok
defp handle_heartbeat(
%State{connect: connect, list_nodes: list_nodes} = state,
<<"heartbeat::", rest::binary>>
) do
self = node()

case :erlang.binary_to_term(rest) do
%{node: ^self} ->
:ok

%{node: n} when is_atom(n) ->
debug state.topology, "received heartbeat from #{n}"
debug(state.topology, "received heartbeat from #{n}")
Cluster.Strategy.connect_nodes(state.topology, connect, list_nodes, [n])
:ok

_ ->
:ok
end
end

defp handle_heartbeat(_state, _packet) do
:ok
end
Expand Down
Loading

0 comments on commit cfb71aa

Please sign in to comment.