Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 108 additions & 1 deletion lib/mix/tasks/bench.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ defmodule Mix.Tasks.Bench do
* `--partition-count` - Number of partitions (default: schedulers_online)
* `--max-messages` - Maximum messages to generate (default: unlimited, mutually exclusive with --duration)
* `--through` - Pipeline stage to run through: "full", "reorder_buffer", or "sps" (default: "full")
* `--profile` - Enable per-stage pipeline profiling (default: false)
* `--tprof` - Enable tprof call_time profiling of Sequin.* modules (default: false)
* `--tprof-limit` - Number of top functions to show in tprof report (default: 60)

## Examples

Expand All @@ -37,11 +40,18 @@ defmodule Mix.Tasks.Bench do

# Run through SlotProcessorServer (includes message handler, stops before Broadway)
mix benchmark --through sps

# Run with per-stage pipeline profiling
mix benchmark --duration 30 --profile

# Run with tprof profiling to see which functions dominate wall time
mix benchmark --duration 30 --tprof
"""
use Mix.Task

alias Sequin.Accounts
alias Sequin.Benchmark.MessageHandler, as: BenchmarkMessageHandler
alias Sequin.Benchmark.Profiler
alias Sequin.Benchmark.Stats
alias Sequin.Consumers
alias Sequin.Databases
Expand Down Expand Up @@ -80,7 +90,10 @@ defmodule Mix.Tasks.Bench do
pk_collision_rate: :float,
partition_count: :integer,
max_messages: :integer,
through: :string
through: :string,
profile: :boolean,
tprof: :boolean,
tprof_limit: :integer
]
)

Expand All @@ -91,6 +104,9 @@ defmodule Mix.Tasks.Bench do
partition_count = Keyword.get(opts, :partition_count, @default_partition_count)
max_messages = Keyword.get(opts, :max_messages)
through = opts |> Keyword.get(:through, "full") |> String.to_existing_atom()
profile? = Keyword.get(opts, :profile, false)
tprof? = Keyword.get(opts, :tprof, false)
tprof_limit = Keyword.get(opts, :tprof_limit, 60)

if max_messages && duration_opt do
Mix.raise("--duration and --max-messages are mutually exclusive")
Expand All @@ -106,6 +122,14 @@ defmodule Mix.Tasks.Bench do
# Start the application
Mix.Task.run("app.start")

# Initialize profiler if requested
if profile?, do: Profiler.init()

# Start tprof if requested
if tprof? do
start_tprof()
end

announce("#{@bold}=== Sequin Pipeline Benchmark ===#{@reset}", @cyan)
IO.puts("")

Expand All @@ -117,6 +141,8 @@ defmodule Mix.Tasks.Bench do
IO.puts(" Partition count: #{partition_count}")
IO.puts(" Max messages: #{max_messages || "unlimited"}")
IO.puts(" Through: #{through}")
IO.puts(" profile: #{profile?}")
IO.puts(" tprof: #{tprof?}")
IO.puts("")

# Setup replication slot
Expand Down Expand Up @@ -321,10 +347,86 @@ defmodule Mix.Tasks.Bench do
pipeline_tracked
)

# Print profiling report
if profile? do
report = Profiler.report()

if Enum.any?(report) do
Profiler.format_report(report)
end
end

# Print tprof report if enabled
if tprof? do
stop_and_report_tprof(tprof_limit)
end

# Cleanup
if profile?, do: Profiler.cleanup()
cleanup_entities(consumer, replication)
end

defp start_tprof do
:tprof.start(%{type: :call_time})
:tprof.enable_trace(:all)

for {mod, _} <- :code.all_loaded(),
mod_str = Atom.to_string(mod),
String.starts_with?(mod_str, "Elixir.Sequin.") do
:tprof.set_pattern(mod, :_, :_)
end

announce("tprof profiling enabled (tracing Sequin.* modules)", @yellow)
end

defp stop_and_report_tprof(limit) do
:tprof.disable_trace(:all)
{:call_time, raw} = :tprof.collect()
:tprof.stop()

grand_total =
raw
|> Enum.map(fn {_, _, _, pid_data} ->
pid_data |> Enum.map(fn {_, _, time} -> time end) |> Enum.sum()
end)
|> Enum.sum()

rows =
raw
|> Enum.map(fn {mod, fun, arity, pid_data} ->
total_calls = pid_data |> Enum.map(fn {_, calls, _} -> calls end) |> Enum.sum()
total_time = pid_data |> Enum.map(fn {_, _, time} -> time end) |> Enum.sum()
{mod, fun, arity, total_calls, total_time}
end)
|> Enum.sort_by(fn {_, _, _, _, time} -> time end, :desc)
|> Enum.take(limit)

IO.puts("")
announce("#{@bold}tprof Call Time Profile (top #{limit} functions):#{@reset}", @cyan)
IO.puts("")

header =
" #{String.pad_trailing("FUNCTION", 70)} #{String.pad_leading("CALLS", 12)} #{String.pad_leading("TIME (ms)", 12)} #{String.pad_leading("% TOTAL", 9)}"

IO.puts(header)
IO.puts(" #{String.duplicate("-", 105)}")

Enum.each(rows, fn {mod, fun, arity, calls, time_us} ->
mfa = "#{inspect(mod)}.#{fun}/#{arity}"
time_ms = Float.round(time_us / 1000, 1)
pct = if grand_total > 0, do: Float.round(time_us / grand_total * 100, 1), else: 0.0

IO.puts(
" #{String.pad_trailing(mfa, 70)} #{String.pad_leading(format_number(calls), 12)} #{String.pad_leading(:erlang.float_to_binary(time_ms, decimals: 1), 12)} #{String.pad_leading(:erlang.float_to_binary(pct, decimals: 1) <> "%", 9)}"
)
end)

IO.puts("")
total_ms = Float.round(grand_total / 1000, 1)
IO.puts(" Total traced time: #{total_ms}ms")
IO.puts("")
end

defp cleanup_entities(_consumer, replication) do
{:ok, database} = Databases.get_db(replication.postgres_database_id)
{:ok, account} = Accounts.get_account(database.account_id)
Expand Down Expand Up @@ -448,6 +550,11 @@ defmodule Mix.Tasks.Bench do
byte_size: msg.byte_size,
created_at_us: extract_created_at(msg.message.fields)
})

if Profiler.enabled?() do
Profiler.checkpoint(msg.message.commit_lsn, msg.message.commit_idx, :sink_in)
Profiler.finalize_message(msg.message.commit_lsn, msg.message.commit_idx)
end
end)
end)

Expand Down
8 changes: 8 additions & 0 deletions lib/sequin/benchmark/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Sequin.Benchmark.MessageHandler do

@behaviour Sequin.Runtime.MessageHandler

alias Sequin.Benchmark.Profiler
alias Sequin.Benchmark.Stats
alias Sequin.Runtime.SlotProcessor.Message

Expand Down Expand Up @@ -49,6 +50,13 @@ defmodule Sequin.Benchmark.MessageHandler do
end)
end)

if Profiler.enabled?() do
Enum.each(messages, fn %Message{} = msg ->
Profiler.checkpoint(msg.commit_lsn, msg.commit_idx, :sink_in)
Profiler.finalize_message(msg.commit_lsn, msg.commit_idx)
end)
end

{:ok, length(messages)}
end

Expand Down
Loading
Loading