Skip to content

Commit 0de1301

Browse files
committed
feat: add erlang distribution
Based on what recon, observer_cli and prometheus.erl do
1 parent 9a752ee commit 0de1301

File tree

5 files changed

+307
-1
lines changed

5 files changed

+307
-1
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
defmodule Realtime.DistributedMetrics do
  @moduledoc """
  Gather stats for each connected node.

  Combines the per-node data reported by `:net_kernel.nodes_info/0` with the
  socket statistics of the matching `tcp_inet` port and the distribution
  output queue size read through `:erlang.dist_get_stat/1`.
  """

  require Record
  Record.defrecordp(:net_address, Record.extract(:net_address, from_lib: "kernel/include/net_address.hrl"))

  @doc """
  Returns a map keyed by node name with the distribution stats of that node.

  Every entry carries `:pid` (the `:owner` reported by `:net_kernel`) and
  `:state`. When the node has a known address the entry also carries
  `:queue_size` (`{:ok, size}` or `{:error, :not_found}`), and when a
  `tcp_inet` port whose peer name equals that address is found it carries
  `:port` and `:inet_stats` as well.

  Returns an empty map when Erlang distribution is not started.
  """
  @spec info() :: %{node() => map()}
  def info do
    # First check if Erlang distribution is started
    if :net_kernel.get_state()[:started] != :no do
      {:ok, nodes_info} = :net_kernel.nodes_info()
      port_addresses = tcp_ports_by_peername()

      Map.new(nodes_info, &info(&1, port_addresses))
    else
      %{}
    end
  end

  # Maps the peer name of every `tcp_inet` port that has one to the port itself.
  # Ports for which `:inet.peername/1` does not return `{:ok, _}` are left out.
  defp tcp_ports_by_peername do
    for port <- :erlang.ports(),
        :erlang.port_info(port, :name) == {:name, ~c"tcp_inet"},
        {:ok, peername} <- [:inet.peername(port)],
        into: %{} do
      {peername, port}
    end
  end

  # Builds the `{node, stats}` pair for one `:net_kernel.nodes_info/0` entry.
  defp info({node, info}, port_addresses) do
    dist_pid = info[:owner]
    state = info[:state]

    case info[:address] do
      net_address(address: address) when address != :undefined ->
        {node, info(node, port_addresses, dist_pid, state, address)}

      _ ->
        # No usable address: only the owner pid and connection state are reported.
        {node, %{pid: dist_pid, state: state}}
    end
  end

  # Stats for a node with a known address. Socket statistics are added only
  # when one of the collected ports has that address as its peer name.
  defp info(node, port_addresses, dist_pid, state, address) do
    base = %{pid: dist_pid, state: state, queue_size: node_queue_size(node)}

    case port_addresses[address] do
      nil -> base
      dist_port -> Map.merge(base, %{inet_stats: inet_stats(dist_port), port: dist_port})
    end
  end

  # Socket statistics of `port`, or `nil` when they cannot be read.
  defp inet_stats(port) do
    case :inet.getstat(port) do
      {:ok, stats} -> stats
      _ -> nil
    end
  end

  # Output queue size of the distribution connection to `node`.
  # The third element of the `:sys_dist` entry is used as the connection id
  # handed to `:erlang.dist_get_stat/1`.
  defp node_queue_size(node) do
    with [dist] <- :ets.lookup(:sys_dist, node),
         {:ok, _read, _write, pending} <- :erlang.dist_get_stat(elem(dist, 2)) do
      normalize_queue_size(pending)
    else
      _ -> {:error, :not_found}
    end
  end

  # Only a numeric size is usable as a metric value.
  # NOTE(review): prometheus.erl also guards against a boolean being reported
  # here instead of a size; `false` (nothing pending) is mapped to 0 and any
  # other non-integer is reported as unavailable rather than forwarded as-is.
  defp normalize_queue_size(size) when is_integer(size), do: {:ok, size}
  defp normalize_queue_size(false), do: {:ok, 0}
  defp normalize_queue_size(_unknown), do: {:error, :not_found}
