A lightweight, modular, and reliable CDC (Change Data Capture) library that streams changes from the PostgreSQL WAL to RabbitMQ.
- Logical Replication: Captures changes in near real time by streaming the PostgreSQL WAL (Write-Ahead Log).
- RabbitMQ Integration: Automatically routes changes to a RabbitMQ exchange.
- Robust: Reconnects automatically with exponential backoff when a connection drops.
- Pluggable: Supports the `wal2json` or `pgoutput` (Postgres 10+ native) plugins.
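
The reconnection behavior listed above can be pictured with a small sketch. This is not the library's implementation: `backoff_delays`, `run_with_reconnect`, and their parameters are hypothetical names chosen for illustration, and the defaults (1 second base, doubling, 60 second cap) are assumptions.

```python
import time


def backoff_delays(base=1.0, factor=2.0, max_delay=60.0):
    """Yield an endless sequence of retry delays (seconds), capped at max_delay."""
    delay = base
    while True:
        yield min(delay, max_delay)
        # Grow the delay multiplicatively, but never past the cap.
        delay = min(delay * factor, max_delay)


def run_with_reconnect(connect_and_stream, max_attempts=None, sleep=time.sleep):
    """Call connect_and_stream() until it returns, retrying on connection errors.

    ConnectionError is a subclass of OSError, so catching OSError covers both.
    If max_attempts is set, the last error is re-raised once it is reached.
    """
    for attempt, delay in enumerate(backoff_delays(), start=1):
        try:
            return connect_and_stream()
        except OSError:
            if max_attempts is not None and attempt >= max_attempts:
                raise
            sleep(delay)  # wait before the next reconnect attempt
```

Passing `sleep` as a parameter keeps the loop easy to test without real waiting. A production version would usually add random jitter to each delay so that many clients do not reconnect at the same moment.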

Requirements:

- Python 3.10+
- PostgreSQL 10+ (`wal_level = logical` must be set)
- RabbitMQ

Install with pip:

    pip install dbqueue

Basic usage:

    from dbqueue import DBQueueWatcher


    def main():
        watcher = DBQueueWatcher(
            db_dsn="postgresql://user:pass@localhost:5432/dbname",
            rabbit_url="amqp://guest:guest@localhost:5672/",
            slot_name="my_cdc_slot",
            publication="my_cdc_pub",
            event_format="debezium_like",  # or "simple"
        )

        # Optionally register handlers to consume events in the same process
        @watcher.on_insert("users")
        def handle_user_insert(event):
            print(f"New user: {event['after']}")

        # Publishes changes and runs the registered handlers
        watcher.start()


    if __name__ == "__main__":
        main()

Make sure the following setting is present in postgresql.conf:

    wal_level = logical
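
As a quick sanity check before starting the watcher, the setting can be verified from the configuration text. The sketch below is an illustration only and not part of dbqueue; `wal_level_from_conf` is a hypothetical helper. It inspects just the text it is given, so it ignores included files and values set through `ALTER SYSTEM`. Running `SHOW wal_level;` in a database session reports the value actually in effect.

```python
import re

# postgresql.conf allows "name = value" or "name value"; names are case-insensitive.
_WAL_LEVEL = re.compile(r"^\s*wal_level(?:\s*=\s*|\s+)'?(\w+)'?", re.IGNORECASE)


def wal_level_from_conf(conf_text):
    """Return the last uncommented wal_level value in conf_text, or None if absent."""
    value = None
    for line in conf_text.splitlines():
        line = line.split("#", 1)[0]  # drop trailing or whole-line comments
        match = _WAL_LEVEL.match(line)
        if match:
            # When a setting appears more than once, the last occurrence wins.
            value = match.group(1).lower()
    return value


if __name__ == "__main__":
    sample = "#wal_level = replica\nwal_level = logical  # needed for CDC\n"
    if wal_level_from_conf(sample) != "logical":
        raise SystemExit("wal_level must be set to logical")
```

Note that PostgreSQL only picks up a change to `wal_level` after a server restart.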