# add init_replication description and required permissions #3020
Resources that can be loaded using this verified source are:

| Name | Description |
| -------------------- | -------------------------------------------------------------------------------------- |
| replication_resource | Load published messages from a replication slot |
| init_replication | Initialize replication and optionally return snapshot resources for initial data load |
:::info
The Postgres replication source currently **does not** support the [scd2 merge strategy](../../general-usage/merge-loading.md#scd2-strategy).
:::
## Setup guide
### Set up user

To set up a Postgres user, follow these steps:
1. The Postgres user needs to have the `LOGIN` and `REPLICATION` attributes assigned:
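A minimal sketch of such a role (`replication_user` and the password are placeholders):

```sql
-- create a login-capable role that is allowed to use logical replication
CREATE ROLE replication_user WITH LOGIN REPLICATION PASSWORD '<password>';
```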
2. The user also needs the `CREATE` privilege on the database:
```sql
GRANT CREATE ON DATABASE dlt_data TO replication_user;
```
3. If not a superuser, the user must have ownership of the tables that need to be replicated:
```sql
ALTER TABLE your_table OWNER TO replication_user;
```
### Set up RDS

To set up a Postgres user on RDS, follow these steps:
1. Enable logical replication by setting the `rds.logical_replication` parameter to `1` in the instance's parameter group.
2. Since RDS does not allow the `REPLICATION` attribute, grant the AWS-managed role instead: `GRANT rds_replication TO replication_user;`.

### Initialize the verified source

To get started with your data pipeline, follow these steps:

1. Enter the following command:
```sh
dlt init pg_replication duckdb
```
It will initialize [pipeline examples](https://github.com/dlt-hub/verified-sources/blob/master/sources/pg_replication_pipeline.py) with Postgres replication as the [source](../../general-usage/source) and [DuckDB](../../dlt-ecosystem/destinations/duckdb) as the [destination](../../dlt-ecosystem/destinations).
2. If you'd like to use a different destination, simply replace `duckdb` with the name of your preferred [destination](../../dlt-ecosystem/destinations). For example:
```sh
dlt init pg_replication bigquery
```
3. After running the command, a new directory will be created with the necessary files and configuration settings to get started.
For more information, read the guide on [how to add a verified source](../../walkthroughs/add-a-verified-source).
### Add credentials
1. In `.dlt/secrets.toml`, add the Postgres credentials for the source:

```toml
sources.pg_replication.credentials="postgresql://username:password@host:port/database"
```
2. Finally, follow the instructions in the [Destinations section](../../dlt-ecosystem/destinations/) to add credentials for your chosen destination.
For more information, read the [Configuration section](../../general-usage/credentials).
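Credentials can also be supplied through an environment variable instead of `secrets.toml`; a minimal sketch, assuming dlt's standard double-underscore naming for configuration sections:

```sh
# equivalent to sources.pg_replication.credentials in secrets.toml
export SOURCES__PG_REPLICATION__CREDENTIALS="postgresql://username:password@host:port/database"
```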
## Run the pipeline
1. Ensure that you have installed all the necessary dependencies by running:
```sh
pip install -r requirements.txt
```
2. After carrying out the necessary customization to your pipeline script, you can run the pipeline with the following command:
```sh
python pg_replication_pipeline.py
```
3. Once the pipeline has finished running, you can verify that everything loaded correctly with:
```sh
dlt pipeline <pipeline_name> show
```
## Sources and resources
`dlt` works on the principle of [sources](../../general-usage/source) and [resources](../../general-usage/resource).
### Snapshot resources from `init_replication`
The `init_replication` function serves two main purposes:
1. Sets up Postgres replication by creating the necessary replication slot and publication.
2. Optionally captures an initial snapshot when `persist_snapshots=True` and returns snapshot resources for loading existing data.
```py
def init_replication(
    slot_name: str = dlt.config.value,
    pub_name: str = dlt.config.value,
    schema_name: str = dlt.config.value,
    table_names: Optional[Union[str, Sequence[str]]] = dlt.config.value,
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    publish: str = "insert, update, delete",
    persist_snapshots: bool = False,
    include_columns: Optional[Mapping[str, Sequence[str]]] = None,
    columns: Optional[Mapping[str, TTableSchemaColumns]] = None,
    reset: bool = False,
) -> Optional[Union[DltResource, List[DltResource]]]:
    ...
```
`slot_name`: Name of the replication slot to create if it does not exist yet.
`pub_name`: Name of the publication to create if it does not exist yet.
`schema_name`: Name of the schema to replicate tables from.
`table_names`: Names of the tables to include in the publication. If not provided, the whole schema specified by `schema_name` will be replicated, including tables added to the schema after the publication was created. Superuser privileges are required for whole-schema replication. When specifying individual table names, the database role must own the tables if the role is not a superuser.
`credentials`: Postgres credentials, automatically resolved from `.dlt/secrets.toml` or environment variables.
`publish`: Comma-separated list of DML operations that controls which changes the publication includes. Allowed values are `insert`, `update`, and `delete`; `truncate` is not supported. For example, `publish="insert"` creates a publication that publishes only inserts.
`persist_snapshots`: Whether to persist the snapshot of table states taken when the replication slot is created. If set to `True`, snapshot tables are created in Postgres for each included table, and corresponding `DltResource` objects are created and returned. These resources can be used to perform an initial load of all data present in the tables at the time the replication slot was created.
`include_columns`: Maps table names to the columns to include in the snapshot tables; columns not listed are excluded. If omitted, all columns are included. This argument is used only when `persist_snapshots` is `True`.
`columns`: Maps table names to column hints to apply to the snapshot table resources. For example:
```py
columns = {
    "table_x": {"col_a": {"data_type": "json"}},
    "table_y": {"col_y": {"precision": 32}},
}
```

This argument is used only when `persist_snapshots` is `True`.
`reset`: If set to `True`, the existing slot and publication are dropped and recreated. Has no effect if a slot and publication with the provided names do not yet exist.
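Putting several of these arguments together, a hypothetical call (slot, publication, schema, table, and column names are illustrative) that publishes only inserts, snapshots two tables, and narrows and hints their columns:

```py
snapshot_resources = init_replication(
    slot_name="my_slot",
    pub_name="my_pub",
    schema_name="my_schema",
    table_names=["table_x", "table_y"],  # restrict the publication to two tables
    publish="insert",  # replicate INSERT operations only
    persist_snapshots=True,  # create snapshot tables and return their resources
    include_columns={"table_x": ("col_a", "col_b")},  # keep only these columns of table_x
    columns={"table_x": {"col_a": {"data_type": "json"}}},  # apply a column hint
)
```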
### Resource `replication_resource`
This resource consumes published messages from the replication slot and yields them as data items.
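A sketch of its signature, inferred from the parameter descriptions below and the conventions of `init_replication` (consult the source file for the exact definition):

```py
def replication_resource(
    slot_name: str = dlt.config.value,
    pub_name: str = dlt.config.value,
    credentials: ConnectionStringCredentials = dlt.secrets.value,
    include_columns: Optional[Mapping[str, Sequence[str]]] = None,
    columns: Optional[Mapping[str, TTableSchemaColumns]] = None,
    target_batch_size: int = 1000,
    flush_slot: bool = True,
) -> Iterable[TDataItem]:
    ...
```

`slot_name`: Name of the replication slot to consume replication messages from.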
`pub_name`: Name of the publication to consume messages from.
`credentials`: Postgres credentials, automatically resolved from `.dlt/secrets.toml` or environment variables.
`include_columns`: Maps table names to the columns to include in the generated data items; columns not listed are excluded. If omitted, all columns are included.
`columns`: Maps table names to column hints to apply on the replicated tables.
`target_batch_size`: Desired number of data items yielded in a batch. Can be used to limit the number of data items held in memory.
`flush_slot`: Whether processed messages are discarded from the replication slot. The recommended value is `True`.
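For example, a hypothetical call (names are illustrative) that narrows the replicated columns and caps batches at 500 items:

```py
changes = replication_resource(
    slot_name="my_slot",
    pub_name="my_pub",
    include_columns={"my_source_table": ("id", "val")},  # illustrative column names
    target_batch_size=500,  # yield at most 500 items per batch to limit memory use
)
```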
## Customization
The [pipeline examples](https://github.com/dlt-hub/verified-sources/blob/master/sources/pg_replication_pipeline.py) include demos that simulate changes in a Postgres source to demonstrate replication. The simulation uses a simple pipeline defined as:
```py
# Simulation pipeline
sim_pl = dlt.pipeline(
    pipeline_name="simulation_pipeline",
    destination="postgres",
    dataset_name="source_dataset",
    dev_mode=True,
)
```
This pipeline is configured in the `get_postgres_pipeline()` function. It’s meant for local testing, so you can freely modify it to simulate different replication scenarios.
:::note
In production, you don’t need a simulation pipeline. Replication runs against an actual Postgres database that changes independently.
:::
The general workflow for setting up replication is:
1. Define the replication pipeline that will load the replicated data into your chosen destination:
```py
repl_pl = dlt.pipeline(
    pipeline_name="pg_replication_pipeline",
    destination="duckdb",
    dataset_name="replicate_single_table",
    dev_mode=True,
)
```
2. Initialize replication (if needed) with `init_replication`, and capture a snapshot of the source:
```py
snapshot = init_replication(
    slot_name="my_slot",
    pub_name="my_pub",
    schema_name="my_schema",
    table_names="my_source_table",
    persist_snapshots=True,
    reset=True,
)
```
3. Load the initial snapshot, so the destination contains all existing data before replication begins:
```py
repl_pl.run(snapshot)
```
4. Apply ongoing changes by creating a `replication_resource` to capture updates and keep the destination in sync:
```py
# Create a resource that generates items for each change in the source table
changes = replication_resource("my_slot", "my_pub")

# Run the pipeline
repl_pl.run(changes)
```
## Alternative: Using `xmin` for Change Data Capture (CDC)
If logical replication doesn't fit your needs, you can use the built-in `xmin` system column of Postgres for change tracking with dlt's `sql_database` source instead of the `pg_replication` source.
To do this, define a `query_adapter_callback` that extracts the `xmin` value from the source table and filters based on an incremental cursor:
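A sketch of such a callback, assuming the `sql_database` source accepts a `query_adapter_callback` argument and an incremental cursor named `xmin` (the table name is illustrative; verify the callback signature against your dlt version):

```py
import dlt
import sqlalchemy as sa
from dlt.sources.sql_database import sql_database


def query_adapter_callback(query, table, incremental=None, engine=None):
    # Select every column plus xmin, cast via text to bigint so it can be
    # compared numerically and used as an incremental cursor.
    select_clause = (
        f"SELECT {', '.join(col.name for col in table.columns)},"
        f" xmin::text::bigint AS xmin FROM {table.fullname}"
    )
    if incremental is not None and incremental.start_value is not None:
        # Only fetch rows changed in or after the last seen transaction id.
        select_clause += f" WHERE xmin::text::bigint >= {incremental.start_value}"
    return sa.text(select_clause)


source = sql_database(
    query_adapter_callback=query_adapter_callback
).with_resources("my_source_table")
source.my_source_table.apply_hints(
    incremental=dlt.sources.incremental("xmin")
)
```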