end
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
defmodule Realtime.PromEx.Plugins.Distributed do
  @moduledoc """
  Distributed erlang metrics

  Polls `Realtime.DistributedMetrics.info/0` and publishes, for every node it
  reports, the distribution queue size and the socket counters as
  `last_value` metrics tagged with `:origin_node` and `:target_node`.
  """

  use PromEx.Plugin
  alias Realtime.DistributedMetrics

  # Poll interval used when the plugin is configured without `:poll_rate`.
  # NOTE(review): 5_000 mirrors the default of PromEx's bundled polling plugins — confirm.
  @default_poll_rate 5_000

  @event_node_queue_size [:prom_ex, :plugin, :dist, :queue_size]
  @event_recv_bytes [:prom_ex, :plugin, :dist, :recv, :bytes]
  @event_recv_count [:prom_ex, :plugin, :dist, :recv, :count]
  @event_send_bytes [:prom_ex, :plugin, :dist, :send, :bytes]
  @event_send_count [:prom_ex, :plugin, :dist, :send, :count]
  @event_send_pending_bytes [:prom_ex, :plugin, :dist, :send, :pending, :bytes]

  # Key in the inet stats list -> telemetry event carrying that value.
  @inet_stat_events [
    recv_oct: @event_recv_bytes,
    recv_cnt: @event_recv_count,
    send_oct: @event_send_bytes,
    send_cnt: @event_send_count,
    send_pend: @event_send_pending_bytes
  ]

  @impl true
  def polling_metrics(opts) do
    # Fall back to a default so the plugin also works when listed without options.
    poll_rate = Keyword.get(opts, :poll_rate, @default_poll_rate)

    [
      metrics(poll_rate)
    ]
  end

  # Polling group that runs `execute_metrics/0` every `poll_rate`.
  defp metrics(poll_rate) do
    Polling.build(
      :realtime_vm_dist,
      poll_rate,
      {__MODULE__, :execute_metrics, []},
      [
        last_value(
          [:dist, :queue_size],
          event_name: @event_node_queue_size,
          description: "Number of bytes in the output distribution queue",
          measurement: :size,
          tags: [:origin_node, :target_node]
        ),
        last_value(
          [:dist, :recv_bytes],
          event_name: @event_recv_bytes,
          description: "Number of bytes received by the socket.",
          measurement: :size,
          tags: [:origin_node, :target_node]
        ),
        last_value(
          [:dist, :recv_count],
          event_name: @event_recv_count,
          description: "Number of packets received by the socket.",
          measurement: :size,
          tags: [:origin_node, :target_node]
        ),
        last_value(
          [:dist, :send_bytes],
          event_name: @event_send_bytes,
          description: "Number of bytes sent by the socket.",
          measurement: :size,
          tags: [:origin_node, :target_node]
        ),
        last_value(
          [:dist, :send_count],
          event_name: @event_send_count,
          description: "Number of packets sent by the socket.",
          measurement: :size,
          tags: [:origin_node, :target_node]
        ),
        last_value(
          [:dist, :send_pending_bytes],
          event_name: @event_send_pending_bytes,
          description: "Number of bytes waiting to be sent by the socket.",
          measurement: :size,
          tags: [:origin_node, :target_node]
        )
      ]
    )
  end

  @doc """
  Emits the telemetry events backing the metrics of this plugin.

  Invoked by the polling group built in this module; for every node returned
  by `Realtime.DistributedMetrics.info/0` it publishes the queue size and the
  socket counters that are available for that node.
  """
  @spec execute_metrics() :: :ok
  def execute_metrics do
    Enum.each(DistributedMetrics.info(), fn {target_node, info} ->
      execute_queue_size(target_node, info)
      execute_inet_stats(target_node, info)
    end)
  end

  # Publishes one event per socket counter; does nothing when the node entry
  # carries no `:inet_stats`.
  defp execute_inet_stats(target_node, info) do
    if stats = info[:inet_stats] do
      metadata = %{origin_node: node(), target_node: target_node}

      Enum.each(@inet_stat_events, fn {stat, event} ->
        :telemetry.execute(event, %{size: stats[stat]}, metadata)
      end)
    end

    :ok
  end

  # Publishes the queue size only when it was read successfully (`{:ok, size}`).
  defp execute_queue_size(target_node, info) do
    case info[:queue_size] do
      {:ok, size} ->
        :telemetry.execute(@event_node_queue_size, %{size: size}, %{origin_node: node(), target_node: target_node})

      _ ->
        :ok
    end
  end
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.57.11",
7+
version: "2.58.0",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
defmodule Realtime.DistributedMetricsTest do
  # Async false due to Clustered usage
  use ExUnit.Case, async: false

  alias Realtime.DistributedMetrics

  # One extra node is started for the whole module so that info/0 has a peer to report on.
  setup_all do
    {:ok, node} = Clustered.start()
    %{node: node}
  end

  describe "info/0 while connected" do
    test "per node metric", %{node: node} do
      # The keyword-list pattern pins both the keys and their order in inet_stats.
      assert %{
               ^node => %{
                 pid: _pid,
                 port: _port,
                 queue_size: {:ok, 0},
                 state: :up,
                 inet_stats: [
                   recv_oct: _,
                   recv_cnt: _,
                   recv_max: _,
                   recv_avg: _,
                   recv_dvi: _,
                   send_oct: _,
                   send_cnt: _,
                   send_max: _,
                   send_avg: _,
                   send_pend: _
                 ]
               }
             } = DistributedMetrics.info()
    end
  end
