Skip to content

Commit

Permalink
✨ Use a rich subject pattern for NATS
Browse files Browse the repository at this point in the history
  • Loading branch information
acco committed Dec 16, 2024
1 parent 0181413 commit 698d0f6
Show file tree
Hide file tree
Showing 11 changed files with 24 additions and 42 deletions.
10 changes: 0 additions & 10 deletions assets/svelte/components/SinkCardNats.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,6 @@
>
</div>
</div>

<div>
<span class="text-sm text-gray-500">Subject</span>
<div class="mt-2">
<span
class="font-mono bg-slate-50 pl-1 pr-4 py-1 border border-slate-100 rounded-md whitespace-nowrap"
>{consumer.sink.subject}</span
>
</div>
</div>
</div>
</CardContent>
</Card>
12 changes: 0 additions & 12 deletions assets/svelte/consumers/SinkNatsForm.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,5 @@
<p class="text-destructive text-sm">{errors.sink.port}</p>
{/if}
</div>

<div class="space-y-2">
<Label for="subject">Subject</Label>
<Input
id="subject"
bind:value={form.sink.subject}
placeholder="my-stream"
/>
{#if errors.sink?.subject}
<p class="text-destructive text-sm">{errors.sink.subject}</p>
{/if}
</div>
</CardContent>
</Card>
Binary file modified docs/images/quickstart/nats/nats-box-messages.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/images/quickstart/nats/nats-config-card.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 1 addition & 2 deletions docs/quickstart/nats.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ By the end, you'll have hands-on experience setting up Postgres change data capt

```bash
docker run --rm -it --network host natsio/nats-box:latest
nats sub products.events
nats sub "sequin.>"
```

Keep this terminal window open - you'll see messages start flowing once we create the sink.
Expand Down Expand Up @@ -74,7 +74,6 @@ By the end, you'll have hands-on experience setting up Postgres change data capt

- Host: If running NATS and Sequin locally with Docker, use `host.docker.internal`
- Port: The port NATS is listening on (default: 4222)
- Subject: The NATS subject to publish to (e.g., `products.events`)

<Frame>
<img style={{ maxWidth: '700px' }} src="/images/quickstart/nats/nats-config-card.png" alt="NATS configuration card" />
Expand Down
9 changes: 3 additions & 6 deletions lib/sequin/consumers/nats_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,23 @@ defmodule Sequin.Consumers.NatsSink do

import Ecto.Changeset

@derive {Jason.Encoder, only: [:host, :port, :subject]}
@derive {Jason.Encoder, only: [:host, :port]}
@primary_key false
typed_embedded_schema do
field :type, Ecto.Enum, values: [:nats], default: :nats
field :host, :string
field :port, :integer
field :subject, :string
field :connection_id, :string
end

def changeset(struct, params) do
struct
|> cast(params, [
:host,
:port,
:subject
:port
])
|> validate_required([:host, :port, :subject])
|> validate_required([:host, :port])
|> validate_number(:port, greater_than: 0, less_than: 65_536)
|> validate_length(:subject, max: 255)
|> put_connection_id()
end

Expand Down
19 changes: 16 additions & 3 deletions lib/sequin/nats/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ defmodule Sequin.Nats.Client do
@behaviour Sequin.Nats

alias Sequin.Consumers.ConsumerEvent
alias Sequin.Consumers.ConsumerEventData
alias Sequin.Consumers.ConsumerRecord
alias Sequin.Consumers.ConsumerRecordData
alias Sequin.Consumers.NatsSink
alias Sequin.Error
alias Sequin.Nats.ConnectionCache
Expand All @@ -13,7 +15,7 @@ defmodule Sequin.Nats.Client do
def send_messages(%NatsSink{} = sink, messages) when is_list(messages) do
with {:ok, connection} <- ConnectionCache.connection(sink) do
Enum.reduce_while(messages, :ok, fn message, :ok ->
case publish_message(message, connection, sink) do
case publish_message(message, connection) do
:ok ->
{:cont, :ok}

Expand Down Expand Up @@ -52,18 +54,29 @@ defmodule Sequin.Nats.Client do
{:error, to_sequin_error(error)}
end

defp publish_message(message, connection, sink) do
defp publish_message(message, connection) do
opts = [headers: get_headers(message)]
payload = to_payload(message)
subject = subject(message)

try do
Gnat.pub(connection, sink.subject, Jason.encode_to_iodata!(payload), opts)
Gnat.pub(connection, subject, Jason.encode_to_iodata!(payload), opts)
catch
error ->
{:error, to_sequin_error(error)}
end
end

defp subject(%ConsumerEvent{data: %ConsumerEventData{} = data}) do
%{metadata: %{database_name: database_name, table_schema: table_schema, table_name: table_name}} = data
"sequin.changes.#{database_name}.#{table_schema}.#{table_name}.#{data.action}"
end

defp subject(%ConsumerRecord{data: %ConsumerRecordData{} = data}) do
%{metadata: %{database_name: database_name, table_schema: table_schema, table_name: table_name}} = data
"sequin.rows.#{database_name}.#{table_schema}.#{table_name}"
end

defp to_sequin_error(error) do
case error do
error when is_binary(error) ->
Expand Down
6 changes: 2 additions & 4 deletions lib/sequin_web/live/components/consumer_form.ex
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,7 @@ defmodule SequinWeb.Components.ConsumerForm do
%{
"type" => "nats",
"host" => sink["host"],
"port" => sink["port"],
"subject" => sink["subject"]
"port" => sink["port"]
}
end

Expand Down Expand Up @@ -622,8 +621,7 @@ defmodule SequinWeb.Components.ConsumerForm do
%{
"type" => "nats",
"host" => sink.host,
"port" => sink.port,
"subject" => sink.subject
"port" => sink.port
}
end

Expand Down
1 change: 0 additions & 1 deletion lib/sequin_web/live/sink_consumers/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,6 @@ defmodule SequinWeb.SinkConsumersLive.Show do
type: :nats,
host: sink.host,
port: sink.port,
subject: sink.subject,
connection_id: sink.connection_id
}
end
Expand Down
3 changes: 1 addition & 2 deletions test/sequin/nats_pipeline_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ defmodule Sequin.ConsumersRuntime.NatsPipelineTest do
sink: %{
type: :nats,
host: "localhost",
port: 4222,
subject: "test.subject"
port: 4222
}
)

Expand Down
3 changes: 1 addition & 2 deletions test/support/factory/consumers_factory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,7 @@ defmodule Sequin.Factory.ConsumersFactory do
%NatsSink{
type: :nats,
host: "localhost",
port: 4222,
subject: Factory.word()
port: 4222
},
attrs
)
Expand Down

0 comments on commit 698d0f6

Please sign in to comment.