
Commit 66200ad

Add 'where' argument to Python API.

1 parent d01e74b

10 files changed: +87 -29 lines changed

.github/workflows/ci.yml

Lines changed: 9 additions & 7 deletions

@@ -43,13 +43,14 @@ jobs:
         run: "poetry install"

       # BigQuery start
-      - id: 'auth'
-        uses: 'google-github-actions/auth@v1'
-        with:
-          credentials_json: '${{ secrets.GOOGLE_CREDENTIALS }}'

-      - name: 'Set up BigQuery Cloud SDK'
-        uses: 'google-github-actions/setup-gcloud@v1'
+      # - id: 'auth'
+      #   uses: 'google-github-actions/auth@v1'
+      #   with:
+      #     credentials_json: '${{ secrets.GOOGLE_CREDENTIALS }}'
+
+      # - name: 'Set up BigQuery Cloud SDK'
+      #   uses: 'google-github-actions/setup-gcloud@v1'

       # - name: 'Use gcloud CLI'
       #   run: "gcloud config configurations list"
@@ -64,9 +65,10 @@ jobs:
          DATADIFF_SNOWFLAKE_URI: '${{ secrets.DATADIFF_SNOWFLAKE_URI }}'
          DATADIFF_PRESTO_URI: '${{ secrets.DATADIFF_PRESTO_URI }}'
          DATADIFF_TRINO_URI: '${{ secrets.DATADIFF_TRINO_URI }}'
-          DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
+          # DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
          DATADIFF_CLICKHOUSE_URI: 'clickhouse://clickhouse:Password1@localhost:9000/clickhouse'
          DATADIFF_VERTICA_URI: 'vertica://vertica:Password1@localhost:5433/vertica'
+          DATADIFF_REDSHIFT_URI: '${{ secrets.DATADIFF_REDSHIFT_URI }}'
        run: |
          chmod +x tests/waiting_for_stack_up.sh
          ./tests/waiting_for_stack_up.sh && TEST_ACROSS_ALL_DBS=0 poetry run unittest-parallel -j 16

.github/workflows/ci_full.yml

Lines changed: 14 additions & 13 deletions

@@ -8,9 +8,12 @@ on:
 #      - '!dev/**'
   pull_request:
     branches: [ master ]
-
   workflow_dispatch:

+permissions:
+  id-token: write  # This is required for requesting the JWT
+  contents: read   # This is required for actions/checkout
+
 jobs:
   unit_tests:
     strategy:
@@ -40,19 +43,16 @@ jobs:
         run: "poetry install"

       # BigQuery start
-      - id: 'auth'
-        uses: 'google-github-actions/auth@v1'
-        with:
-          credentials_json: '${{ secrets.GOOGLE_CREDENTIALS }}'
-
-      - name: 'Set up BigQuery Cloud SDK'
-        uses: 'google-github-actions/setup-gcloud@v1'
+      # - id: 'auth'
+      #   uses: 'google-github-actions/auth@v1'
+      #   with:
+      #     credentials_json: '${{ secrets.GOOGLE_CREDENTIALS }}'

-      # - name: 'Use gcloud CLI'
-      #   run: "gcloud config configurations list"
+      # - name: 'Set up BigQuery Cloud SDK'
+      #   uses: 'google-github-actions/setup-gcloud@v1'

-      - name: "Install BigQuery for Python"
-        run: poetry add google-cloud-bigquery
+      # - name: "Install BigQuery for Python"
+      #   run: poetry add google-cloud-bigquery

       # BigQuery end

@@ -62,7 +62,8 @@ jobs:
          DATADIFF_PRESTO_URI: '${{ secrets.DATADIFF_PRESTO_URI }}'
          DATADIFF_CLICKHOUSE_URI: 'clickhouse://clickhouse:Password1@localhost:9000/clickhouse'
          DATADIFF_VERTICA_URI: 'vertica://vertica:Password1@localhost:5433/vertica'
-          DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
+          # DATADIFF_BIGQUERY_URI: '${{ secrets.DATADIFF_BIGQUERY_URI }}'
+          DATADIFF_REDSHIFT_URI: '${{ secrets.DATADIFF_REDSHIFT_URI }}'
        run: |
          chmod +x tests/waiting_for_stack_up.sh
          ./tests/waiting_for_stack_up.sh && poetry run unittest-parallel -j 16

data_diff/__init__.py

Lines changed: 5 additions & 1 deletion

@@ -62,6 +62,8 @@ def diff_tables(
     max_threadpool_size: Optional[int] = 1,
     # Algorithm
     algorithm: Algorithm = Algorithm.AUTO,
+    # An additional 'where' expression to restrict the search space.
+    where: str = None,
     # Into how many segments to bisect per iteration (hashdiff only)
     bisection_factor: int = DEFAULT_BISECTION_FACTOR,
     # When should we stop bisecting and compare locally (in row count; hashdiff only)
@@ -92,6 +94,7 @@ def diff_tables(
         max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
             Only relevant when `threaded` is ``True``.
             There may be many pools, so number of actual threads can be a lot higher.
+        where (str, optional): An additional 'where' expression to restrict the search space.
         algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`. Default=`AUTO`)
         bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
         bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
@@ -106,7 +109,7 @@ def diff_tables(


     Note:
         The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
-        `key_columns`, `update_column`, `extra_columns`, `min_key`, `max_key`.
+        `key_columns`, `update_column`, `extra_columns`, `min_key`, `max_key`, `where`.
         If different values are needed per table, it's possible to omit them here, and instead set
         them directly when creating each :class:`TableSegment`.
@@ -135,6 +138,7 @@ def diff_tables(
             max_key=max_key,
             min_update=min_update,
             max_update=max_update,
+            where=where,
         ).items()
         if v is not None
     }
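Taken together, these hunks thread a single `where` string from the public `diff_tables()` signature down into both `TableSegment` instances. A minimal usage sketch, not part of the commit — the connection strings, table, and filter are hypothetical, and `connect_to_table` is the package's existing helper for building a `TableSegment`:

from data_diff import connect_to_table, diff_tables

# Hypothetical databases and table names, for illustration only.
table1 = connect_to_table("postgresql://user:pass@localhost/db", "ratings", key_columns=("id",))
table2 = connect_to_table("snowflake://user:pass@account/db/SCHEMA?warehouse=WH", "RATINGS", key_columns=("id",))

# The new 'where' expression restricts the search space on both sides,
# so only rows matching the filter are compared.
for sign, row in diff_tables(table1, table2, where="created_at >= '2022-01-01'"):
    print(sign, row)  # sign is '+' or '-', row is a tuple of column values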

data_diff/__main__.py

Lines changed: 2 additions & 1 deletion

@@ -20,7 +20,7 @@
 from .databases import connect
 from .parse_time import parse_time_before, UNITS_STR, ParseError
 from .config import apply_config_from_file
-from .tracking import disable_tracking
+from .tracking import disable_tracking, set_entrypoint_name
 from .version import __version__


@@ -32,6 +32,7 @@
     "-": "red",
 }

+set_entrypoint_name("CLI")

 def _remove_passwords_in_dict(d: dict):
     for k, v in d.items():

data_diff/sqeleton/databases/bigquery.py

Lines changed: 16 additions & 3 deletions

@@ -145,17 +145,30 @@ def close(self):
         self._client.close()

     def select_table_schema(self, path: DbPath) -> str:
-        schema, name = self._normalize_table_path(path)
-
+        project, schema, name = self._normalize_table_path(path)
         return (
             "SELECT column_name, data_type, 6 as datetime_precision, 38 as numeric_precision, 9 as numeric_scale "
-            f"FROM {schema}.INFORMATION_SCHEMA.COLUMNS "
+            f"FROM `{project}`.`{schema}`.INFORMATION_SCHEMA.COLUMNS "
             f"WHERE table_name = '{name}' AND table_schema = '{schema}'"
         )

     def query_table_unique_columns(self, path: DbPath) -> List[str]:
         return []

+    def _normalize_table_path(self, path: DbPath) -> DbPath:
+        if len(path) == 0:
+            raise ValueError(f"{self.name}: Bad table path for {self}: ()")
+        elif len(path) == 1:
+            return (self.project, self.default_schema, path[0])
+        elif len(path) == 2:
+            return (self.project,) + path
+        elif len(path) == 3:
+            return path
+        else:
+            raise ValueError(
+                f"{self.name}: Bad table path for {self}: '{'.'.join(path)}'. Expected form: [project.]schema.table"
+            )
+
     def parse_table_name(self, name: str) -> DbPath:
         path = parse_table_name(name)
         return tuple(i for i in self._normalize_table_path(path) if i is not None)
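In effect, every lookup now resolves to a full (project, schema, table) triple before being interpolated into the INFORMATION_SCHEMA query above. A sketch of the mapping implemented by `_normalize_table_path` (names hypothetical):

# ("tbl",)                   -> (self.project, self.default_schema, "tbl")
# ("dataset", "tbl")         -> (self.project, "dataset", "tbl")
# ("proj", "dataset", "tbl") -> ("proj", "dataset", "tbl")   # already fully qualified
# ("a", "b", "c", "d")       -> ValueError("... Expected form: [project.]schema.table")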

data_diff/sqeleton/databases/databricks.py

Lines changed: 1 addition & 1 deletion

@@ -88,7 +88,7 @@ def set_timezone_to_utc(self) -> str:

 class Databricks(ThreadedDatabase):
     dialect = Dialect()
-    CONNECT_URI_HELP = "databricks://:<access_token>@<server_name>/<http_path>"
+    CONNECT_URI_HELP = "databricks://:<access_token>@<server_hostname>/<http_path>"
     CONNECT_URI_PARAMS = ["catalog", "schema"]

     def __init__(self, *, thread_count, **kw):

data_diff/sqeleton/databases/redshift.py

Lines changed: 29 additions & 1 deletion

@@ -1,4 +1,4 @@
-from typing import List
+from typing import List, Dict
 from ..abcs.database_types import Float, TemporalType, FractionalType, DbPath
 from ..abcs.mixins import AbstractMixin_MD5
 from .postgresql import (
@@ -70,3 +70,31 @@ def select_table_schema(self, path: DbPath) -> str:
             "SELECT column_name, data_type, datetime_precision, numeric_precision, numeric_scale FROM information_schema.columns "
             f"WHERE table_name = '{table.lower()}' AND table_schema = '{schema.lower()}'"
         )
+
+    def select_external_table_schema(self, path: DbPath) -> str:
+        schema, table = self._normalize_table_path(path)
+
+        return f"""SELECT
+                columnname AS column_name
+                , CASE WHEN external_type = 'string' THEN 'varchar' ELSE external_type END AS data_type
+                , NULL AS datetime_precision
+                , NULL AS numeric_precision
+                , NULL AS numeric_scale
+            FROM svv_external_columns
+            WHERE tablename = '{table.lower()}' AND schemaname = '{schema.lower()}'
+            """
+
+    def query_external_table_schema(self, path: DbPath) -> Dict[str, tuple]:
+        rows = self.query(self.select_external_table_schema(path), list)
+        if not rows:
+            raise RuntimeError(f"{self.name}: Table '{'.'.join(path)}' does not exist, or has no columns")
+
+        d = {r[0]: r for r in rows}
+        assert len(d) == len(rows)
+        return d
+
+    def query_table_schema(self, path: DbPath) -> Dict[str, tuple]:
+        try:
+            return super().query_table_schema(path)
+        except RuntimeError:
+            return self.query_external_table_schema(path)
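The override means a schema lookup that finds no rows in `information_schema.columns` is retried against `svv_external_columns`, the catalog view where Redshift exposes Spectrum external tables. A usage sketch (the URI, schema, and table names are hypothetical; `connect` is the package's existing URI-based connector):

from data_diff.databases import connect

db = connect("redshift://user:password@host:5439/dev")

# Regular table: resolved from information_schema.columns as before.
print(db.query_table_schema(("public", "ratings")))

# Spectrum external table: information_schema returns no rows, so the
# lookup falls back to svv_external_columns.
print(db.query_table_schema(("spectrum_schema", "events")))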

data_diff/table_segment.py

Lines changed: 1 addition & 1 deletion

@@ -50,8 +50,8 @@ class TableSegment:
     max_key: DbKey = None
     min_update: DbTime = None
     max_update: DbTime = None
-
     where: str = None
+
     case_sensitive: bool = True
     _schema: Schema = None
data_diff/tracking.py

Lines changed: 9 additions & 0 deletions

@@ -40,6 +40,8 @@ def _load_profile():
 g_tracking_enabled = True
 g_anonymous_id = None

+entrypoint_name = "Python API"
+

 def disable_tracking():
     global g_tracking_enabled
@@ -50,6 +52,11 @@ def is_tracking_enabled():
     return g_tracking_enabled


+def set_entrypoint_name(s):
+    global entrypoint_name
+    entrypoint_name = s
+
+
 def get_anonymous_id():
     global g_anonymous_id
     if g_anonymous_id is None:
@@ -70,6 +77,7 @@ def create_start_event_json(diff_options: Dict[str, Any]):
             "python_version": f"{platform.python_version()}/{platform.python_implementation()}",
             "diff_options": diff_options,
             "data_diff_version:": __version__,
+            "entrypoint_name": entrypoint_name,
         },
     }
@@ -99,6 +107,7 @@ def create_end_event_json(
             "diff_rows_cnt": diff_count,
             "error_message": error,
             "data_diff_version:": __version__,
+            "entrypoint_name": entrypoint_name,
         },
     }
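With `set_entrypoint_name`, any wrapper can label its telemetry the way `__main__.py` now labels the CLI; events are tagged "Python API" otherwise. A sketch (the wrapper name is hypothetical):

from data_diff.tracking import set_entrypoint_name

set_entrypoint_name("Airflow operator")  # hypothetical entrypoint label

# Subsequent events built by create_start_event_json / create_end_event_json
# will carry "entrypoint_name": "Airflow operator".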

tests/common.py

Lines changed: 1 addition & 1 deletion

@@ -25,7 +25,7 @@
 TEST_SNOWFLAKE_CONN_STRING: str = os.environ.get("DATADIFF_SNOWFLAKE_URI") or None
 TEST_PRESTO_CONN_STRING: str = os.environ.get("DATADIFF_PRESTO_URI") or None
 TEST_BIGQUERY_CONN_STRING: str = os.environ.get("DATADIFF_BIGQUERY_URI") or None
-TEST_REDSHIFT_CONN_STRING: str = None
+TEST_REDSHIFT_CONN_STRING: str = os.environ.get("DATADIFF_REDSHIFT_URI") or None
 TEST_ORACLE_CONN_STRING: str = None
 TEST_DATABRICKS_CONN_STRING: str = os.environ.get("DATADIFF_DATABRICKS_URI")
 TEST_TRINO_CONN_STRING: str = os.environ.get("DATADIFF_TRINO_URI") or None