end
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
defmodule Realtime.PromEx.Plugins.DistributedTest do
  # Async false due to Clustered usage
  use ExUnit.Case, async: false
  alias Realtime.PromEx.Plugins

  # Minimal PromEx module that only loads the plugin under test.
  defmodule MetricsTest do
    use PromEx, otp_app: :metrics_test
    @impl true
    def plugins do
      [{Plugins.Distributed, poll_rate: 100}]
    end
  end

  setup_all do
    {:ok, node} = Clustered.start()
    start_supervised!(MetricsTest)
    # Send some data back and forth
    25 = :erpc.call(node, String, :to_integer, ["25"])
    # Wait for MetricsTest to fetch metrics
    Process.sleep(200)
    %{node: node}
  end

  describe "pooling metrics" do
    # Every test works on the exported metrics text, one entry per non-empty line.
    setup do
      metrics =
        PromEx.get_metrics(MetricsTest)
        |> String.split("\n", trim: true)

      %{metrics: metrics}
    end

    test "send_pending_bytes", %{metrics: metrics, node: node} do
      pattern = ~r/dist_send_pending_bytes{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
      assert metric_value(metrics, pattern) == 0
    end

    test "send_count", %{metrics: metrics, node: node} do
      pattern = ~r/dist_send_count{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
      assert metric_value(metrics, pattern) > 0
    end

    test "send_bytes", %{metrics: metrics, node: node} do
      pattern = ~r/dist_send_bytes{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
      assert metric_value(metrics, pattern) > 0
    end

    test "recv_count", %{metrics: metrics, node: node} do
      pattern = ~r/dist_recv_count{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
      assert metric_value(metrics, pattern) > 0
    end

    test "recv_bytes", %{metrics: metrics, node: node} do
      pattern = ~r/dist_recv_bytes{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
      assert metric_value(metrics, pattern) > 0
    end

    test "queue_size", %{metrics: metrics, node: node} do
      pattern = ~r/dist_queue_size{origin_node=\"#{node()}\",target_node=\"#{node}\"}\s(?<number>\d+)/
      assert metric_value(metrics, pattern) == 0
    end
  end

  # Integer value captured by `pattern` in the first metric line that matches.
  # Fails the test when no line matches, so an assertion such as `== 0` cannot
  # pass merely because the metric was never exported.
  defp metric_value(metrics, pattern) do
    captured =
      Enum.find_value(metrics, fn line ->
        case Regex.run(pattern, line, capture: ["number"]) do
          [number] -> number
          _ -> nil
        end
      end)

    if is_nil(captured) do
      flunk("no metric line matched #{inspect(pattern)}")
    end

    String.to_integer(captured)
  end
end

0 commit comments

Comments
 (0)