Resources that can be loaded using this verified source are:
| Name | Description |
| -------------------- | ----------------------------------------------- |
| replication_resource | Load published messages from a replication slot |
| init_replication | Initialize replication and optionally return snapshot resources for initial data load |
:::info
The Postgres replication source currently **does not** support the [scd2 merge strategy](../../general-usage/merge-loading.md#scd2-strategy).
:::

## Setup guide

### Set up user
To set up a Postgres user, follow these steps:

1. The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned:
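   For example, a role with these attributes could be created like this (the role name is illustrative):

   ```sql
   CREATE ROLE replication_user WITH LOGIN REPLICATION;
   ```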
2. It also needs the `CREATE` privilege on the database:
   ```sql
   GRANT CREATE ON DATABASE dlt_data TO replication_user;
   ```

3. If not a superuser, the user must have ownership of the tables that need to be replicated:

   ```sql
   ALTER TABLE your_table OWNER TO replication_user;
   ```

### Set up RDS
To set up a Postgres user on RDS, follow these steps:
### Initialize the verified source

To get started with your data pipeline, follow these steps:

1. Enter the following command:

   ```sh
   dlt init pg_replication duckdb
   ```

It will initialize [pipeline examples](https://github.com/dlt-hub/verified-sources/blob/master/sources/pg_replication_pipeline.py) with Postgres replication as the [source](../../general-usage/source) and [DuckDB](../../dlt-ecosystem/destinations/duckdb) as the [destination](../../dlt-ecosystem/destinations).

2. If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../../dlt-ecosystem/destinations). For example:
```sh
dlt init pg_replication bigquery
```

3. After running the command, a new directory will be created with the necessary files and configuration settings to get started.

For more information, read the guide on [how to add a verified source](../../walkthroughs/add-a-verified-source).



### Add credentials

1. In the `.dlt` folder, there's a file called `secrets.toml`, where you can securely store your Postgres credentials (a sketch of the expected layout follows this list).

2. Alternatively, you can pass credentials as a connection string:

   ```sh
   sources.pg_replication.credentials="postgresql://username:password@host:port/database"
   ```

3. Finally, follow the instructions in the [Destinations section](../../dlt-ecosystem/destinations/) to add credentials for your chosen destination.
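
A minimal `secrets.toml` entry might look like the following sketch (the field names follow dlt's `ConnectionStringCredentials`; all values are placeholders):

```toml
[sources.pg_replication.credentials]
drivername = "postgresql"
database = "database"
username = "username"
password = "password"
host = "host"
port = 5432
```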


For more information, read the [Configuration section](../../general-usage/credentials).

## Run the pipeline

1. Ensure that you have installed all the necessary dependencies by running:
```sh
pip install -r requirements.txt
```
2. After carrying out the necessary customization to your pipeline script, you can run the pipeline with the following command:
```sh
python pg_replication_pipeline.py
```
3. Once the pipeline has finished running, you can verify that everything loaded correctly with:
```sh
dlt pipeline <pipeline_name> show
```
   For example, the `pipeline_name` for the above pipeline example is `pg_replication_pipeline`; you may also use any custom name instead.

## Sources and resources

`dlt` works on the principle of [sources](../../general-usage/source) and [resources](../../general-usage/resource).

### Snapshot resources from `init_replication`

The `init_replication` function serves two main purposes:
1. Sets up Postgres replication by creating the necessary replication slot and publication.
2. Optionally captures an initial snapshot when `persist_snapshots=True` and returns snapshot resources for loading existing data.
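
When you run `init_replication`, Postgres is prepared for logical replication: a publication is created and the specified tables (or the whole schema) are added to it, a replication slot is created, and, if `persist_snapshots=True`, snapshot tables are generated to capture the initial state.

To perform these steps, your Postgres user needs the following permissions:

- `CREATE` on the database (or superuser) to create publications
- Ownership of the publication and tables to add them
- Superuser privileges if replicating an entire schema
- `SELECT` on source tables and `CREATE` on the target schema for snapshots
- Superuser or the `REPLICATION` attribute for replication slot operations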

```py
def init_replication(
    slot_name: str = dlt.config.value,
    pub_name: str = dlt.config.value,
    schema_name: str = dlt.config.value,
    table_names: Optional[Union[str, Sequence[str]]] = dlt.config.value,
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    publish: str = "insert, update, delete",
    persist_snapshots: bool = False,
    include_columns: Optional[Mapping[str, Sequence[str]]] = None,
    columns: Optional[Mapping[str, TTableSchemaColumns]] = None,
    reset: bool = False,
) -> Optional[Union[DltResource, List[DltResource]]]:
    ...
```

`slot_name`: Name of the replication slot to create if it does not exist yet.
`pub_name`: Name of the publication to create if it does not exist yet.

`schema_name`: Name of the schema to replicate tables from.

`table_names`: Names of the tables to include in the publication. If not provided, the whole schema specified by `schema_name` will be replicated, including tables added to the schema after the publication was created. Superuser privileges are required for whole-schema replication. When specifying individual table names, the database role must own the tables if the role is not a superuser.

`credentials`: Postgres credentials, automatically resolved from `.dlt/secrets.toml` or environment variables.

`publish`: Comma-separated list of DML operations that controls which changes the publication includes. Allowed values are `insert`, `update`, and `delete`; `truncate` is not supported. For example, `publish="insert"` creates a publication that publishes only inserts.
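
For instance, a publication limited to inserts could be initialized like this (slot, publication, schema, and table names are illustrative):

```py
init_replication(
    slot_name="my_slot",
    pub_name="insert_only_pub",
    schema_name="my_schema",
    table_names="my_source_table",
    publish="insert",
)
```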

`persist_snapshots`: Whether to persist the snapshot of table states taken when the replication slot is created. If set to `True`, snapshot tables are created in Postgres for each included table, and corresponding `DltResource` objects are created and returned. These resources can be used to perform an initial load of all data present in the tables at the time the replication slot was created.

`include_columns`: Maps table names to the columns to include in the snapshot tables; columns not listed are excluded. If omitted, all columns are included. This argument is used only when `persist_snapshots` is `True`.
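
For example (table and column names are illustrative):

```py
include_columns = {
    "table_x": ["col_a", "col_c"],
    "table_y": ["col_x", "col_y"],
}
```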

`columns`: Maps table names to column hints to apply to the snapshot table resources. For example:

```py
columns = {
    "table_x": {"col_a": {"data_type": "json"}},
    "table_y": {"col_y": {"precision": 32}},
}
```
This argument is used only when `persist_snapshots` is `True`.

`reset`: If set to `True`, the existing slot and publication are dropped and recreated. Has no effect if a slot and publication with the provided names do not yet exist.

### Resource `replication_resource`

This resource yields data items for changes in one or more Postgres tables.
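
A sketch of its signature, inferred from the parameters documented below (the exact defaults and the return annotation are assumptions; refer to the verified source code for the authoritative version):

```py
def replication_resource(
    slot_name: str = dlt.config.value,
    pub_name: str = dlt.config.value,
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    include_columns: Optional[Mapping[str, Sequence[str]]] = None,
    columns: Optional[Mapping[str, TTableSchemaColumns]] = None,
    target_batch_size: int = 1000,
    flush_slot: bool = True,
) -> Iterable[TDataItem]:
    ...
```

`slot_name`: Name of the replication slot to consume replicated messages from.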

`pub_name`: Name of the publication that publishes changes for the included tables.

`credentials`: Postgres credentials, automatically resolved from `.dlt/secrets.toml` or environment variables.

`include_columns`: Maps table names to the columns to include in the generated data items; columns not listed are excluded. If omitted, all columns are included.

`columns`: Maps table names to column hints to apply on the replicated tables.

`target_batch_size`: Desired number of data items yielded in a batch. Can be used to limit the number of data items held in memory.

`flush_slot`: Whether processed messages are discarded from the replication slot. The recommended value is `True`.

## Customization

The [pipeline examples](https://github.com/dlt-hub/verified-sources/blob/master/sources/pg_replication_pipeline.py) include demos that simulate changes in a Postgres source to demonstrate replication. The simulation uses a simple pipeline defined as:

```py
# Simulation pipeline
sim_pl = dlt.pipeline(
    pipeline_name="simulation_pipeline",
    destination="postgres",
    dataset_name="source_dataset",
    dev_mode=True,
)
```

This pipeline is configured in the `get_postgres_pipeline()` function.
It’s meant for local testing, so you can freely modify it to simulate different replication scenarios.

:::note
In production, you don’t need a simulation pipeline. Replication runs against an actual Postgres database that changes independently.
:::

The general workflow for setting up replication is:


1. Define the replication pipeline that will load the replicated data into your chosen destination:

   ```py
   repl_pl = dlt.pipeline(
       pipeline_name="pg_replication_pipeline",
       destination="duckdb",
       dataset_name="replicate_single_table",
       dev_mode=True,
   )
   ```

2. Initialize replication (if needed) with `init_replication`, and capture a snapshot of the source:

   ```py
   snapshot = init_replication(
       slot_name="my_slot",
       pub_name="my_pub",
       schema_name="my_schema",
       table_names="my_source_table",
       persist_snapshots=True,
       reset=True,
   )
   ```
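
   To replicate every table in the schema instead, you can omit the `table_names` argument (superuser privileges are required for whole-schema replication). An illustrative variant:

   ```py
   snapshot = init_replication(
       slot_name="my_slot",
       pub_name="my_pub",
       schema_name="my_schema",
       persist_snapshots=True,
       reset=True,
   )
   ```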

3. Load the initial snapshot, so the destination contains all existing data before replication begins:

   ```py
   repl_pl.run(snapshot)
   ```

4. Apply ongoing changes by creating a `replication_resource` to capture updates and keep the destination in sync:

   ```py
   # Create a resource that generates items for each change in the source table
   changes = replication_resource("my_slot", "my_pub")

   # Run the pipeline as
   repl_pl.run(changes)
   ```
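
To replicate only selected columns, pass the `include_columns` argument to `replication_resource` as well; a sketch with illustrative column names:

```py
# Generate change items only for the listed columns of the table
changes = replication_resource(
    slot_name="my_slot",
    pub_name="my_pub",
    include_columns={"my_source_table": ["id", "val", "updated_at"]},
)
repl_pl.run(changes)
```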


## Alternative: Using `xmin` for Change Data Capture (CDC)

If logical replication doesn't fit your needs, you can use the built-in `xmin` system column of Postgres for change tracking with dlt's `sql_database` source instead of the `pg_replication` source.

To do this, define a `query_adapter_callback` that extracts the `xmin` value from the source table and filters based on an incremental cursor, as sketched below.

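A minimal sketch, assuming the `sql_database` source's `sql_table` resource and its `query_adapter_callback` hook (table and cursor names are illustrative; check your dlt version for the exact callback signature):

```py
import dlt
import sqlalchemy as sa
from dlt.sources.sql_database import sql_table

def query_adapter_callback(query, table) -> sa.TextClause:
    # Expose the xmin system column (cast via text to bigint) so dlt's
    # incremental loading can use it as a cursor on the source table.
    return sa.text(f"SELECT *, xmin::text::bigint AS xmin FROM {table.fullname}")

# Illustrative wiring: track xmin as the incremental cursor
my_table = sql_table(
    table="my_source_table",
    query_adapter_callback=query_adapter_callback,
    incremental=dlt.sources.incremental("xmin"),
)
```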