Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,57 @@ Options:
- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.
- `--bisection-threshold` - Minimal bisection threshold. i.e. maximum size of pages to diff locally.
- `-j` or `--threads` - Number of worker threads to use per database. Default=1.
- `--conf`, `--run` - Specify the run and configuration from a TOML file. (see below)

### How to use with a configuration file

Data-diff lets you load the configuration for a run from a TOML file.

Reasons to use a configuration file:

- Convenience - Set up the parameters for diffs that need to run often.

- Easier and more readable - You can define the database connection settings as config values, instead of in a URI.

- Gives you fine-grained control over the settings switches, without requiring any Python code.

Use `--conf` to specify the path to the configuration file. data-diff will load the settings from `run.default`, if it's defined.

Then you can optionally use `--run` to load the settings of a specific run, which override the settings of `run.default`. (All runs extend `run.default`, like inheritance.)

Finally, CLI switches have the final say, and will override the settings defined by the configuration file, and the current run.

Example TOML file:

```toml
# Specify the connection params to the test database.
[database.test_postgresql]
driver = "postgresql"
user = "postgres"
password = "Password1"

# Specify the default run params
[run.default]
update_column = "timestamp"
verbose = true

# Specify params for a run 'test_diff'.
[run.test_diff]
verbose = false
# Source 1 ("left")
1.database = "test_postgresql" # Use options from database.test_postgresql
1.table = "rating"
# Source 2 ("right")
2.database = "postgresql://postgres:Password1@/" # Use URI like in the CLI
2.table = "rating_del1"
```

In this example, running `data-diff --conf myconfig.toml --run test_diff` will compare between `rating` and `rating_del1`.
It will use the `timestamp` column as the update column, as specified in `run.default`. However, it won't be verbose, since that
flag is overridden to `false`.

Running it with `data-diff --conf myconfig.toml --run test_diff -v` will set verbose back to `true`.


## How to use from Python

Expand Down
106 changes: 69 additions & 37 deletions data_diff/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
DEFAULT_BISECTION_THRESHOLD,
DEFAULT_BISECTION_FACTOR,
)
from .databases.connect import connect_to_uri
from .databases.connect import connect
from .parse_time import parse_time_before_now, UNITS_STR, ParseError
from .config import apply_config_from_file

import rich
import click
Expand All @@ -26,19 +27,19 @@


