Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster |
| POSTGRES_CDC_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Postgres CDC extension. Defaults to 5. |
| USERS_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Users extension. Defaults to 5. |
| METRICS_PUSHER_ENABLED | boolean | Enable periodic push of Prometheus metrics. Defaults to 'false'. Requires METRICS_PUSHER_URL to be set. |
| METRICS_PUSHER_URL | string | Full URL endpoint to push metrics to (e.g., 'https://example.com/api/v1/import/prometheus'). Required when METRICS_PUSHER_ENABLED is 'true'. |
| METRICS_PUSHER_AUTH | string | Optional authorization header value for metrics pushes (e.g., 'Bearer token'). If not set, requests will be sent without authorization. Keep this secret if used. |
| METRICS_PUSHER_INTERVAL_MS | number | Interval in milliseconds between metrics pushes. Defaults to '30000' (30 seconds). |
| METRICS_PUSHER_TIMEOUT_MS | number | HTTP request timeout in milliseconds for metrics push operations. Defaults to '15000' (15 seconds). |
| METRICS_PUSHER_COMPRESS | boolean | Enable gzip compression for metrics payloads. Defaults to 'true'. |

The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).

Expand Down
14 changes: 13 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5)
regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)
no_channel_timeout_in_ms = Env.get_integer("NO_CHANNEL_TIMEOUT_IN_MS", :timer.minutes(10))
measure_traffic_interval_in_ms = Env.get_integer("MEASURE_TRAFFIC_INTERVAL_IN_MS", :timer.seconds(10))
# Metrics pusher settings (METRICS_PUSHER_* environment variables).
# Pushing is opt-in: the enabled flag falls back to `false`.
metrics_pusher_enabled = Env.get_boolean("METRICS_PUSHER_ENABLED", false)
# URL and auth header are read without a default, so both are `nil` when the variable is unset.
metrics_pusher_url = System.get_env("METRICS_PUSHER_URL")
metrics_pusher_auth = System.get_env("METRICS_PUSHER_AUTH")
# Fallbacks are 30_000 ms between pushes and 15_000 ms per HTTP request.
metrics_pusher_interval_ms = Env.get_integer("METRICS_PUSHER_INTERVAL_MS", :timer.seconds(30))
metrics_pusher_timeout_ms = Env.get_integer("METRICS_PUSHER_TIMEOUT_MS", :timer.seconds(15))
# Payload compression falls back to `true`.
metrics_pusher_compress = Env.get_boolean("METRICS_PUSHER_COMPRESS", true)

# Abort configuration loading on an unsupported IP version; `nil` is accepted.
if !(db_version in [nil, "ipv6", "ipv4"]),
do: raise("Invalid IP version, please set either ipv6 or ipv4")
Expand Down Expand Up @@ -144,7 +150,13 @@ config :realtime,
master_region: master_region,
metrics_tags: metrics_tags,
measure_traffic_interval_in_ms: measure_traffic_interval_in_ms,
disable_healthcheck_logging: disable_healthcheck_logging
disable_healthcheck_logging: disable_healthcheck_logging,
metrics_pusher_enabled: metrics_pusher_enabled,
metrics_pusher_url: metrics_pusher_url,
metrics_pusher_auth: metrics_pusher_auth,
metrics_pusher_interval_ms: metrics_pusher_interval_ms,
metrics_pusher_timeout_ms: metrics_pusher_timeout_ms,
metrics_pusher_compress: metrics_pusher_compress

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down
5 changes: 4 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ config :realtime,
api_jwt_secret: System.get_env("API_JWT_SECRET", "secret"),
metrics_jwt_secret: "test",
prom_poll_rate: 5_000,
request_id_baggage_key: "sb-request-id"
request_id_baggage_key: "sb-request-id",
metrics_pusher_req_options: [
plug: {Req.Test, Realtime.MetricsPusher}
]

