Added loading checks from a local and workspace file #60

Merged (3 commits) on Oct 31, 2024
38 changes: 31 additions & 7 deletions CONTRIBUTING.md
@@ -67,17 +67,13 @@ def viz_type(self) -> str:

This section provides a step-by-step guide to set up and start working on the project. These steps will help you set up your project environment and dependencies for efficient development.

Go through the [prerequisites](./README.md#prerequisites) and clone the [dqx github repo](https://github.com/databrickslabs/dqx).

Run the following command to create the default environment and install development dependencies:
```shell
make dev
```

Verify installation with
```shell
make test
```

Before every commit, format the code to keep the codebase consistent:
```shell
make fmt
@@ -86,12 +82,40 @@ make fmt
Before every commit, run the automated bug detector (`make lint`) and unit tests (`make test`) to ensure that the automated pull request checks pass before your code is reviewed by others:
```shell
make lint
make test
```

Configure authentication to a Databricks workspace for integration testing by setting up credentials.

If you want to run the tests from an IDE, you must set up a `.env` or `~/.databricks/debug-env.json` file
(see [instructions](https://github.com/databrickslabs/pytester?tab=readme-ov-file#debug_env_name-fixture)).
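For reference, `debug-env.json` maps an environment name to the variables to inject when running tests. The exact schema is defined in the pytester docs linked above; the sketch below is only an illustration, and the environment name `ws` and placeholder values are assumptions:

```json
{
  "ws": {
    "DATABRICKS_HOST": "https://<workspace-url>",
    "DATABRICKS_TOKEN": "<pat-token>"
  }
}
```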

To run the integration tests from the command line, you need to set up environment variables first:
```shell
export DATABRICKS_HOST=https://<workspace-url>
# set either service principal credentials
export DATABRICKS_CLIENT_ID=<client-id>
export DATABRICKS_CLIENT_SECRET=<client-secret>
# or a PAT token
export DATABRICKS_TOKEN=<pat-token>

make integration
```

Calculate test coverage and display the report in HTML:
```shell
make coverage
```

## Running CLI from the local repo

Once you clone the repo locally and install the Databricks CLI, you can run labs CLI commands (see the examples below).

Authenticate your current machine to your Databricks Workspace:
```commandline
databricks auth login --host <WORKSPACE_HOST>
```

Show info about the project:
```commandline
97 changes: 75 additions & 22 deletions README.md

Large diffs are not rendered by default.

Binary file modified docs/dqx_lakehouse.png
Binary file modified docs/dqx_quarantine.png
12 changes: 9 additions & 3 deletions pyproject.toml
@@ -6,6 +6,11 @@ readme = "README.md"
license-files = { paths = ["LICENSE", "NOTICE"] }
requires-python = ">=3.10"
keywords = ["Databricks"]
maintainers = [
{ name = "Serge Smertin", email = "serge.smertin@databricks.com" },
{ name = "Marcin Wojtyczka", email = "marcin.wojtyczka@databricks.com" },
{ name = "Alex Ott", email = "alex.ott@databricks.com" },
]
classifiers = [
"Development Status :: 3 - Alpha",
"License :: Other/Proprietary License",
@@ -14,7 +19,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: Implementation :: CPython",
]
dependencies = ["databricks-labs-blueprint[yaml]>=0.8,<=0.9", "databricks-sdk>=0.30"]

[project.urls]
Issues = "https://github.com/databrickslabs/dqx/issues"
@@ -39,7 +44,7 @@ dependencies = [
"pylint~=3.1.0",
"pylint-pytest==2.0.0a0",
"databricks-labs-pylint~=0.3.0",
"pytest>=8.3",
"pytest-cov~=4.1.0",
"pytest-mock~=3.14.0",
"pytest-timeout~=2.3.1",
@@ -50,6 +55,7 @@ dependencies = [
"pyspark~=3.5.0",
"pytest-spark~=0.6.0",
"chispa~=0.9.4",
"databricks-labs-pytester~=0.3.1",
]

python="3.10"
@@ -59,7 +65,7 @@ path = ".venv"

[tool.hatch.envs.default.scripts]
test = "pytest -n 2 --cov src --cov-report=xml --timeout 30 tests/unit --durations 20"
coverage = "pytest -n 2 --cov src tests/ --timeout 30 --cov-report=html --durations 20"
integration = "pytest -n 10 --cov src tests/integration --durations 20"
fmt = ["black .",
"ruff check . --fix",
63 changes: 60 additions & 3 deletions src/databricks/labs/dqx/engine.py
@@ -1,5 +1,7 @@
import functools as ft
import itertools
import json
from pathlib import Path
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum
@@ -9,7 +11,11 @@
from pyspark.sql import Column, DataFrame
from databricks.labs.blueprint.entrypoint import get_logger
from databricks.labs.dqx import col_functions
from databricks.labs.blueprint.installation import Installation

from databricks.labs.dqx.config import WorkspaceConfig
from databricks.labs.dqx.utils import get_column_name
from databricks.sdk.errors import NotFound

logger = get_logger(__name__)

@@ -75,7 +81,7 @@ class DQRuleColSet:

columns: list[str]
check_func: Callable
criticality: str = Criticality.ERROR.value
check_func_args: list[Any] = field(default_factory=list)
check_func_kwargs: dict[str, Any] = field(default_factory=dict)

@@ -179,7 +185,7 @@ def apply_checks_and_split(df: DataFrame, checks: list[DQRule]) -> tuple[DataFra

def get_invalid(df: DataFrame) -> DataFrame:
"""
Get records that violate data quality checks (records with warnings and errors).
@param df: input DataFrame.
@return: dataframe with error and warning rows and corresponding reporting columns.
"""
@@ -188,7 +194,7 @@ def get_invalid(df: DataFrame) -> DataFrame:

def get_valid(df: DataFrame) -> DataFrame:
"""
Get records that don't violate data quality checks (records with warnings but no errors).
@param df: input DataFrame.
@return: dataframe with warning rows but no reporting columns.
"""
@@ -308,3 +314,54 @@ def build_checks(*rules_col_set: DQRuleColSet) -> list[DQRule]:
flat_rules = list(itertools.chain(*rules_nested))

return list(filter(None, flat_rules))
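The flattening step in `build_checks` above can be illustrated in isolation; the string rules below are hypothetical stand-ins for `DQRule` objects:

```python
import itertools

# Hypothetical per-column rule lists, as produced by each DQRuleColSet;
# strings stand in for DQRule objects, and None entries are dropped.
rules_nested = [["rule_a", "rule_b"], [], ["rule_c", None]]

flat_rules = list(itertools.chain(*rules_nested))  # flatten one level of nesting
checks = list(filter(None, flat_rules))            # drop falsy (None) entries
print(checks)  # ['rule_a', 'rule_b', 'rule_c']
```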


def load_checks_from_local_file(filename: str) -> list[dict]:
    """
    Load checks (dq rules) from a file (json or yml) in the local file system.
    The returned checks can be used as input for the `apply_checks_by_metadata` function.

    :param filename: file name / path containing the checks.
    :return: list of dq rules
    """
    if not filename:
        raise ValueError("filename must be provided")

    try:
        checks = Installation.load_local(list[dict[str, str]], Path(filename))
        return _convert_checks_as_string_to_dict(checks)
    except FileNotFoundError:
        msg = f"Checks file {filename} missing"
        raise FileNotFoundError(msg) from None


def load_checks_from_file(installation: Installation) -> list[dict]:
    """
    Load checks (dq rules) from a file (json or yml) defined in the installation config.
    The returned checks can be used as input for the `apply_checks_by_metadata` function.

    :param installation: workspace installation object.
    :return: list of dq rules
    """
    config = installation.load(WorkspaceConfig)
    filename = config.checks_file  # use the checks file from the config

    logger.info(f"Loading quality rules (checks) from {filename} in the workspace.")

    try:
        checks = installation.load(list[dict[str, str]], filename=filename)
        return _convert_checks_as_string_to_dict(checks)
    except NotFound:
        msg = f"Checks file {filename} missing"
        raise NotFound(msg) from None


def _convert_checks_as_string_to_dict(checks: list[dict[str, str]]) -> list[dict]:
    """
    Convert the `check` field of each rule from a json string to a dictionary.
    @param checks: list of checks
    @return: list of checks with the `check` field parsed into a dictionary
    """
    for item in checks:
        item['check'] = json.loads(item['check'].replace("'", '"'))
    return checks
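The string-to-dict conversion used by the loaders can be sketched standalone; the sample rule below is a hypothetical illustration, not taken from the project:

```python
import json

def convert_check_strings(checks: list[dict]) -> list[dict]:
    """Parse each serialized 'check' field into a dictionary."""
    for item in checks:
        # Stored checks may use single quotes, which json.loads rejects,
        # so normalize them to double quotes before parsing.
        item["check"] = json.loads(item["check"].replace("'", '"'))
    return checks

# Hypothetical serialized rule, e.g. as loaded from a YAML/JSON checks file.
raw = [{"criticality": "error",
        "check": "{'function': 'is_not_null', 'arguments': {'col_name': 'id'}}"}]

parsed = convert_check_strings(raw)
print(parsed[0]["check"]["arguments"]["col_name"])  # id
```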
9 changes: 2 additions & 7 deletions src/databricks/labs/dqx/install.py
@@ -3,8 +3,8 @@
import re
import webbrowser
from functools import cached_property

import databricks

from databricks.labs.blueprint.entrypoint import get_logger, is_in_debug
from databricks.labs.blueprint.installation import Installation, SerdeError
@@ -21,8 +21,6 @@
from databricks.labs.dqx.config import WorkspaceConfig
from databricks.labs.dqx.contexts.workspace_cli import WorkspaceContext

TAG_STEP = "step"
NUM_USER_ATTEMPTS = 10 # number of attempts user gets at answering a question

logger = logging.getLogger(__name__)
with_user_agent_extra("cmd", "install")
Expand Down Expand Up @@ -159,10 +157,7 @@ def _confirm_force_install(self) -> bool:
raise RuntimeWarning("DQX is already installed, but no confirmation")
if not self.installation.is_global() and self._force_install == "global":
# Logic for forced global install over user install
raise databricks.sdk.errors.NotImplemented("Migration needed. Not implemented yet.")
if self.installation.is_global() and self._force_install == "user":
# Logic for forced user install over global install
self.replace(
Expand Down