@click.command()
@click.argument("db1_uri")
@click.argument("table1_name")
@click.argument("db2_uri")
@click.argument("table2_name")
@click.option("-k", "--key-column", default="id", help="Name of primary key column")
@click.argument("database1", required=False)
@click.argument("table1", required=False)
@click.argument("database2", required=False)
@click.argument("table2", required=False)
@click.option("-k", "--key-column", default=None, help="Name of primary key column. Default='id'.")
@click.option("-t", "--update-column", default=None, help="Name of updated_at/last_updated column")
@click.option("-c", "--columns", default=[], multiple=True, help="Names of extra columns to compare")
@click.option("-l", "--limit", default=None, help="Maximum number of differences to find")
@click.option("--bisection-factor", default=DEFAULT_BISECTION_FACTOR, help="Segments per iteration")
@click.option("--bisection-factor", default=None, help=f"Segments per iteration. Default={DEFAULT_BISECTION_FACTOR}.")
@click.option(
"--bisection-threshold",
default=DEFAULT_BISECTION_THRESHOLD,
help="Minimal bisection threshold. Below it, data-diff will download the data and compare it locally.",
default=None,
help=f"Minimal bisection threshold. Below it, data-diff will download the data and compare it locally. Default={DEFAULT_BISECTION_THRESHOLD}.",
)
@click.option(
"--min-age",
Expand All @@ -57,16 +58,32 @@
@click.option(
"-j",
"--threads",
default="1",
default=None,
help="Number of worker threads to use per database. Default=1. "
"A higher number will increase performance, but take more capacity from your database. "
"'serial' guarantees a single-threaded execution of the algorithm (useful for debugging).",
)
def main(
db1_uri,
table1_name,
db2_uri,
table2_name,
@click.option(
"--conf",
default=None,
help="Path to a configuration.toml file, to provide a default configuration, and a list of possible runs.",
)
@click.option(
"--run",
default=None,
help="Name of run-configuration to run. If used, CLI arguments for database and table must be omitted.",
)
def main(conf, run, **kw):
    """CLI entry point: optionally merge a configuration file, then run the diff.

    When `conf` is set, the CLI keyword arguments are combined with the
    settings loaded from that file (and the selected `run`) before being
    handed to `_main`. Without `conf`, the CLI arguments are passed through
    unchanged and `run` is not consulted.
    """
    options = apply_config_from_file(conf, run, kw) if conf else kw
    return _main(**options)


def _main(
database1,
table1,
database2,
table2,
key_column,
update_column,
columns,
Expand All @@ -82,35 +99,50 @@ def main(
threads,
keep_column_case,
json_output,
threads1=None,
threads2=None,
__conf__=None,
):
if limit and stats:
print("Error: cannot specify a limit when using the -s/--stats switch")
return

if interactive:
debug = True

if debug:
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT, datefmt=DATE_FORMAT)
if __conf__:
logging.debug(f"Applied run configuration: {__conf__}")
elif verbose:
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT)

if limit and stats:
logging.error("Cannot specify a limit when using the -s/--stats switch")
return

key_column = key_column or "id"
if bisection_factor is None:
bisection_factor = DEFAULT_BISECTION_FACTOR
if bisection_threshold is None:
bisection_threshold = DEFAULT_BISECTION_THRESHOLD

threaded = True
if threads is not None:
if threads.lower() == "serial":
threaded = False
threads = 1
else:
try:
threads = int(threads)
except ValueError:
logging.error("Error: threads must be a number, 'auto', or 'serial'.")
return
if threads < 1:
logging.error("Error: threads must be >= 1")
return

db1 = connect_to_uri(db1_uri, threads)
db2 = connect_to_uri(db2_uri, threads)
if threads is None:
threads = 1
elif isinstance(threads, str) and threads.lower() == "serial":
assert not (threads1 or threads2)
threaded = False
threads = 1
else:
try:
threads = int(threads)
except ValueError:
logging.error("Error: threads must be a number, or 'serial'.")
return
if threads < 1:
logging.error("Error: threads must be >= 1")
return

db1 = connect(database1, threads1 or threads)
db2 = connect(database2, threads2 or threads)

if interactive:
db1.enable_interactive()
Expand All @@ -128,8 +160,8 @@ def main(
logging.error("Error while parsing age expression: %s" % e)
return

table1 = TableSegment(db1, db1.parse_table_name(table1_name), key_column, update_column, columns, **options)
table2 = TableSegment(db2, db2.parse_table_name(table2_name), key_column, update_column, columns, **options)
table1_seg = TableSegment(db1, db1.parse_table_name(table1), key_column, update_column, columns, **options)
table2_seg = TableSegment(db2, db2.parse_table_name(table2), key_column, update_column, columns, **options)

differ = TableDiffer(
bisection_factor=bisection_factor,
Expand All @@ -138,7 +170,7 @@ def main(
max_threadpool_size=threads and threads * 2,
debug=debug,
)
diff_iter = differ.diff_tables(table1, table2)
diff_iter = differ.diff_tables(table1_seg, table2_seg)

if limit:
diff_iter = islice(diff_iter, int(limit))
Expand Down
73 changes: 73 additions & 0 deletions data_diff/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from typing import Any, Dict
import toml


class ConfigParseError(Exception):
    """Raised when the contents of a configuration are invalid or incomplete."""


def is_uri(s: str) -> bool:
    """Return True if `s` contains a scheme separator (``://``), i.e. looks like a URI."""
    scheme_separator = "://"
    return scheme_separator in s


def _apply_config(config: Dict[str, Any], run_name: str, kw: Dict[str, Any]):
    """Combine a parsed configuration with the CLI keyword arguments.

    The configuration may contain two top-level sections: ``database`` (named
    connection settings) and ``run`` (named run settings). The settings of
    ``run.default`` are used as a base and, if `run_name` is given, updated
    with the settings of that run. Each run must describe connections ``1``
    and ``2``, each with a ``database`` (a URI, or the name of an entry in the
    ``database`` section) and a ``table``, plus an optional ``threads``.

    Neither `config` nor `kw` is modified.

    Args:
        config: The parsed configuration document.
        run_name: Name of the run to apply on top of ``run.default``, or a
            falsy value to use ``run.default`` only.
        kw: Keyword arguments gathered from the command line.

    Returns:
        A new dict of keyword arguments. Truthy values in `kw` take precedence
        over the configuration. The applied run settings are also stored
        under the ``__conf__`` key.

    Raises:
        ConfigParseError: If the configuration has unknown sections, the run
            doesn't exist, a connection is incomplete or has unexpected
            attributes, or a referenced database is missing or has no driver.
    """
    # Load config. Read the known sections without popping them, so that the
    # caller's dict is left intact.
    databases = config.get("database", {})
    runs = config.get("run", {})
    unknown = {k: v for k, v in config.items() if k not in ("database", "run")}
    if unknown:
        raise ConfigParseError(f"Unknown option(s): {unknown}")

    # Init run_args. Copy the defaults, so that merging a named run into them
    # doesn't alter `run.default` inside `config`.
    run_args = dict(runs.get("default") or {})
    if run_name:
        if run_name not in runs:
            raise ConfigParseError(f"Cannot find run '{run_name}' in configuration.")
        run_args.update(runs[run_name])
    else:
        run_name = "default"

    # Process databases + tables
    for index in "12":
        args = run_args.pop(index, {})
        if not isinstance(args, dict):
            raise ConfigParseError(
                f"Running 'run.{run_name}': Connection #{index} must be a table of attributes, got: {args!r}"
            )
        args = dict(args)  # Copy, so the pops below don't alter `config`
        for attr in ("database", "table"):
            if attr not in args:
                raise ConfigParseError(f"Running 'run.{run_name}': Connection #{index} is missing attribute '{attr}'.")

        database = args.pop("database")
        table = args.pop("table")
        threads = args.pop("threads", None)
        if args:
            raise ConfigParseError(f"Unexpected attributes for connection #{index}: {args}")

        if not is_uri(database):
            if database not in databases:
                raise ConfigParseError(
                    f"Database '{database}' not found in list of databases. Available: {list(databases)}."
                )
            # Keep the name for error reporting; printing the settings dict
            # itself would expose credentials such as the password.
            database_name = database
            database = dict(databases[database_name])
            if "driver" not in database:
                raise ConfigParseError(f"Database '{database_name}' did not specify a driver.")

        run_args[f"database{index}"] = database
        run_args[f"table{index}"] = table
        if threads is not None:
            run_args[f"threads{index}"] = int(threads)

    # Update keywords
    new_kw = dict(kw)  # Set defaults
    new_kw.update(run_args)  # Apply config
    new_kw.update({k: v for k, v in kw.items() if v})  # Apply non-empty defaults

    new_kw["__conf__"] = run_args

    return new_kw


def apply_config_from_file(path: str, run_name: str, kw: Dict[str, Any]):
    """Load the TOML configuration file at `path` and apply it to `kw`.

    Args:
        path: Path to the TOML configuration file.
        run_name: Name of the run to apply, or a falsy value for the default run.
        kw: Keyword arguments gathered from the command line.

    Returns:
        The keyword arguments produced by `_apply_config`.
    """
    # TOML documents are UTF-8 by specification; don't depend on the
    # platform's default encoding when reading them.
    with open(path, encoding="utf-8") as f:
        return _apply_config(toml.load(f), run_name, kw)


def apply_config_from_string(toml_config: str, run_name: str, kw: Dict[str, Any]):
    """Parse `toml_config` as TOML text and apply the resulting configuration to `kw`."""
    parsed_config = toml.loads(toml_config)
    return _apply_config(parsed_config, run_name, kw)
1 change: 1 addition & 0 deletions data_diff/databases/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def __init__(self, thread_count=1):
self._init_error = None
self._queue = ThreadPoolExecutor(thread_count, initializer=self.set_conn)
self.thread_local = threading.local()
logger.info(f"[{self.name}] Starting a threadpool, size={thread_count}.")

def set_conn(self):
assert not hasattr(self.thread_local, "conn")
Expand Down
43 changes: 40 additions & 3 deletions data_diff/databases/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,46 @@ def connect_to_uri(db_uri: str, thread_count: Optional[int] = 1) -> Database:
kw = matcher.match_path(dsn)

if scheme == "bigquery":
return cls(dsn.host, **kw)
kw["project"] = dsn.host
return cls(**kw)

if scheme == "snowflake":
kw["account"] = dsn.host
assert not dsn.port
kw["user"] = dsn.user
kw["password"] = dsn.password
else:
kw["host"] = dsn.host
kw["port"] = dsn.port
kw["user"] = dsn.user
if dsn.password:
kw["password"] = dsn.password
kw = {k: v for k, v in kw.items() if v is not None}

if issubclass(cls, ThreadedDatabase):
return cls(dsn.host, dsn.port, dsn.user, dsn.password, thread_count=thread_count, **kw)
return cls(thread_count=thread_count, **kw)

return cls(dsn.host, dsn.port, dsn.user, dsn.password, **kw)
return cls(**kw)


def connect_with_dict(d, thread_count):
    """Create a database connection from a dict of settings.

    The dict must contain a "driver" key, which selects the database class
    through MATCH_URI_PATH. All remaining items are passed to that class as
    keyword arguments. `thread_count` is forwarded only to classes that
    derive from ThreadedDatabase. The given dict is not modified.

    Raises NotImplementedError if the driver is not registered.
    """
    params = dict(d)
    driver = params.pop("driver")
    try:
        matcher = MATCH_URI_PATH[driver]
    except KeyError:
        raise NotImplementedError(f"Driver {driver} currently not supported")

    database_cls = matcher.database_cls
    # Only threaded databases accept a thread_count argument.
    threading_kwargs = {"thread_count": thread_count} if issubclass(database_cls, ThreadedDatabase) else {}
    return database_cls(**threading_kwargs, **params)


def connect(x, thread_count):
    """Create a database connection from either a URI string or a settings dict.

    Strings are handled by `connect_to_uri`, dicts by `connect_with_dict`.
    Any other type raises RuntimeError, with the offending value as its argument.
    """
    if isinstance(x, dict):
        return connect_with_dict(x, thread_count)
    if isinstance(x, str):
        return connect_to_uri(x, thread_count)
    raise RuntimeError(x)
12 changes: 6 additions & 6 deletions data_diff/databases/database_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class ColType:
pass


class IKey(ABC):
"Interface for ColType, for using a column as a key in data-diff"
python_type: type


@dataclass
class PrecisionType(ColType):
precision: int
Expand Down Expand Up @@ -54,7 +59,7 @@ class Float(FractionalType):
pass


class Decimal(FractionalType):
class Decimal(FractionalType, IKey):
@property
def python_type(self) -> type:
if self.precision == 0:
Expand All @@ -66,11 +71,6 @@ class StringType(ColType):
pass


class IKey(ABC):
"Interface for ColType, for using a column as a key in data-diff"
python_type: type


class ColType_UUID(StringType, IKey):
python_type = ArithUUID

Expand Down
Loading