Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))

no_channel_timeout_in_ms =
Expand Down
5 changes: 4 additions & 1 deletion lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ defmodule Realtime.GenRpcPubSub.Worker do
def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)

@impl true
def init(pubsub), do: {:ok, pubsub}
def init(pubsub) do
Process.flag(:message_queue_data, :off_heap)
{:ok, pubsub}
end

@impl true
def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.51.5",
version: "2.51.6",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
10 changes: 10 additions & 0 deletions test/realtime/gen_rpc_pub_sub_test.exs
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
Application.put_env(:phoenix_pubsub, :test_adapter, {Realtime.GenRpcPubSub, []})
Code.require_file("../../deps/phoenix_pubsub/test/shared/pubsub_test.exs", __DIR__)

defmodule Realtime.GenRpcPubSubTest do
use ExUnit.Case, async: true

test "it sets off_heap message_queue_data flag on the workers" do
assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1
|> Process.whereis()
|> Process.info(:message_queue_data) == {:message_queue_data, :off_heap}
end
end
Loading