dlt offers an init command that will clone and inject any pipeline from this repository into your project and set up the credentials and Python dependencies. Please follow our docs
Join our slack by following the invitation link
If you added a pipeline and something does not work: technical-help channel
If you want to contribute a pipeline, a customization or a fix: dlt-contributors channel
If you want to share your working pipeline with the community
- Follow the guide on how to prepare your pipeline started with `dlt init` to be shared here.
- Create an issue that describes the pipeline using community pipeline template
- Fork the pipelines repository
- Create a feature branch in your fork
- Commit to that branch when you work. Please use descriptive commit messages
- Make a PR to the master branch of this repository (upstream) from your fork
- We'll do code reviews quickly
If you want us or someone else to build a new pipeline
Here's the pipeline request template
If you want to report a bug in one of the pipelines
Use the bug report template
If you are new to dlt, complete the Getting started and the Walkthroughs so you have a feeling for what dlt is and how people will use your pipeline.
We strongly suggest that you build your pipelines out of existing building blocks.
- Declare your resources and group them in sources using Python decorators.
- Connect the transformers to the resources to load additional data or enrich it
- Create your resources dynamically from data
- Append, replace and merge your tables
- Transform your data before loading and see some examples of customizations like column renames and anonymization
- Set up "last value" incremental loading
- Dispatch data to several tables from a single resource
- Set primary and merge keys, define the columns nullability and data types
- Pass config and credentials into your sources and resources
- Use google oauth2 and service account credentials, database connection strings and define your own complex credentials: see examples below
Concepts to grasp
- Credentials and their "under the hood"
- Schemas, naming conventions and data normalization.
- How we distribute pipelines to our users
Building blocks used right:
- Create dynamic resources for tables by reflecting a whole database
- Incrementally dispatch github events to separate tables
- Read the participants for each deal using transformers and pipe operator
- Read the events for each ticket by attaching transformer to resource explicitly
- Set `tags` column data type to complex to load them as JSON/struct
- Typical use of `merge` with incremental load for endpoints returning a list of updates to entities in Shopify pipeline.
- A `dlt` mega-combo in `pipedrive` pipeline, where the deals from `deal` endpoint are fed into `deals_flow` resource to obtain events for a particular deal. Both resources use `merge` write disposition and incremental load to get just the newest updates. The `deals_flow` is dispatching different event types to separate tables with `dlt.mark.with_table_name`.
- An example of using JSONPath expression to get cursor value for incremental loading. In pipedrive some objects have `timestamp` property and others `update_time`. The `dlt.sources.incremental('update_time|modified')` expression lets you bind the incremental to either.
- If your source/resource needs google credentials, just use `dlt` built-in credentials as we do in google sheets and google analytics. Also note how `credentials.to_native_credentials()` is used to initialize google api client.
- If your source/resource accepts several different credential types look how we deal with 3 different types of Zendesk credentials
- See database connection string credentials applied to sql_database pipeline
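The idea of binding an incremental cursor to either of two fields can be sketched in plain Python. This is a simplified stand-in and not dlt's implementation: `cursor_value` and `newer_than` are hypothetical helpers, the two alternatives from the `update_time|modified` expression are simply tried in order, and an item counts as new only when its cursor is strictly greater than the stored last value (dlt's own handling of boundary values may differ).

```python
from typing import Any, Dict, Iterable, Iterator, Optional, Sequence

# Field names taken from the 'update_time|modified' expression quoted above.
CURSOR_FIELDS = ("update_time", "modified")


def cursor_value(
    item: Dict[str, Any], fields: Sequence[str] = CURSOR_FIELDS
) -> Optional[Any]:
    """Return the value of the first cursor field present in the item, or None."""
    for field in fields:
        if field in item:
            return item[field]
    return None


def newer_than(
    items: Iterable[Dict[str, Any]],
    last_value: Optional[Any],
    fields: Sequence[str] = CURSOR_FIELDS,
) -> Iterator[Dict[str, Any]]:
    """Yield only items whose cursor is strictly greater than last_value."""
    for item in items:
        value = cursor_value(item, fields)
        if value is not None and (last_value is None or value > last_value):
            yield item


# Made-up sample data: two items carry update_time, one carries modified.
items = [
    {"id": 1, "update_time": "2023-01-01 10:00:00"},
    {"id": 2, "modified": "2023-01-03 09:30:00"},
    {"id": 3, "update_time": "2023-01-05 12:00:00"},
]
new_ids = [item["id"] for item in newer_than(items, "2023-01-02 00:00:00")]
```

With a last value of `2023-01-02 00:00:00`, only the items with ids 2 and 3 pass the filter, regardless of which of the two fields carries their timestamp.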
Code of the community and verified pipelines resides in the pipelines folder. Each pipeline has its own pipeline folder (e.g. chess) where the dlt.source and dlt.resource functions are present. The internal organization of this folder is up to the contributor. For each pipeline there's also a script with example usages (e.g. chess_pipeline.py). The intention is to show the user how the sources/resources may be called and let the user copy the code from it.
If you are sharing a pipeline created with `dlt init` here is a guide. Below you can find in-depth information.
- Create a folder (pipeline folder) with your pipeline `<name>` in `pipelines`. Place all your code in that folder.
- Place (decorated) source/resource functions in the main module named as pipeline folder (the `__init__.py` also works)
- Try to separate your code where the part that you want people to hack stays in main module and the rest goes to some helper modules.
- Create a demo/usage script with the name `<name>_pipeline.py` and place it in `pipelines`. Make it work with `postgres` or `duckdb` so it is easy to try them out
- Add pipeline specific dependencies as described below
- Add example credentials to this repo as described below.
- Add one liner module docstring to the `__init__.py` in pipeline folder. `dlt init --list-pipelines` will use this line as pipeline description.
- The pipeline must pass linter stage.
If a pipeline requires additional dependencies that are not available in dlt they may be added as follows:
- Use `poetry` to add it to the group with the same name as pipeline. Example: chess pipeline uses `python-chess` to decode game moves. Dependency was added with `poetry add -G chess python-chess`
- Add `requirements.txt` file in pipeline folder and add the dependency there.
Use relative imports. Your code will be imported as source code and everything under pipeline folder must be self-contained and isolated. Example (from google_sheets):
from .helpers.data_processing import get_spreadsheet_id
from .helpers.api_calls import api_auth
from .helpers import api_calls
In your demo/usage script use normal imports, as you would in a standalone script.
Example (from pipedrive):
import dlt
from pipedrive import pipedrive_source
This script is distributed by dlt init with the other pipeline <name> files. It will be a first touch point with your users. It will be used by them as a starting point or as a source of code snippets. The ideal content for the script:
- Shows a few usage examples with different source/resource arguments combinations that you think are the most common cases for your user.
- If you provide any customizations/transformations then show how to use them.
- Any code snippet that will speed the user up.
Examples:
It would be perfect if you are able to run the demo scripts. If you are contributing a working pipeline you can probably re-use your test accounts, data and credentials. You can test the scripts by loading to local duckdb or postgres as explained later.
Your working dir must be pipelines otherwise dlt will not find the .dlt folder with secrets.
All pipeline tests and usage/example scripts share the same config and credential files that are present in pipelines/.dlt.
This makes running locally much easier and dlt configuration is flexible enough to apply to many pipelines in one folder.
Please look at example.secrets.toml in .dlt folder on how to configure postgres, redshift and bigquery destination credentials. Those credentials are shared by all pipelines.
Then you can create your secrets.toml with the credentials you need. The duckdb and postgres destinations work locally and we suggest you use them for initial testing.
As explained in technical docs, either the native form (i.e. a database connection string) or the dictionary representation (a Python dict with host, database, password etc.) can be used.
If you add a new pipeline that requires a secret value, please add a placeholder to example.secrets.toml. When adding the source config and secrets please follow the section layout for sources. We have a lot of pipelines so we must use precise section layout (up to module level):
[sources.<python module name where source and resources are placed>]
This way we can isolate credentials for each pipeline.
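To make the layout concrete, the sketch below models a parsed secrets file as a nested dict and looks up the section that belongs to one pipeline module. It only illustrates the naming scheme: `source_section` and `lookup_section` are hypothetical helpers, the key names and placeholder values are made up, and dlt's own configuration lookup is not reproduced here.

```python
from typing import Any, Dict


def source_section(module_name: str) -> str:
    """Build the section name for the module that holds a pipeline's sources."""
    return f"sources.{module_name}"


def lookup_section(config: Dict[str, Any], section: str) -> Dict[str, Any]:
    """Walk a dotted section name through a nested dict (raises KeyError if absent)."""
    node = config
    for part in section.split("."):
        node = node[part]
    return node


# Hypothetical content of a parsed secrets file; keys and values are placeholders.
secrets = {
    "sources": {
        "pipedrive": {"api_key": "<placeholder>"},
        "google_sheets": {"credentials": "<placeholder>"},
    }
}

pipedrive_secrets = lookup_section(secrets, source_section("pipedrive"))
sheets_secrets = lookup_section(secrets, source_section("google_sheets"))
```

Because every pipeline reads only its own section, a key such as the made-up `api_key` under `sources.pipedrive` is never visible from `sources.google_sheets`.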
There's a compose file with a fully prepared postgres instance here
We may distribute a pipeline without tests and daily CI running as a community pipeline. Verified pipelines require the following additional steps:
- Place your tests in `tests/<name>`.
- To run your tests you'll need to create test accounts, data sets, credentials etc. Talk to dlt team on slack. We may provide you with the required accounts and credentials.
python-dlt uses poetry to manage, build and version the package. It also uses make to automate tasks. To start
make install-poetry # will install poetry, to be run outside virtualenv
then
make dev # will install all deps including dev
Executing poetry shell and working in it is very convenient at this moment.
Use python 3.8 for development which is the lowest supported version for python-dlt. You'll need distutils and venv:
sudo apt-get install python3.8
sudo apt-get install python3.8-distutils
sudo apt install python3.8-venv
You may also use pyenv as poetry suggests.
python-dlt uses mypy and flake8 with several plugins for linting. We do not reorder imports or reformat code. To lint the code do make lint.
Code does not need to be typed, but it is better if it is: mypy is able to catch a lot of problems in the code. If your pipeline is typed, add a file named py.typed to the folder where your pipeline code is (see the chess pipeline for an example).
Input arguments of source and resource functions should be typed. That allows dlt to validate input arguments at runtime, tell which of them are secrets, and generate the secret and config files automatically.
All the pipelines will be parsed and installed with dlt init during the linting stage. Those tests are implemented in tests/test_dlt_init.py. This is required for the PR to be accepted on CI.
The linting step requires properly constructed python packages so it will ask for __init__ files to be created. That can be automated with
./check-package.sh --fix
executed from the top repo folder
- If you are contributing and want to test against `redshift` and `bigquery`, ping the dlt team on slack. You'll get a `toml` file fragment with the credentials that you can paste into your `secrets.toml`
- If you contributed a pipeline and created any credentials, test accounts, test dataset please include them in the tests or share them with `dlt` team so we can configure the CI job. If sharing is not possible please help us to reproduce your test cases so CI job will pass.
We use pytest for testing. Every test runs within a set of fixtures that provide the following environment (see conftest.py):
- they load secrets and config from `pipelines/.dlt` so the same values are used when you run your pipeline from command line and in tests
- they set the working directory for each pipeline to the `_storage` folder and make sure it is empty before each test
- they drop all datasets from the destination after each test
- they run each test with the original environment variables so you can modify `os.environ`
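The last point, restoring the original environment around each test, can be sketched with the standard library alone. This is not the code from conftest.py: `preserve_environ` is a hypothetical helper and `DLT_README_SKETCH_VAR` is a made-up variable name used only for the demonstration.

```python
import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def preserve_environ() -> Iterator[None]:
    """Snapshot os.environ on entry and restore it on exit."""
    saved = dict(os.environ)
    try:
        yield
    finally:
        # Drop everything the block added or changed, then put the snapshot back.
        os.environ.clear()
        os.environ.update(saved)


with preserve_environ():
    # Inside the block the variable can be modified freely.
    os.environ["DLT_README_SKETCH_VAR"] = "1"
    value_inside = os.environ.get("DLT_README_SKETCH_VAR")

# After the block the modification is gone again.
value_after = os.environ.get("DLT_README_SKETCH_VAR")
```

A fixture built on the same idea lets a test set or override variables without leaking them into the next test.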
Look at tests/chess/test_chess_pipeline.py for an example. The line
@pytest.mark.parametrize('destination_name', ALL_DESTINATIONS)
makes sure that each test runs against all destinations (as defined in the ALL_DESTINATIONS global variable).
The simplest possible test just creates a pipeline and then issues a run on a source. A more advanced test will use sql_client to check the data and access the schemas to check the table structure.
Please also look at the test helpers that you can use to assert the load infos, get counts of elements in tables, select and assert the data in tables etc.
Your tests will be run both locally and on CI. It means that a few instances of your test may be executed in parallel and they will be sharing resources. A few simple rules make that possible.
- Always use `full_refresh` when creating pipelines in test. This will make sure that data is loaded into new schema/dataset. Fixtures in `conftest.py` will drop datasets created during load.
- When creating any fixtures for your tests, make sure that fixture is unique for your test instance.
  - If you create a database, schema or table, add a random suffix/prefix to it and use it in your test
  - If you create an account, i.e. a user with a name, and this name is a unique identifier, also add a random suffix/prefix
- Cleanup after your fixtures - delete accounts, drop schemas and databases
- Add code to `tests/utils.py` only if this is helpful for all tests. Put your specific helpers in your own directory.
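The random suffix rule can be covered by a very small helper. The sketch below is one possible shape, assuming a `uuid`-based suffix is acceptable for your destination's naming rules; `unique_name` is a hypothetical function and not part of the repository's test helpers.

```python
import uuid


def unique_name(base: str) -> str:
    """Append a short random suffix so parallel test runs do not collide."""
    return f"{base}_{uuid.uuid4().hex[:8]}"


# Two calls with the same base give two different names.
schema_a = unique_name("test_schema")
schema_b = unique_name("test_schema")
```

Create the name once per fixture, use it for the schema, table or account, and use the same value again in the cleanup step so exactly that object is dropped.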
Tests in tests/test_dlt_init.py are executed as part of linting stage and must be passing. They make sure that pipeline can be distributed with dlt init.
- When developing, limit the destinations to local ones, e.g. duckdb, by setting the environment variable:
ALL_DESTINATIONS='["duckdb"]' pytest tests/chess
There's also a `make test-local` command that will run all the tests on duckdb and postgres.
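The value in the command above looks like a JSON list, so a test helper could read it roughly as follows. This is a guess at the mechanism and not the repository's code: `destinations_from_env` is a hypothetical function, and both the JSON parsing and the default list are assumptions made for the sketch.

```python
import json
import os
from typing import List, Mapping, Optional

# Assumed default for the sketch; the real default lives in the test utilities.
DEFAULT_DESTINATIONS = ["duckdb", "postgres"]


def destinations_from_env(environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Read ALL_DESTINATIONS as a JSON list of names, falling back to the default."""
    env = os.environ if environ is None else environ
    raw = env.get("ALL_DESTINATIONS")
    if raw is None:
        return list(DEFAULT_DESTINATIONS)
    parsed = json.loads(raw)
    if not isinstance(parsed, list) or not all(isinstance(d, str) for d in parsed):
        raise ValueError("ALL_DESTINATIONS must be a JSON list of strings")
    return parsed


# Same value as in the command line example above.
local_only = destinations_from_env({"ALL_DESTINATIONS": '["duckdb"]'})
fallback = destinations_from_env({})
```

Passing the mapping explicitly keeps the helper easy to exercise without touching the real environment.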