DbtShellTask #2526
Merged
12 commits
148d8b3 contributing dbtshelltask
376cbad contributing dbtshelltask
298f50c contributing dbtshelltask
b431524 contributing dbtshelltask
5fa1288 contributing dbtshelltask
548cdf4 contributing dbtshelltask
ffab647 test updates based on feedback
1f78811 Merge branch 'master' into feature/task_dbt
ddff299 casting posixpath to str
d53fd6f Merge branch 'feature/task_dbt' of github.com:Clearcover/prefect into…
48bb067 removing dbt dependency
3313ce9 skip test if dbt not installed
@@ -0,0 +1,8 @@
"""
This module contains a task for interacting with dbt via the shell.
"""

try:
    from prefect.tasks.dbt.dbt import DbtShellTask
except ImportError:
    raise ImportError("Using `prefect.tasks.dbt` requires dbt to be installed.")
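
Reviewer note: the guarded import above follows a common optional-dependency pattern. Below is a minimal, generic sketch of that pattern, not code from this PR. The helper name `require` and the feature label `example.feature` are hypothetical, and the stdlib `json` module stands in for an optional package so the snippet runs anywhere.

```python
import importlib


def require(module_name: str, feature: str):
    """Hypothetical helper: import a module or fail with an actionable message."""
    try:
        return importlib.import_module(module_name)
    except ImportError as err:
        # Chain the original error so the underlying cause stays in the traceback.
        raise ImportError(
            f"Using `{feature}` requires `{module_name}` to be installed."
        ) from err


# `json` is part of the standard library, so this import always succeeds.
json_module = require("json", "example.feature")
```

Chaining with `from err` is a design choice of this sketch; the PR's `__init__` re-raises without chaining.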
@@ -0,0 +1,119 @@
import os
import yaml
from typing import Any

from prefect.tasks.shell import ShellTask
from prefect.utilities.tasks import defaults_from_attrs


class DbtShellTask(ShellTask):
    """
    Task for running dbt commands. It will create a profiles.yml file prior to running dbt commands.
    Args:
        - command (string, optional): dbt command to be executed; can also be
            provided post-initialization by calling this task instance
        - dbt_kwargs (dict, optional): keyword arguments used to populate the profiles.yml file (e.g.
            `{'type': 'snowflake', 'threads': 4, 'account': '...'}`); can also be provided at runtime
        - env (dict, optional): dictionary of environment variables to use for
            the subprocess; can also be provided at runtime
        - environment (string, optional): the default target your dbt project will use
        - helper_script (str, optional): a string representing a shell script, which
            will be executed prior to the `command` in the same process. Can be used to
            change directories, define helper functions, etc. when re-using this Task
            for different commands in a Flow
        - overwrite_profiles (boolean, optional): flag to indicate whether an existing profiles.yml file
            should be overwritten; defaults to `False`
        - profile_name (string, optional): name of the profile written to profiles.yml
        - profiles_dir (string, optional): path to the directory that will contain the profiles.yml file
        - set_profiles_envar (boolean, optional): flag to indicate whether DBT_PROFILES_DIR should be set to the
            provided profiles_dir; defaults to `True`
        - shell (string, optional): shell to run the command with; defaults to "bash"
        - return_all (bool, optional): boolean specifying whether this task should return all lines of stdout
            as a list, or just the last line as a string; defaults to `False`
        - **kwargs: additional keyword arguments to pass to the Task constructor
    Example:
        ```python
        from prefect import Flow
        from prefect.tasks.dbt import DbtShellTask
        with Flow(name="dbt_flow") as f:
            task = DbtShellTask(
                profile_name='default',
                environment='test',
                dbt_kwargs={'type': 'snowflake', 'threads': 1, 'account': 'account.us-east-1'},
                overwrite_profiles=True,
                profiles_dir=test_path)(command='dbt run')
        out = f.run()
        ```
    """

    def __init__(
        self,
        profile_name: str = None,
        environment: str = None,
        overwrite_profiles: bool = False,
        profiles_dir: str = None,
        set_profiles_envar: bool = True,
        dbt_kwargs: dict = None,
        **kwargs: Any
    ):
        self.profile_name = profile_name
        self.environment = environment
        self.overwrite_profiles = overwrite_profiles
        self.profiles_dir = profiles_dir
        self.set_profiles_envar = set_profiles_envar
        self.dbt_kwargs = dbt_kwargs
        super().__init__(**kwargs)

    @defaults_from_attrs("command", "env", "dbt_kwargs")
    def run(
        self, command: str = None, env: dict = None, dbt_kwargs: dict = None
    ) -> str:
        """
        If no profiles.yml file is found or if the overwrite_profiles flag is set to True, this will first
        generate a profiles.yml file in the profiles_dir directory. It then runs the dbt CLI shell command.
        Args:
            - command (string): shell command to be executed; can also be
                provided at task initialization. Any variables / functions defined in
                `self.helper_script` will be available in the same process this command
                runs in
            - env (dict, optional): dictionary of environment variables to use for
                the subprocess
            - dbt_kwargs (dict, optional): keyword arguments used to populate the profiles.yml file
        Returns:
            - stdout (string): if `return_all` is `False` (the default), only the last line of stdout
                is returned, otherwise all lines are returned, which is useful for passing
                result of shell command to other downstream tasks. If there is no output, `None` is returned.
        Raises:
            - prefect.engine.signals.FAIL: if command has an exit code other
                than 0
        """
        profiles_exists = False
        if os.getenv("DBT_PROFILES_DIR"):
            dbt_profiles_dir = os.getenv("DBT_PROFILES_DIR")
            profiles_exists = os.path.exists(
                os.path.join(dbt_profiles_dir, "profiles.yml")
            )
        elif self.profiles_dir:
            profiles_exists = os.path.exists(
                os.path.join(self.profiles_dir, "profiles.yml")
            )

        dbt_kwargs = {**(self.dbt_kwargs or {}), **(dbt_kwargs or {})}

        if self.overwrite_profiles or not profiles_exists:
            profile = {
                self.profile_name: {
                    "outputs": {self.environment: dbt_kwargs},
                    "target": self.environment,
                }
            }

            profile_path = os.path.join(self.profiles_dir, "profiles.yml")

            with open(profile_path, "w+") as yaml_file:
                yaml.dump(profile, yaml_file, default_flow_style=False)

        if self.set_profiles_envar:
            os.environ["DBT_PROFILES_DIR"] = self.profiles_dir

        return super(DbtShellTask, self).run(command=command, env=env)
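
Reviewer note: a minimal sketch of the dictionary that `run()` serializes to profiles.yml, and of how kwargs given at call time override those given at initialization. `build_profile` is a hypothetical helper written for illustration only; it is not part of this PR or of Prefect, and it skips the YAML dump so that it needs only the standard library.

```python
from typing import Optional


def build_profile(
    profile_name: str,
    environment: str,
    init_kwargs: Optional[dict],
    runtime_kwargs: Optional[dict],
) -> dict:
    """Hypothetical helper mirroring the structure written to profiles.yml."""
    # Later unpacking wins, so runtime values override initialization values.
    merged = {**(init_kwargs or {}), **(runtime_kwargs or {})}
    return {
        profile_name: {
            "outputs": {environment: merged},
            "target": environment,
        }
    }


profile = build_profile(
    "default",
    "test",
    {"type": "snowflake", "threads": 1},
    {"threads": 4},
)
print(profile["default"]["outputs"]["test"])  # {'type': 'snowflake', 'threads': 4}
```

Treating a missing dict as empty keeps the merge safe when no kwargs are supplied on either side.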
@@ -0,0 +1,5 @@
from shutil import which
import pytest

if not which("dbt"):
    pytest.skip("dbt not installed", allow_module_level=True)
@@ -0,0 +1,73 @@
import os
import pathlib
import sys

import pytest

from prefect import Flow
from prefect.tasks.dbt import DbtShellTask


pytestmark = pytest.mark.skipif(
    sys.platform == "win32", reason="DbtShellTask currently not supported on Windows"
)


def test_shell_creates_profiles_yml_file(tmpdir):
    dbt_dir = tmpdir.mkdir("dbt")
    with Flow(name="test") as f:
        task = DbtShellTask(
            profile_name="default",
            environment="test",
            dbt_kwargs={
                "type": "snowflake",
                "threads": 1,
                "account": "JH72176.us-east-1",
                "user": "jane@company.com",
                "role": "analyst",
                "database": "staging",
                "warehouse": "data_science",
                "schema": "analysis",
                "private_key_path": "/src/private_key.p8",
                "private_key_passphrase": "password123",
            },
            overwrite_profiles=True,
            profiles_dir=str(dbt_dir),
        )(command="ls")

    out = f.run()
    profiles_path = dbt_dir.join("profiles.yml")
    assert out.is_successful()
    assert os.path.exists(profiles_path)


def test_shell_uses_dbt_envar(tmpdir, monkeypatch):
    dbt_project_path = tmpdir.mkdir("dbt_project")
    monkeypatch.setenv("DBT_PROFILES_DIR", str(dbt_project_path))
    real_profiles_path = dbt_project_path.join("profiles.yml")
    open(real_profiles_path, "a").close()

    with Flow(name="test") as f:
        task = DbtShellTask(
            profile_name="default",
            environment="test",
            dbt_kwargs={
                "type": "snowflake",
                "threads": 1,
                "account": "JH72176.us-east-1",
                "user": "jane@company.com",
                "role": "analyst",
                "database": "staging",
                "warehouse": "data_science",
                "schema": "analysis",
                "private_key_path": "/src/private_key.p8",
                "private_key_passphrase": "password123",
            },
            overwrite_profiles=False,
            profiles_dir=str(tmpdir),
        )(command="ls")

    out = f.run()
    missing_profiles_path = tmpdir.join("profiles.yml")
    assert out.is_successful()
    assert not os.path.exists(missing_profiles_path)
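
Reviewer note: the second test relies on `DBT_PROFILES_DIR` taking precedence over `profiles_dir` when the task checks for an existing profiles.yml. A minimal standard-library sketch of that lookup follows. `profiles_exists` is a hypothetical free function, and passing the environment in as a mapping is an assumption made here so the example does not touch `os.environ`.

```python
import os
import tempfile
from typing import Mapping, Optional


def profiles_exists(profiles_dir: Optional[str], environ: Mapping[str, str]) -> bool:
    """Hypothetical helper: DBT_PROFILES_DIR, when set, is checked instead of profiles_dir."""
    base = environ.get("DBT_PROFILES_DIR") or profiles_dir
    if not base:
        return False
    return os.path.exists(os.path.join(base, "profiles.yml"))


with tempfile.TemporaryDirectory() as env_dir, tempfile.TemporaryDirectory() as task_dir:
    # Only the directory named by the environment variable holds a profiles.yml.
    open(os.path.join(env_dir, "profiles.yml"), "a").close()
    found_via_env = profiles_exists(task_dir, {"DBT_PROFILES_DIR": env_dir})
    found_without_env = profiles_exists(task_dir, {})
```

With the variable set, the file in `env_dir` is found, so nothing would be written to `task_dir` unless `overwrite_profiles` were enabled; without it, the lookup falls back to `task_dir` and finds nothing.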
clever