Data flows are orchestrated using Prefect.
- ECS runs the Prefect Agent and Flow tasks
- A Docker image stores the Flow code and libraries
- The Prefect Agent auto-scales based on CPU usage
- A private subnet shields tasks from the internet; only egress is needed
- Permissions follow the principle of least privilege
We use two environments in this repo:
- A Python environment, for writing Prefect Flows. Code is located in `src/`.
- A Node environment for the AWS infrastructure that Prefect needs to run Flows. Code is located in `.aws/`.
To develop and run Flows locally, you'll need:
- Docker
- PyCharm
- Copy the `.env.example` file to a file in the same directory called `.env`. Change the values according to the instructions you find in that file.
  - ⚠️ Do not put your credentials in `.env.example`, to prevent accidentally checking them into git. Modifying `.env` is safe because it's git-ignored.
- Set `PREFECT__CLOUD__API_KEY` to a Prefect API key that you can create on the API keys page.
- Set `SNOWFLAKE_PRIVATE_KEY` to your decrypted private key, as follows (a scripted version is sketched after this list):
  - If you haven't already, create Snowflake development credentials. These are usually stored in `~/.snowflake-keys`.
  - Run `openssl rsa -in ~/.snowflake/rsa_key.p8` and enter the passphrase for this file when prompted.
  - Copy the value after (but not including) `-----BEGIN RSA PRIVATE KEY-----` and before (but not including) `-----END RSA PRIVATE KEY-----`.
  - In a text editor, remove all newlines (`\n`). Set `SNOWFLAKE_PRIVATE_KEY` to the resulting string.
- Run `docker compose build && docker compose up` to check that you can start the Prefect agent. When the build is complete, you should see the agent start up and poll Prefect Cloud:
  ```
  prefect_1 | DEBUG:agent:No ready flow runs found.
  prefect_1 | [2022-02-03 23:18:53,127] DEBUG - agent | No ready flow runs found.
  prefect_1 | [2022-02-03 23:18:53,128] DEBUG - agent | Sleeping flow run poller for 2.0 seconds...
  ```
- Hit Ctrl+C to stop the Prefect agent.
- In PyCharm, right-click on the `src` directory > Mark Directory as > Sources Root
- In PyCharm, configure Docker Compose as a remote interpreter
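If you prefer to script the key-flattening steps above rather than editing by hand, here is a minimal sketch. It assumes you saved the decrypted output of the `openssl` command to a file such as `~/.snowflake/rsa_key.pem`; that path is only an example.

```python
# A scripted version of the manual key steps (assumption: the decrypted output of
# `openssl rsa -in ~/.snowflake/rsa_key.p8` was saved to ~/.snowflake/rsa_key.pem --
# adjust the path to match your setup).
from pathlib import Path

pem_path = Path.home() / ".snowflake" / "rsa_key.pem"
lines = pem_path.read_text().splitlines()
# Drop the -----BEGIN/END RSA PRIVATE KEY----- markers and join the base64 body into one line.
flat_key = "".join(line for line in lines if "PRIVATE KEY" not in line)
print(flat_key)  # paste this value into SNOWFLAKE_PRIVATE_KEY in .env
```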
⚠️ Even when flows are executed locally, they can affect production resources. Check whether the `AWS_PROFILE` variable in your `.env` file has write access to production, and if so, consider whether the flow you're going to run could have unintended consequences. Ask if you're not sure.
- Run `source <(maws -o awscli)` and choose the AWS account that matches the value of `AWS_PROFILE` in your `.env` file.
- Run the flow in PyCharm, for example by right-clicking on the corresponding file in the `src/flows/` directory and choosing 'Run'.
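For the right-click 'Run' option to execute anything, the flow file needs a local entry point. Here is a minimal sketch of how a file in `src/flows/` might be structured; the flow name is a made-up example, and existing flows in this repo may follow a different pattern.

```python
# Minimal sketch of a flow file that can be run directly from PyCharm
# (Prefect 1.x style; "example-flow" is a made-up name).
from prefect import Flow, task


@task
def say_hello():
    print("hello from a local flow run")


with Flow("example-flow") as flow:
    say_hello()

if __name__ == "__main__":
    flow.run()  # executes the flow locally when the file is run directly
```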
Libraries are managed using pipenv, to create a consistent run-time environment. Follow the steps below to install a new Python library.
- Run `docker compose up`.
- In a new terminal, run `docker compose exec prefect pipenv install pydantic`, replacing `pydantic` with the library you want to install.
  - Add `--dev` for development libraries that don't need to be installed in production, for example `docker compose exec prefect pipenv install --dev pytest`.
- The output should look something like this:
  ```
  $ docker compose exec prefect pipenv install pydantic
  Installing pydantic...
  Adding pydantic to Pipfile's [packages]...
  ✔ Installation Succeeded
  Installing dependencies from Pipfile.lock (46b380)...
  🐍 ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 68/68 — 00:00:20
  ```
- Run `docker compose cp prefect:/Pipfile ./ && docker compose cp prefect:/Pipfile.lock ./` to copy the Pipenv files from your Docker container to your host.
  - Note: if you get `No such command: cp`, try upgrading Docker, or use `docker cp` instead.
- `Pipfile` and `Pipfile.lock` should have been changed. Commit those changes to git.
- Run `docker compose build` to rebuild your Docker image.
To work on the infrastructure code in `.aws/`, you'll need:
- node/npm and nvm (see How to set up a Node.js development environment)
- tfenv

Then:
- Run `cd .aws` to go to the `.aws` directory in the project root
- Run `nvm use` to use the right Node version
- Run `npm ci` to install packages from `package-lock.json`
Pushing to the `dev` branch (see 'Deployment' below) is an easy way to deploy infrastructure changes to Pocket-Dev. The steps below are useful if you want to iterate more quickly over changes in the `.aws/` directory.
- Run `$(maws)` and obtain write access to Pocket-Dev
- Run `tfenv use` to get the right version of Terraform
- Run `npm run build:dev`
- From the `.aws/` directory, run `cd cdktf.out/stacks/data-flows/`
- Run `terraform init` and choose 'Dev'
- Run `npm run build:dev && terraform apply`. Repeat this step when you want to apply changes.
Here are some things you'll want to do before using a flow in production:
- Get the flow into on-call alerts (instructions here)
- Log important metrics (for example, the number of rows)
- Throw exceptions for invalid input
- Usually, flows will run on a schedule
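The sketch below shows what those last three practices can look like in a Prefect 1.x flow. The flow name, the row-count metric, and the hourly schedule are made-up examples, not this repo's actual code.

```python
# Sketch: log a metric, fail loudly on invalid input, and attach a schedule (Prefect 1.x).
from datetime import timedelta

import prefect
from prefect import Flow, task
from prefect.schedules import IntervalSchedule


@task
def extract_rows():
    rows = [{"id": 1}, {"id": 2}]  # placeholder for a real query
    prefect.context.get("logger").info("Extracted %d rows", len(rows))  # log important metrics
    if not rows:
        raise ValueError("Expected at least one row, got none")  # throw exceptions for invalid input
    return rows


# Attach a schedule so the flow runs periodically once it is registered.
with Flow("example-production-flow", schedule=IntervalSchedule(interval=timedelta(hours=1))) as flow:
    extract_rows()
```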
- Pocket-Dev: `git push -f origin my-local-branch:dev`
- Production: get your PR approved, and merge it into the main branch
Deployments take about 15 minutes. You can monitor their progress in CircleCI and CodePipeline.
This section lists the manual steps that have to be taken when this service is deployed to an AWS environment for the first time.
Create a Prefect project whose name equals the git branch name that will trigger the deployment.
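The project can be created in the Prefect Cloud UI, or programmatically; here is a sketch using the Prefect 1.x client, where the project name `dev` is only an example.

```python
# Sketch: create the Prefect project programmatically (Prefect 1.x client).
# Assumes PREFECT__CLOUD__API_KEY is set; "dev" is an example project name --
# use the branch name that triggers the deployment.
from prefect import Client

client = Client()
client.create_project(project_name="dev")
```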
The following parameters need to be created in the SSM Parameter Store. Replace `{Env}` with the environment name as defined in `.aws/src/config`. (A scripted example follows the table.)
| Name | Type | Description |
|---|---|---|
| `/DataFlows/{Env}/PREFECT_API_KEY` | SecureString | Prefect service account API key with 'user' permissions to the previously created project |
| `/DataFlows/{Env}/SNOWFLAKE_PRIVATE_KEY` | SecureString | Decrypted base64 Snowflake private key |
| `/DataFlows/{Env}/SNOWFLAKE_ACCOUNT` | String | Snowflake account id |
| `/DataFlows/{Env}/SNOWFLAKE_USER` | String | Snowflake username |
| `/DataFlows/{Env}/DBT_CLOUD_TOKEN` | SecureString | dbt service account token |
| `/DataFlows/{Env}/DBT_CLOUD_ACCOUNT_ID` | String | dbt account id that you can find in the dbt Cloud URL |
| `/DataFlows/{Env}/GCE_KEY` | SecureString | GCP key |
| `/DataFlows/{Env}/BRAZE_API_KEY` | SecureString | Braze API key with write access to 'User Data' and 'Subscription' |
| `/DataFlows/{Env}/BRAZE_REST_ENDPOINT` | String | Braze REST endpoint |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_ROLE` | String | Snowflake role for executing the Deleted User Account data deletions |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_WAREHOUSE` | String | Snowflake warehouse used for data deletions of Deleted User Accounts |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_DB` | String | Snowflake DB to record the deleted user accounts |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_SCHEMA` | String | Snowflake schema to record the deleted user accounts |
| `/DataFlows/{Env}/SNOWFLAKE_SNOWPLOW_DB` | String | Snowflake DB that has the Snowplow raw events |
| `/DataFlows/{Env}/SNOWFLAKE_SNOWPLOW_SCHEMA` | String | Snowflake schema that has the Snowplow raw events |
| `/DataFlows/{Env}/SNOWFLAKE_RAWDATA_DB` | String | Snowflake DB that has the legacy raw events |
| `/DataFlows/{Env}/SNOWFLAKE_RAWDATA_FIREHOSE_SCHEMA` | String | Snowflake schema that has the legacy raw events |
| `/DataFlows/{Env}/SNOWFLAKE_SNAPSHOT_DB` | String | Snowflake DB that has the legacy raw Snapshot events |
| `/DataFlows/{Env}/SNOWFLAKE_SNAPSHOT_FIREHOSE_SCHEMA` | String | Snowflake schema that has the legacy raw Snapshot events |
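For example, a parameter can be created with boto3 (a sketch; the environment name `Dev` and the dummy value are placeholders, and the AWS console or CLI work just as well):

```python
# Sketch: create one of the SSM parameters listed above with boto3.
# "Dev" and the dummy value are placeholders -- use the real environment name and secret.
import boto3

ssm = boto3.client("ssm")
ssm.put_parameter(
    Name="/DataFlows/Dev/PREFECT_API_KEY",
    Value="<prefect-service-account-api-key>",
    Type="SecureString",
    Overwrite=True,
)
```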
- Data validation
- Persist Prefect results to S3
- Automated integration tests
- Python linter
- Sentry integration
- Switch to the LocalDaskExecutor to allow tasks to be executed in parallel
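For the LocalDaskExecutor item above, the change would look roughly like this in a flow definition. This is a sketch of the Prefect 1.x API; the flow and tasks are made-up examples.

```python
# Sketch: enable parallel task execution with the LocalDaskExecutor (Prefect 1.x).
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor


@task
def square(n):
    return n * n


with Flow("parallel-example") as flow:
    # These tasks have no dependencies on each other, so the executor may run them in parallel.
    square(1)
    square(2)

flow.executor = LocalDaskExecutor()
```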
- Experimental cloud account: https://cloud.prefect.io/mathijs-getpocket-com-s-account
- Running Prefect locally
- Running Prefect on AWS