-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2526 from Clearcover/feature/task_dbt
DbtShellTask
- Loading branch information
Showing
6 changed files
with
208 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
""" | ||
This module contains a task for interacting with dbt via the shell. | ||
""" | ||
|
||
# Re-export DbtShellTask at package level. If the implementation module (or one
# of its own imports) cannot be imported, surface a friendlier message while
# keeping the original error attached as the explicit cause.
try:
    from prefect.tasks.dbt.dbt import DbtShellTask
except ImportError as err:
    raise ImportError(
        "Using `prefect.tasks.dbt` requires dbt to be installed."
    ) from err
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
import os | ||
import yaml | ||
from typing import Any | ||
|
||
from prefect.tasks.shell import ShellTask | ||
from prefect.utilities.tasks import defaults_from_attrs | ||
|
||
|
||
class DbtShellTask(ShellTask):
    """
    Task for running dbt commands. It will create a profiles.yml file prior to running dbt commands.

    Args:
        - command (string, optional): dbt command to be executed; can also be
            provided post-initialization by calling this task instance
        - dbt_kwargs (dict, optional): keyword arguments used to populate the profiles.yml file (e.g.
            `{'type': 'snowflake', 'threads': 4, 'account': '...'}`); can also be provided at runtime
        - env (dict, optional): dictionary of environment variables to use for
            the subprocess; can also be provided at runtime
        - environment (string, optional): The default target your dbt project will use
        - helper_script (str, optional): a string representing a shell script, which
            will be executed prior to the `command` in the same process. Can be used to
            change directories, define helper functions, etc. when re-using this Task
            for different commands in a Flow
        - overwrite_profiles (boolean, optional): flag to indicate whether existing profiles.yml file
            should be overwritten; defaults to `False`
        - profile_name (string, optional): Profile name used for populating the profile name of profiles.yml
        - profiles_dir (string, optional): path to directory where the profiles.yml file will be contained
        - set_profiles_envar (boolean, optional): flag to indicate whether DBT_PROFILES_DIR should be set to the
            provided profiles_dir; defaults to `True`
        - shell (string, optional): shell to run the command with; defaults to "bash"
        - return_all (bool, optional): boolean specifying whether this task should return all lines of stdout
            as a list, or just the last line as a string; defaults to `False`
        - **kwargs: additional keyword arguments to pass to the Task constructor

    Example:
        ```python
        from prefect import Flow
        from prefect.tasks.dbt import DbtShellTask

        with Flow(name="dbt_flow") as f:
            task = DbtShellTask(
                profile_name='default',
                environment='test',
                dbt_kwargs={'type': 'snowflake', 'threads': 1, 'account': 'account.us-east-1'},
                overwrite_profiles=True,
                profiles_dir='/path/to/profiles/dir'
            )(command='dbt run')

        out = f.run()
        ```
    """

    def __init__(
        self,
        profile_name: str = None,
        environment: str = None,
        overwrite_profiles: bool = False,
        profiles_dir: str = None,
        set_profiles_envar: bool = True,
        dbt_kwargs: dict = None,
        **kwargs: Any
    ):
        self.profile_name = profile_name
        self.environment = environment
        self.overwrite_profiles = overwrite_profiles
        self.profiles_dir = profiles_dir
        self.set_profiles_envar = set_profiles_envar
        self.dbt_kwargs = dbt_kwargs
        super().__init__(**kwargs)

    @defaults_from_attrs("command", "env", "dbt_kwargs")
    def run(
        self, command: str = None, env: dict = None, dbt_kwargs: dict = None
    ) -> str:
        """
        If no profiles.yml file is found or if overwrite_profiles flag is set to True, this will first generate
        a profiles.yml file in the profiles_dir directory. Then run the dbt cli shell command.

        Args:
            - command (string): shell command to be executed; can also be
                provided at task initialization. Any variables / functions defined in
                `self.helper_script` will be available in the same process this command
                runs in
            - env (dict, optional): dictionary of environment variables to use for
                the subprocess
            - dbt_kwargs(dict, optional): keyword arguments used to populate the profiles.yml file

        Returns:
            - stdout (string): if `return_all` is `False` (the default), only the last line of stdout
                is returned, otherwise all lines are returned, which is useful for passing
                result of shell command to other downstream tasks. If there is no output, `None` is returned.

        Raises:
            - prefect.engine.signals.FAIL: if command has an exit code other
                than 0
            - ValueError: if a profiles.yml file has to be written but neither `profiles_dir`
                nor the `DBT_PROFILES_DIR` environment variable is set
        """
        env_profiles_dir = os.getenv("DBT_PROFILES_DIR")

        # For the existence check the environment variable takes precedence
        # over the configured `profiles_dir`.
        lookup_dir = env_profiles_dir or self.profiles_dir
        profiles_exists = bool(lookup_dir) and os.path.exists(
            os.path.join(lookup_dir, "profiles.yml")
        )

        # Either side may be None (no kwargs given at init and/or at runtime);
        # runtime values override the ones supplied at initialization.
        dbt_kwargs = {**(self.dbt_kwargs or {}), **(dbt_kwargs or {})}

        if self.overwrite_profiles or not profiles_exists:
            # Write to the explicitly configured directory when there is one,
            # otherwise fall back to the directory named by DBT_PROFILES_DIR.
            target_dir = self.profiles_dir or env_profiles_dir
            if not target_dir:
                raise ValueError(
                    "Cannot write profiles.yml: provide `profiles_dir` or set the "
                    "DBT_PROFILES_DIR environment variable."
                )

            profile = {
                self.profile_name: {
                    "outputs": {self.environment: dbt_kwargs},
                    "target": self.environment,
                }
            }

            os.makedirs(target_dir, exist_ok=True)
            profile_path = os.path.join(target_dir, "profiles.yml")

            with open(profile_path, "w") as yaml_file:
                yaml.dump(profile, yaml_file, default_flow_style=False)

        # Only export the variable when there is a directory to export;
        # assigning None into os.environ raises TypeError.
        if self.set_profiles_envar and self.profiles_dir:
            os.environ["DBT_PROFILES_DIR"] = self.profiles_dir

        # Propagate the shell output so downstream tasks can consume it.
        return super().run(command=command, env=env)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from shutil import which | ||
