Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Realtime.GenRpcPubSub.Worker do
@impl true
def init(pubsub) do
Process.flag(:message_queue_data, :off_heap)
Process.flag(:fullsweep_after, 1000)
Process.flag(:fullsweep_after, 100)
{:ok, pubsub}
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
alias Phoenix.Tracker.Shard
alias Realtime.GenCounter
alias Realtime.RateCounter
alias Realtime.Tenants
# alias Realtime.Tenants
alias Realtime.Tenants.Authorization
alias RealtimeWeb.Presence
alias RealtimeWeb.RealtimeChannel.Logging
Expand Down
43 changes: 43 additions & 0 deletions lib/realtime_web/channels/tenant_rate_limiters.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
defmodule RealtimeWeb.TenantRateLimiters do
@moduledoc """
Rate limiters for tenants.
"""
require Logger
alias Realtime.UsersCounter
alias Realtime.Tenants
alias Realtime.RateCounter
alias Realtime.Api.Tenant

@spec check_tenant(Realtime.Api.Tenant.t()) :: :ok | {:error, :too_many_connections | :too_many_joins}
def check_tenant(tenant) do
with :ok <- max_concurrent_users_check(tenant) do
max_joins_per_second_check(tenant)
end
end

defp max_concurrent_users_check(%Tenant{max_concurrent_users: max_conn_users, external_id: external_id}) do
total_conn_users = UsersCounter.tenant_users(external_id)

if total_conn_users < max_conn_users,
do: :ok,
else: {:error, :too_many_connections}
end

defp max_joins_per_second_check(%Tenant{max_joins_per_second: max_joins_per_second} = tenant) do
rate_args = Tenants.joins_per_second_rate(tenant.external_id, max_joins_per_second)

RateCounter.new(rate_args)

case RateCounter.get(rate_args) do
{:ok, %{limit: %{triggered: false}}} ->
:ok

{:ok, %{limit: %{triggered: true}}} ->
{:error, :too_many_joins}

error ->
Logger.error("UnknownErrorOnCounter: #{inspect(error)}")
{:error, error}
end
end
end
12 changes: 12 additions & 0 deletions lib/realtime_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule RealtimeWeb.UserSocket do
alias Realtime.PostgresCdc
alias Realtime.Tenants

alias RealtimeWeb.TenantRateLimiters
alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.RealtimeChannel
alias RealtimeWeb.RealtimeChannel.Logging
Expand Down Expand Up @@ -56,6 +57,7 @@ defmodule RealtimeWeb.UserSocket do
token when is_binary(token) <- token,
jwt_secret_dec <- Crypto.decrypt!(jwt_secret),
{:ok, claims} <- ChannelsAuthorization.authorize_conn(token, jwt_secret_dec, jwt_jwks),
:ok <- TenantRateLimiters.check_tenant(tenant),
{:ok, postgres_cdc_module} <- PostgresCdc.driver(postgres_cdc_default) do
%Tenant{
extensions: extensions,
Expand Down Expand Up @@ -111,6 +113,16 @@ defmodule RealtimeWeb.UserSocket do
log_error("MalformedJWT", "The token provided is not a valid JWT")
{:error, :token_malformed}

{:error, :too_many_connections} ->
msg = "Too many connected users"
Logging.log_error(socket, "ConnectionRateLimitReached", msg)
{:error, :too_many_connections}

{:error, :too_many_joins} ->
msg = "Too many joins per second"
Logging.log_error(socket, "JoinsRateLimitReached", msg)
{:error, :too_many_joins}

error ->
log_error("ErrorConnectingToWebsocket", error)
error
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule RealtimeWeb.Endpoint do
websocket: [
connect_info: [:peer_data, :uri, :x_headers],
fullsweep_after: 20,
max_frame_size: 8_000_000,
max_frame_size: 5_000_000,
# https://github.com/ninenines/cowboy/blob/24d32de931a0c985ff7939077463fc8be939f0e9/doc/src/manual/cowboy_websocket.asciidoc#L228
# active_n: The number of packets Cowboy will request from the socket at once.
# This can be used to tweak the performance of the server. Higher values reduce
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.51.13",
version: "2.51.14",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/gen_rpc_pub_sub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ defmodule Realtime.GenRpcPubSubTest do
test "it sets fullsweep_after flag on the workers" do
assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1
|> Process.whereis()
|> Process.info(:fullsweep_after) == {:fullsweep_after, 1000}
|> Process.info(:fullsweep_after) == {:fullsweep_after, 100}
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
def check(_socket), do: false
end

test "returns true if the socket is a private channel and the presence write policy is true", %{tenant: tenant} do

Check failure on line 79 in test/realtime_web/channels/realtime_channel/presence_handler_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test can_write_presence?/1 returns true if the socket is a private channel and the presence write policy is true (RealtimeWeb.RealtimeChannel.PresenceHandlerTest)
policies = %Policies{presence: %PresencePolicies{write: true}}
socket = socket_fixture(tenant, random_string(), random_string(), policies: policies, private?: true)
assert TestCanWritePresence.check(socket)
Expand Down Expand Up @@ -434,6 +434,7 @@
assert log =~ "PresenceRateLimitReached"
end

@tag :skip
@tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence]
test "respects rate limits on private channels", %{tenant: tenant, topic: topic, db_conn: db_conn} do
key = random_string()
Expand Down
31 changes: 31 additions & 0 deletions test/realtime_web/channels/tenant_rate_limiters_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule RealtimeWeb.TenantRateLimitersTest do
use Realtime.DataCase, async: true

use Mimic
alias RealtimeWeb.TenantRateLimiters
alias Realtime.Api.Tenant

setup do
tenant = %Tenant{external_id: random_string(), max_concurrent_users: 1, max_joins_per_second: 1}

%{tenant: tenant}
end

describe "check_tenant/1" do
test "rate is not exceeded", %{tenant: tenant} do
assert TenantRateLimiters.check_tenant(tenant) == :ok
end

test "max concurrent users is exceeded", %{tenant: tenant} do
Realtime.UsersCounter.add(self(), tenant.external_id)

assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_connections}
end

test "max joins is exceeded", %{tenant: tenant} do
expect(Realtime.RateCounter, :get, fn _ -> {:ok, %{limit: %{triggered: true}}} end)

assert TenantRateLimiters.check_tenant(tenant) == {:error, :too_many_joins}
end
end
end
Loading