Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DbtShellTask #2526

Merged
merged 12 commits into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- `flow.visualize` is now able to accept a `format` argument to specify the output file type - [#2447](https://github.com/PrefectHQ/prefect/issues/2447)

### Task Library
- `DbtShellTask` - an extension of ShellTask for working with data build tool (dbt)

- Add `prefect.tasks.gcp.bigquery.BigQueryLoadFile` [#2423](https://github.com/PrefectHQ/prefect/issues/2423)

Expand All @@ -72,6 +73,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/

- [Alvin Goh](https://github.com/chuehsien)
- [Daniel Kapitan](https://github.com/dkapitan)
- [Mark McDonald](https://github.com/mhmcdonald)

## 0.10.7 <Badge text="beta" type="success"/>

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def run(self):
"pushbullet": ["pushbullet.py >= 0.11.0"],
"redis": ["redis >= 3.2.1"],
"rss": ["feedparser >= 5.0.1, < 6.0"],
"snowflake": ["snowflake-connector-python >= 1.8.2, < 2.0"],
"snowflake": ["snowflake-connector-python >= 1.8.2, < 2.5"],
"spacy": ["spacy >= 2.0.0, < 3.0.0"],
"templates": ["jinja2 >= 2.0, < 3.0"],
"test": test_requires,
Expand Down
8 changes: 8 additions & 0 deletions src/prefect/tasks/dbt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
"""
This module contains a task for interacting with dbt via the shell.
"""

try:
from prefect.tasks.dbt.dbt import DbtShellTask
except ImportError:
raise ImportError("Using `prefect.tasks.dbt` requires dbt to be installed.")
119 changes: 119 additions & 0 deletions src/prefect/tasks/dbt/dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import os
import yaml
from typing import Any

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class DbtShellTask(ShellTask):
"""
Task for running dbt commands. It will create a profiles.yml file prior to running dbt commands.
Args:
- command (string, optional): dbt command to be executed; can also be
provided post-initialization by calling this task instance
- dbt_kwargs (dict, optional): keyword arguments used to populate the profiles.yml file (e.g.
`{'type': 'snowflake', 'threads': 4, 'account': '...'}`); can also be provided at runtime
- env (dict, optional): dictionary of environment variables to use for
the subprocess; can also be provided at runtime
- environment (string, optional): The default target your dbt project will use
- helper_script (str, optional): a string representing a shell script, which
will be executed prior to the `command` in the same process. Can be used to
change directories, define helper functions, etc. when re-using this Task
for different commands in a Flow
- overwrite_profiles (boolean, optional): flag to indicate whether existing profiles.yml file
should be overwritten; defaults to `False`
- profile_name (string, optional): Profile name used for populating the profile name of profiles.yml
- profiles_dir (string, optional): path to directory where the profile.yml file will be contained
- set_profiles_envar (boolean, optional): flag to indicate whether DBT_PROFILES_DIR should be set to the
provided profiles_dir; defaults to `True`
- shell (string, optional): shell to run the command with; defaults to "bash"
- return_all (bool, optional): boolean specifying whether this task should return all lines of stdout
as a list, or just the last line as a string; defaults to `False`
- **kwargs: additional keyword arguments to pass to the Task constructor
Example:
```python
from prefect import Flow
from ccde.prefect.tasks.dbt import DbtShellTask
with Flow(name="dbt_flow") as f:
task = DbtShellTask(
profile_name='default',
environment='test',
dbt_kwargs={'type': 'snowflake', 'threads': 1, 'account': 'account.us-east-1'},
overwrite_profiles=True,
profiles_dir=test_path)(command='dbt run')
out = f.run()
```
"""

def __init__(
self,
profile_name: str = None,
environment: str = None,
overwrite_profiles: bool = False,
profiles_dir: str = None,
set_profiles_envar: bool = True,
dbt_kwargs: dict = None,
**kwargs: Any
):
self.profile_name = profile_name
self.environment = environment
self.overwrite_profiles = overwrite_profiles
self.profiles_dir = profiles_dir
self.set_profiles_envar = set_profiles_envar
self.dbt_kwargs = dbt_kwargs
super().__init__(**kwargs)

@defaults_from_attrs("command", "env", "dbt_kwargs")
def run(
self, command: str = None, env: dict = None, dbt_kwargs: dict = None
) -> str:
"""
If no profiles.yml file is found or if overwrite_profiles flag is set to True, this will first generate
a profiles.yml file in the profiles_dir directory. Then run the dbt cli shell command.
Args:
- command (string): shell command to be executed; can also be
provided at task initialization. Any variables / functions defined in
`self.helper_script` will be available in the same process this command
runs in
- env (dict, optional): dictionary of environment variables to use for
the subprocess
- dbt_kwargs(dict, optional): keyword arguments used to populate the profiles.yml file
Returns:
- stdout (string): if `return_all` is `False` (the default), only the last line of stdout
is returned, otherwise all lines are returned, which is useful for passing
result of shell command to other downstream tasks. If there is no output, `None` is returned.
Raises:
- prefect.engine.signals.FAIL: if command has an exit code other
than 0
"""
profiles_exists = False
if os.getenv("DBT_PROFILES_DIR"):
dbt_profiles_dir = os.getenv("DBT_PROFILES_DIR")
profiles_exists = os.path.exists(
os.path.join(dbt_profiles_dir, "profiles.yml")
)
elif self.profiles_dir:
profiles_exists = os.path.exists(
os.path.join(self.profiles_dir, "profiles.yml")
)

dbt_kwargs = {**self.dbt_kwargs, **(dbt_kwargs or {})}

if self.overwrite_profiles or not profiles_exists:
profile = {
self.profile_name: {
"outputs": {self.environment: dbt_kwargs},
"target": self.environment,
}
}

profile_path = os.path.join(self.profiles_dir, "profiles.yml")

with open(profile_path, "w+") as yaml_file:
yaml.dump(profile, yaml_file, default_flow_style=False)

if self.set_profiles_envar:
os.environ["DBT_PROFILES_DIR"] = self.profiles_dir

super(DbtShellTask, self).run(command=command, env=env)
5 changes: 5 additions & 0 deletions tests/tasks/dbt/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from shutil import which
import pytest

if not which("dbt"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clever

pytest.mark.skip("dbt not installed")
73 changes: 73 additions & 0 deletions tests/tasks/dbt/test_dbt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
import pathlib
import sys

import pytest

from prefect import Flow
from prefect.tasks.dbt import DbtShellTask


pytestmark = pytest.mark.skipif(
sys.platform == "win32", reason="DbtShellTask currently not supported on Windows"
)


def test_shell_creates_profiles_yml_file(tmpdir):
dbt_dir = tmpdir.mkdir("dbt")
with Flow(name="test") as f:
task = DbtShellTask(
profile_name="default",
environment="test",
dbt_kwargs={
"type": "snowflake",
"threads": 1,
"account": "JH72176.us-east-1",
"user": "jane@company.com",
"role": "analyst",
"database": "staging",
"warehouse": "data_science",
"schema": "analysis",
"private_key_path": "/src/private_key.p8",
"private_key_passphrase": "password123",
},
overwrite_profiles=True,
profiles_dir=str(dbt_dir),
)(command="ls")

out = f.run()
profiles_path = dbt_dir.join("profiles.yml")
assert out.is_successful()
assert os.path.exists(profiles_path)


def test_shell_uses_dbt_envar(tmpdir, monkeypatch):
dbt_project_path = tmpdir.mkdir("dbt_project")
monkeypatch.setenv("DBT_PROFILES_DIR", str(dbt_project_path))
real_profiles_path = dbt_project_path.join("profiles.yml")
open(real_profiles_path, "a").close()

with Flow(name="test") as f:
task = DbtShellTask(
profile_name="default",
environment="test",
dbt_kwargs={
"type": "snowflake",
"threads": 1,
"account": "JH72176.us-east-1",
"user": "jane@company.com",
"role": "analyst",
"database": "staging",
"warehouse": "data_science",
"schema": "analysis",
"private_key_path": "/src/private_key.p8",
"private_key_passphrase": "password123",
},
overwrite_profiles=False,
profiles_dir=str(tmpdir),
)(command="ls")

out = f.run()
missing_profiles_path = tmpdir.join("profiles.yml")
assert out.is_successful()
assert not os.path.exists(missing_profiles_path)