Skip to content

Commit

Permalink
✨ backfill consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
RTLS committed Jul 9, 2024
1 parent 3e15909 commit 6710eb4
Show file tree
Hide file tree
Showing 22 changed files with 505 additions and 87 deletions.
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ config :phoenix, :json_library, Jason
#
# For production it's recommended to configure a different adapter
# at the `config/runtime.exs`.
config :sequin, Oban,
queues: [default: 10],
repo: Sequin.Repo

config :sequin, Sequin.Mailer, adapter: Swoosh.Adapters.Local

config :sequin, Sequin.Repo,
Expand Down
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ config :phoenix, :plug_init_mode, :runtime
config :phoenix_live_view,
enable_expensive_runtime_checks: true

config :sequin, Oban, testing: :manual
config :sequin, Sequin.Mailer, adapter: Swoosh.Adapters.Test

config :sequin, Sequin.Repo,
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
services:
postgres:
image: "hbontempo/postgres-hll@sha256:356274403972d6c12fb3a77b5f6f7a0e87c3fb1113f298e5509d8e2051711730"
command: postgres -c shared_preload_libraries=pg_stat_statements -c pg_stat_statements.track=all -c max_connections=200 -c pg_stat_statements.max=10000 -c track_activity_query_size=2048
command: postgres -c shared_preload_libraries=pg_stat_statements -c pg_stat_statements.track=all -c max_connections=200 -c pg_stat_statements.max=10000 -c track_activity_query_size=2048 -c wal_level=logical
ports:
- "5432:5432"
environment:
Expand Down
3 changes: 2 additions & 1 deletion lib/sequin/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ defmodule Sequin.Application do
# Start to serve requests, typically the last entry
SequinWeb.Endpoint,
{Task.Supervisor, name: Sequin.TaskSupervisor},
{ConCache, name: Sequin.Cache, ttl_check_interval: :timer.seconds(1), global_ttl: :infinity}
{ConCache, name: Sequin.Cache, ttl_check_interval: :timer.seconds(1), global_ttl: :infinity},
{Oban, Application.fetch_env!(:sequin, Oban)}
]
end

Expand Down
26 changes: 24 additions & 2 deletions lib/sequin/streams/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Sequin.Streams.Consumer do
import Ecto.Changeset
import Ecto.Query

alias __MODULE__
alias Sequin.Accounts.Account
alias Sequin.Streams.Stream

Expand All @@ -23,6 +24,7 @@ defmodule Sequin.Streams.Consumer do
]}
typed_schema "consumers" do
field :slug, :string
field :backfill_completed_at, :utc_datetime_usec
field :ack_wait_ms, :integer, default: 30_000
field :max_ack_pending, :integer, default: 10_000
field :max_deliver, :integer
Expand All @@ -37,13 +39,22 @@ defmodule Sequin.Streams.Consumer do

def create_changeset(consumer, attrs) do
consumer
|> cast(attrs, [:stream_id, :ack_wait_ms, :max_ack_pending, :max_deliver, :max_waiting, :slug, :filter_subject])
|> cast(attrs, [
:stream_id,
:ack_wait_ms,
:max_ack_pending,
:max_deliver,
:max_waiting,
:slug,
:filter_subject,
:backfill_completed_at
])
|> validate_required([:stream_id, :slug, :filter_subject])
|> foreign_key_constraint(:stream_id)
end

def update_changeset(consumer, attrs) do
cast(consumer, attrs, [:ack_wait_ms, :max_ack_pending, :max_deliver, :max_waiting])
cast(consumer, attrs, [:ack_wait_ms, :max_ack_pending, :max_deliver, :max_waiting, :backfill_completed_at])
end

def where_account_id(query \\ base_query(), account_id) do
Expand Down Expand Up @@ -93,4 +104,15 @@ defmodule Sequin.Streams.Consumer do
filter_token == "*" or filter_token == subject_token
end)
end

@backfill_completed_at_threshold :timer.minutes(5)
def should_delete_acked_messages?(consumer, now \\ DateTime.utc_now())

def should_delete_acked_messages?(%Consumer{backfill_completed_at: nil}, _now), do: false

