diff --git a/CHANGES.md b/CHANGES.md index 638e0f8..f56cbe4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Table Loader: Improved conditional handling of "transformation" parameter +- Table Loader: Improved status reporting and error logging in `BulkProcessor` ## 2024/09/19 v0.0.24 - MongoDB Full: Refactor transformation subsystem to `commons-codec` diff --git a/cratedb_toolkit/io/core.py b/cratedb_toolkit/io/core.py index d793b89..157e1e6 100644 --- a/cratedb_toolkit/io/core.py +++ b/cratedb_toolkit/io/core.py @@ -1,4 +1,5 @@ # TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration? +import json import typing as t from functools import cached_property @@ -120,14 +121,15 @@ def log_level(self): def start(self) -> BulkMetrics: # Acquire batches of documents, convert to SQL operations, and submit to CrateDB. + batch_count = 0 for batch in self.data: + batch_count += 1 + self.progress_bar and self.progress_bar.set_description("READ ") current_batch_size = len(batch) - - self.progress_bar and self.progress_bar.set_description("ACQUIRE") - try: operation = self.batch_to_operation(batch) except Exception as ex: + self._metrics.count_error_total += current_batch_size self.log_level(f"Computing query failed: {ex}") if self.on_error == "raise": raise @@ -137,7 +139,7 @@ def start(self) -> BulkMetrics: statement = sa.text(operation.statement) # Submit operation to CrateDB, using `bulk_args`. - self.progress_bar and self.progress_bar.set_description("SUBMIT ") + self.progress_bar and self.progress_bar.set_description("WRITE") try: cursor = self.connection.execute(statement=statement, parameters=operation.parameters) self.connection.commit() @@ -158,7 +160,7 @@ def start(self) -> BulkMetrics: # in order to relay proper error messages to the user. if failed_records: logger.warning( - f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. " + f"Incomplete batch #{batch_count}. 
Records processed: {count_success_local}/{current_batch_size}. " f"Falling back to per-record operations." ) for record in failed_records: @@ -167,8 +169,8 @@ def start(self) -> BulkMetrics: self.connection.commit() self._metrics.count_success_total += 1 except Exception as ex: - logger.warning(f"Operation failed: {ex}") - logger.debug(f"Failing record: {record}") + logger.error(f"Operation failed: {ex}") + logger.debug(f"Invalid record:\n{json.dumps(record, indent=2)}") self._metrics.count_error_total += 1 self._metrics.bytes_error_total += asizeof(record) if self.on_error == "raise": diff --git a/cratedb_toolkit/util/cli.py b/cratedb_toolkit/util/cli.py index f5ebc3e..17d27b0 100644 --- a/cratedb_toolkit/util/cli.py +++ b/cratedb_toolkit/util/cli.py @@ -24,7 +24,7 @@ def boot_click(ctx: click.Context, verbose: bool = False, debug: bool = False): log_level = logging.DEBUG # Setup logging, according to `verbose` / `debug` flags. - setup_logging(level=log_level, verbose=verbose) + setup_logging(level=log_level, verbose=verbose, debug=debug) def split_list(value: str, delimiter: str = ",") -> t.List[str]: diff --git a/cratedb_toolkit/util/common.py b/cratedb_toolkit/util/common.py index c5aaa6e..9ab9cd1 100644 --- a/cratedb_toolkit/util/common.py +++ b/cratedb_toolkit/util/common.py @@ -1,12 +1,15 @@ # Copyright (c) 2023, Crate.io Inc. # Distributed under the terms of the AGPLv3 license, see LICENSE. 
import logging +import os import colorlog from colorlog.escape_codes import escape_codes +from cratedb_toolkit.util.data import asbool -def setup_logging(level=logging.INFO, verbose: bool = False, width: int = 36): + +def setup_logging(level=logging.INFO, verbose: bool = False, debug: bool = False, width: int = 36): reset = escape_codes["reset"] log_format = f"%(asctime)-15s [%(name)-{width}s] %(log_color)s%(levelname)-8s:{reset} %(message)s" @@ -15,14 +18,25 @@ def setup_logging(level=logging.INFO, verbose: bool = False, width: int = 36): logging.basicConfig(format=log_format, level=level, handlers=[handler]) - # Enable SQLAlchemy logging. - if verbose: - logging.getLogger("sqlalchemy").setLevel(level) - logging.getLogger("crate.client").setLevel(level) logging.getLogger("sqlalchemy_cratedb").setLevel(level) logging.getLogger("urllib3.connectionpool").setLevel(level) + # Enable DEBUG logging for the toolkit's own loggers when `verbose` is set. + if verbose: + logging.getLogger("cratedb_toolkit").setLevel(logging.DEBUG) + + if debug: + # Optionally tame SQLAlchemy and PyMongo. + if asbool(os.environ.get("DEBUG_SQLALCHEMY")): + logging.getLogger("sqlalchemy").setLevel(level) + else: + logging.getLogger("sqlalchemy").setLevel(logging.INFO) + if asbool(os.environ.get("DEBUG_PYMONGO")): + logging.getLogger("pymongo").setLevel(level) + else: + logging.getLogger("pymongo").setLevel(logging.INFO) + # logging.getLogger("docker.auth").setLevel(logging.INFO) # noqa: ERA001 # Tame Faker spamming the logs.