import pytest | ||
|
||
# Skip everything collected from this module when the dbt executable is not on
# PATH. A bare `pytest.mark.skip(...)` expression only builds a marker object
# and throws it away, so it never skipped anything; `pytest.skip` with
# `allow_module_level=True` is the supported way to skip at import time.
if not which("dbt"):
    pytest.skip("dbt not installed", allow_module_level=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import os | ||
import pathlib | ||
import sys | ||
|
||
import pytest | ||
|
||
from prefect import Flow | ||
from prefect.tasks.dbt import DbtShellTask | ||
|
||
|
||
# Module-wide marker: every test in this file is skipped when running on Windows.
pytestmark = pytest.mark.skipif(
    sys.platform == "win32",
    reason="DbtShellTask currently not supported on Windows",
)
|
||
|
||
def test_shell_creates_profiles_yml_file(tmpdir):
    """Running the task with `overwrite_profiles=True` writes profiles.yml into `profiles_dir`."""
    dbt_dir = tmpdir.mkdir("dbt")

    # Connection settings that end up under the profile's target output.
    snowflake_settings = {
        "type": "snowflake",
        "threads": 1,
        "account": "JH72176.us-east-1",
        "user": "jane@company.com",
        "role": "analyst",
        "database": "staging",
        "warehouse": "data_science",
        "schema": "analysis",
        "private_key_path": "/src/private_key.p8",
        "private_key_passphrase": "password123",
    }

    with Flow(name="test") as f:
        # Calling the task inside the Flow context registers it with the flow.
        DbtShellTask(
            profile_name="default",
            environment="test",
            dbt_kwargs=snowflake_settings,
            overwrite_profiles=True,
            profiles_dir=str(dbt_dir),
        )(command="ls")

    out = f.run()

    assert out.is_successful()
    assert os.path.exists(dbt_dir.join("profiles.yml"))
|
||
|
||
def test_shell_uses_dbt_envar(tmpdir, monkeypatch):
    """
    With a profiles.yml already present under DBT_PROFILES_DIR and
    `overwrite_profiles=False`, no profiles.yml is written into `profiles_dir`.
    """
    env_profiles_dir = tmpdir.mkdir("dbt_project")
    monkeypatch.setenv("DBT_PROFILES_DIR", str(env_profiles_dir))

    # Touch an empty profiles.yml inside the directory the env var points to.
    with open(env_profiles_dir.join("profiles.yml"), "a"):
        pass

    snowflake_settings = {
        "type": "snowflake",
        "threads": 1,
        "account": "JH72176.us-east-1",
        "user": "jane@company.com",
        "role": "analyst",
        "database": "staging",
        "warehouse": "data_science",
        "schema": "analysis",
        "private_key_path": "/src/private_key.p8",
        "private_key_passphrase": "password123",
    }

    with Flow(name="test") as f:
        # Calling the task inside the Flow context registers it with the flow.
        DbtShellTask(
            profile_name="default",
            environment="test",
            dbt_kwargs=snowflake_settings,
            overwrite_profiles=False,
            profiles_dir=str(tmpdir),
        )(command="ls")

    out = f.run()

    assert out.is_successful()
    # The task's own profiles_dir must remain untouched.
    assert not os.path.exists(tmpdir.join("profiles.yml"))