Simple wrapper to consume WAL entries from your Postgres Database. It offers an abstraction on top of Postgrex to simplify WAL consumption.
It also offers a decoder in PostgresReplication.Decoder based on https://github.com/cainophile/cainophile
Provide the options to connect to the database and the message handler module. As an example present in the examples folder here's how you can track events on all tables:
Mix.install([{:postgres_replication, git: "https://github.com/filipecabaco/postgres_replication.git"}])
defmodule Handler do
@behaviour PostgresReplication.Handler
import PostgresReplication.Protocol
alias PostgresReplication.Protocol.KeepAlive
alias PostgresReplication.Plugin.Pgoutput.Decoder
@impl true
def call(message, _parent_pid) when is_write(message) do
message
|> PostgresReplication.Protocol.parse()
|> Decoder.decode_message()
|> IO.inspect()
:noreply
end
def call(message, _parent_pid) when is_keep_alive(message) do
reply =
case parse(message) do
%KeepAlive{reply: :now, wal_end: wal_end} ->
wal_end = wal_end + 1
standby(wal_end, wal_end, wal_end, :now)
_ ->
hold()
end
{:reply, reply}
end
def call(_, _), do: :noreply
end
defmodule Replication do
def start do
options = %PostgresReplication{
connection_opts: [
hostname: "localhost",
username: "postgres",
password: "postgres",
database: "postgres",
],
table: :all,
opts: [name: __MODULE__, auto_reconnect: true],
handler_module: Handler,
}
{:ok, _pid} = PostgresReplication.start_link(options)
Process.sleep(:infinity)
end
end
Replication.start()def deps do
[
{:postgres_replication, "~> 0.1.0"}
]
endYou need your database to set your wal_level to the value of logical on start.
ALTER SYSTEM SET wal_level='logical'