def should_delete_acked_messages?(%Consumer{backfill_completed_at: backfill_completed_at}, now) do
backfill_completed_at
|> DateTime.add(@backfill_completed_at_threshold, :millisecond)
|> DateTime.compare(now) == :lt
end
end
76 changes: 76 additions & 0 deletions lib/sequin/streams/consumer_backfill_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
defmodule Sequin.Streams.ConsumerBackfillWorker do
@moduledoc false
use Oban.Worker

alias Sequin.Streams
alias Sequin.Streams.Consumer
alias Sequin.Streams.ConsumerMessage

require Logger

@limit 10_000

def create(consumer_id, seq \\ 0)

def create(consumer_id, seq) do
%{consumer_id: consumer_id, seq: seq}
|> new()
|> Oban.insert()
end

@impl Oban.Worker
def perform(%Oban.Job{args: %{"consumer_id" => consumer_id, "seq" => seq}}) do
consumer = Streams.get_consumer!(consumer_id)

if Consumer.should_delete_acked_messages?(consumer) do
delete_acked_messages(consumer)
else
backfill_messages(consumer, seq)
end
end

def backfill_messages(consumer, seq) do
messages =
Streams.list_messages_for_stream(consumer.stream_id,
seq_gt: seq,
limit: @limit,
order_by: [asc: :seq],
select: [:subject, :seq]
)

{:ok, _} =
messages
|> Enum.filter(fn message ->
Consumer.filter_matches_subject?(consumer.filter_subject, message.subject)
end)
|> Enum.map(fn message ->
%ConsumerMessage{
consumer_id: consumer.id,
message_subject: message.subject,
message_seq: message.seq
}
end)
|> Streams.upsert_consumer_messages()

case messages do
messages when length(messages) < @limit ->
{:ok, _} = Streams.update_consumer_with_lifecycle(consumer, %{backfill_completed_at: DateTime.utc_now()})
create(consumer.id, nil)

_ ->
next_seq = Enum.max_by(messages, fn message -> message.seq end).seq
create(consumer.id, next_seq)
end
end

defp delete_acked_messages(consumer) do
case Streams.delete_acked_consumer_messages_for_consumer(consumer.id, @limit) do
{0, nil} ->
:ok

{count, nil} ->
Logger.info("Deleted #{count} acked messages for consumer #{consumer.id}")
create(consumer.id, nil)
end
end
end
2 changes: 1 addition & 1 deletion lib/sequin/streams/consumer_message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Sequin.Streams.ConsumerMessage do
field :last_delivered_at, :utc_datetime_usec
field :message_seq, :integer
field :not_visible_until, :utc_datetime_usec
field :state, Ecto.Enum, values: [:delivered, :available, :pending_redelivery]
field :state, Ecto.Enum, values: [:acked, :available, :delivered, :pending_redelivery]

timestamps(type: :utc_datetime_usec)
end
Expand Down
9 changes: 9 additions & 0 deletions lib/sequin/streams/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,15 @@ defmodule Sequin.Streams.Message do
)
end

@spec where_state(Ecto.Query.t(), atom()) :: Ecto.Query.t()
def where_state(query \\ base_query(), state) do
from([message: m] in query, where: m.state == ^state)
end

def where_seq_gt(query \\ base_query(), seq) do
from([message: m] in query, where: m.seq > ^seq)
end

defp base_query(query \\ __MODULE__) do
from(m in query, as: :message)
end
Expand Down
17 changes: 0 additions & 17 deletions lib/sequin/streams/outstanding_message_count.ex

This file was deleted.

4 changes: 3 additions & 1 deletion lib/sequin/streams/query/upsert_consumer_messages.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ FROM input_data
ON CONFLICT (consumer_id, message_subject)
DO UPDATE SET
state = CASE streams.consumer_messages.state
WHEN 'acked'::streams.consumer_message_state THEN 'available'::streams.consumer_message_state
WHEN 'available'::streams.consumer_message_state THEN 'available'::streams.consumer_message_state
WHEN 'delivered'::streams.consumer_message_state THEN 'pending_redelivery'::streams.consumer_message_state
WHEN 'pending_redelivery'::streams.consumer_message_state THEN 'pending_redelivery'::streams.consumer_message_state
END,
message_seq = EXCLUDED.message_seq,
updated_at = NOW()
updated_at = NOW()
WHERE EXCLUDED.message_seq > streams.consumer_messages.message_seq
Loading

0 comments on commit 6710eb4

Please sign in to comment.