# Print nothing during tests unless captured or a test failure happens
config :logger,
Expand Down
10 changes: 9 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ defmodule Realtime.Application do
{RealtimeWeb.RealtimeChannel.Tracker, check_interval_in_ms: no_channel_timeout_in_ms},
RealtimeWeb.Endpoint,
RealtimeWeb.Presence
] ++ extensions_supervisors() ++ janitor_tasks()
] ++ extensions_supervisors() ++ janitor_tasks() ++ metrics_pusher_children()

database_connections = if master_region == region, do: [Realtime.Repo], else: [Replica.replica()]

Expand Down Expand Up @@ -156,6 +156,14 @@ defmodule Realtime.Application do
end
end

# Child specs contributed by the metrics pusher: the GenServer is added to the
# supervision tree only when the `:metrics_pusher_enabled` setting is truthy.
defp metrics_pusher_children do
  case Application.get_env(:realtime, :metrics_pusher_enabled) do
    disabled when disabled in [nil, false] -> []
    _enabled -> [Realtime.MetricsPusher]
  end
end

defp opentelemetry_setup do
:opentelemetry_cowboy.setup()
OpentelemetryPhoenix.setup(adapter: :cowboy2)
Expand Down
130 changes: 130 additions & 0 deletions lib/realtime/metrics_pusher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
defmodule Realtime.MetricsPusher do
  @moduledoc """
  GenServer that periodically pushes Prometheus metrics to an HTTP endpoint.

  The process only starts when a non-blank `url` is available, either through the
  `:url` option or the `:metrics_pusher_url` application environment key. Otherwise
  `start_link/1` logs a warning and returns `:ignore`.

  ## Options

  Each option falls back to a key of the `:realtime` application environment:

    * `:url` - endpoint the metrics are POSTed to (`:metrics_pusher_url`)
    * `:auth` - value of the `authorization` header; the header is omitted when
      unset (`:metrics_pusher_auth`)
    * `:interval` - delay in milliseconds between the end of one push and the start
      of the next (`:metrics_pusher_interval_ms`, 30 seconds by default)
    * `:timeout` - receive timeout in milliseconds for the HTTP request
      (`:metrics_pusher_timeout_ms`, 15 seconds by default)
    * `:compress` - whether the request body is compressed
      (`:metrics_pusher_compress`, `true` by default)

  Additional `Req` options found under `:metrics_pusher_req_options` are merged
  last and therefore override the options built here.
  """

  use GenServer
  require Logger

  defstruct [:push_ref, :interval, :req_options]

  @doc """
  Starts the pusher registered under the module name.

  Returns `:ignore` (after logging a warning) when no usable URL is configured.
  A URL that is not a binary, is empty, or contains only whitespace counts as
  not configured, so an environment variable set to an empty string does not
  start a process that would fail on every push.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    if opts |> resolve_url() |> configured_url?() do
      GenServer.start_link(__MODULE__, opts, name: __MODULE__)
    else
      Logger.warning("MetricsPusher not started: url must be configured")

      :ignore
    end
  end

  @impl true
  def init(opts) do
    url = resolve_url(opts)
    auth = opts[:auth] || Application.get_env(:realtime, :metrics_pusher_auth)

    interval = option(opts, :interval, :metrics_pusher_interval_ms, :timer.seconds(30))
    timeout = option(opts, :timeout, :metrics_pusher_timeout_ms, :timer.seconds(15))
    compress = option(opts, :compress, :metrics_pusher_compress, true)

    Logger.info("Starting MetricsPusher (url: #{url}, interval: #{interval}ms, compress: #{compress})")

    content_type = {"content-type", "text/plain"}
    headers = if auth, do: [{"authorization", auth}, content_type], else: [content_type]

    req_options = [
      method: :post,
      url: url,
      headers: headers,
      compress_body: compress,
      receive_timeout: timeout
    ]

    # Application-level overrides win over the options built above.
    req_options = Keyword.merge(req_options, Application.get_env(:realtime, :metrics_pusher_req_options, []))

    state = %__MODULE__{push_ref: schedule_push(interval), interval: interval, req_options: req_options}

    {:ok, state}
  end

  @impl true
  def handle_info(:push, state) do
    # A no-op when this message came from the timer itself. If `:push` arrived by
    # other means, this drops the pending timer so only one push stays scheduled.
    Process.cancel_timer(state.push_ref)

    {exec_time, _} = :timer.tc(fn -> push(state.req_options) end, :millisecond)

    if exec_time > :timer.seconds(5) do
      Logger.warning("Metrics push took: #{exec_time} ms")
    end

    # The next push is scheduled only after this one has finished, so pushes never overlap.
    {:noreply, %{state | push_ref: schedule_push(state.interval)}}
  end

  def handle_info(msg, state) do
    Logger.error("MetricsPusher received unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  # Explicit option first, application environment second.
  defp resolve_url(opts), do: opts[:url] || Application.get_env(:realtime, :metrics_pusher_url)

  # A URL is usable only when it is a binary with at least one non-whitespace character.
  defp configured_url?(url), do: is_binary(url) and String.trim(url) != ""

  # Reads `key` from `opts`, falling back to the application environment and then to `default`.
  defp option(opts, key, env_key, default) do
    Keyword.get(opts, key, Application.get_env(:realtime, env_key, default))
  end

  defp schedule_push(delay), do: Process.send_after(self(), :push, delay)

  # Collects the metrics and sends them. Always returns `:ok`: failures and raised
  # exceptions are logged, so a bad push never takes the server down.
  defp push(req_options) do
    metrics = Realtime.PromEx.get_metrics()

    case send_metrics(req_options, metrics) do
      :ok ->
        :ok

      {:error, reason} ->
        Logger.error("MetricsPusher: Failed to push metrics to #{req_options[:url]}: #{inspect(reason)}")
        :ok
    end
  rescue
    error ->
      Logger.error("MetricsPusher: Exception during push: #{inspect(error)}")
      :ok
  end

  # Performs the HTTP request; any status outside 200..299 is reported as an error.
  defp send_metrics(req_options, metrics) do
    [{:body, metrics} | req_options]
    |> Req.request()
    |> case do
      {:ok, %{status: status}} when status in 200..299 ->
        :ok

      {:ok, %{status: status} = response} ->
        {:error, {:http_error, status, response.body}}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.72.0",
version: "2.73.0",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down Expand Up @@ -93,6 +93,7 @@ defmodule Realtime.MixProject do
{:opentelemetry_cowboy, "~> 1.0"},
{:opentelemetry_ecto, "~> 1.2"},
{:gen_rpc, git: "https://github.com/supabase/gen_rpc.git", ref: "5382a0f2689a4cb8838873a2173928281dbe5002"},
{:req, "~> 0.5"},
{:mimic, "~> 1.0", only: :test},
{:floki, ">= 0.30.0", only: :test},
{:mint_web_socket, "~> 1.0", only: :test},
Expand All @@ -104,7 +105,6 @@ defmodule Realtime.MixProject do
{:credo, "~> 1.7", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.4", only: :dev, runtime: false},
{:poolboy, "~> 1.5", only: :test},
{:req, "~> 0.5", only: :test},
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false}
]
end
Expand Down
156 changes: 156 additions & 0 deletions test/realtime/metrics_pusher_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
defmodule Realtime.MetricsPusherTest do
  use Realtime.DataCase, async: true
  import ExUnit.CaptureLog

  alias Plug.Conn
  alias Realtime.MetricsPusher

  # Endpoints the pusher is pointed at; requests are answered by Req.Test expectations.
  @remote_url "https://example.com:8428/api/v1/import/prometheus"
  @local_url "http://localhost:8428/api/v1/import/prometheus"

  # A 10 ms interval makes the first push fire right after the server starts.
  @timing [interval: 10, timeout: 5000]

  setup {Req.Test, :verify_on_exit!}

  # Starts the pusher under the test supervisor and lets it use the Req.Test
  # expectations registered by the calling test process.
  defp start_and_allow_pusher(opts) do
    pusher = start_supervised!({MetricsPusher, opts})
    Req.Test.allow(MetricsPusher, self(), pusher)
    {:ok, pusher}
  end

  describe "start_link/1" do
    test "does not start when URL is missing" do
      assert :ignore = MetricsPusher.start_link(enabled: true)
    end

    test "sends request successfully" do
      test_pid = self()

      Req.Test.expect(MetricsPusher, fn conn ->
        payload = Req.Test.raw_body(conn)
        assert conn.method == "POST"
        assert :zlib.gunzip(payload) =~ "# HELP beam_stats_run_queue_count"
        assert conn.scheme == :https
        assert conn.host == "example.com"
        assert conn.port == 8428
        assert conn.request_path == "/api/v1/import/prometheus"
        assert Conn.get_req_header(conn, "authorization") == ["Bearer token"]
        assert Conn.get_req_header(conn, "content-encoding") == ["gzip"]
        assert Conn.get_req_header(conn, "content-type") == ["text/plain"]

        send(test_pid, :req_called)
        Req.Test.text(conn, "")
      end)

      pusher_opts = [url: @remote_url, auth: "Bearer token", compress: true] ++ @timing

      {:ok, _pid} = start_and_allow_pusher(pusher_opts)
      assert_receive :req_called, 100
    end

    test "sends request successfully without auth header" do
      test_pid = self()

      Req.Test.expect(MetricsPusher, fn conn ->
        payload = Req.Test.raw_body(conn)
        assert :zlib.gunzip(payload) =~ "# HELP beam_stats_run_queue_count"
        assert Conn.get_req_header(conn, "authorization") == []

        send(test_pid, :req_called)
        Req.Test.text(conn, "")
      end)

      pusher_opts = [url: @local_url, compress: true] ++ @timing

      {:ok, _pid} = start_and_allow_pusher(pusher_opts)
      assert_receive :req_called, 100
    end

    test "sends request body untouched when compress=false" do
      test_pid = self()

      Req.Test.expect(MetricsPusher, fn conn ->
        payload = Req.Test.raw_body(conn)
        assert payload =~ "# HELP beam_stats_run_queue_count"
        assert Conn.get_req_header(conn, "content-encoding") == []
        assert Conn.get_req_header(conn, "content-type") == ["text/plain"]

        send(test_pid, :req_called)
        Req.Test.text(conn, "")
      end)

      pusher_opts = [url: @local_url, auth: "Bearer token", compress: false] ++ @timing

      {:ok, _pid} = start_and_allow_pusher(pusher_opts)
      assert_receive :req_called, 100
    end

    test "when request receives non 2XX response" do
      test_pid = self()
      pusher_opts = [url: @remote_url, auth: "Bearer token", compress: true] ++ @timing

      captured =
        capture_log(fn ->
          Req.Test.expect(MetricsPusher, fn conn ->
            send(test_pid, :req_called)
            Conn.send_resp(conn, 500, "")
          end)

          {:ok, pusher} = start_and_allow_pusher(pusher_opts)
          assert_receive :req_called, 100
          assert Process.alive?(pusher)
          # Wait enough for the log to be captured
          Process.sleep(100)
        end)

      assert captured =~ "MetricsPusher: Failed to push metrics to"
    end

    test "when an error is raised" do
      test_pid = self()
      pusher_opts = [url: @remote_url] ++ @timing

      captured =
        capture_log(fn ->
          Req.Test.expect(MetricsPusher, fn _conn ->
            send(test_pid, :req_called)
            raise RuntimeError, "unexpected error"
          end)

          {:ok, pusher} = start_and_allow_pusher(pusher_opts)
          assert_receive :req_called, 100
          assert Process.alive?(pusher)
          # Wait enough for the log to be captured
          Process.sleep(100)
        end)

      assert captured =~ "MetricsPusher: Exception during push: %RuntimeError{message: \"unexpected error\"}"
    end
  end
end