Skip to content

Commit

Permalink
Table Loader: Improve status reporting + error logging in BulkProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 22, 2024
1 parent ca761b5 commit 795003f
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

## Unreleased
- Table Loader: Improved conditional handling of "transformation" parameter
- Table Loader: Improved status reporting and error logging in `BulkProcessor`

## 2024/09/19 v0.0.24
- MongoDB Full: Refactor transformation subsystem to `commons-codec`
Expand Down
16 changes: 9 additions & 7 deletions cratedb_toolkit/io/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration?
import json
import typing as t
from functools import cached_property

Expand Down Expand Up @@ -120,14 +121,15 @@ def log_level(self):

def start(self) -> BulkMetrics:
# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
batch_count = 0
for batch in self.data:
batch_count += 1
self.progress_bar and self.progress_bar.set_description("READ ")
current_batch_size = len(batch)

self.progress_bar and self.progress_bar.set_description("ACQUIRE")

try:
operation = self.batch_to_operation(batch)
except Exception as ex:
self._metrics.count_error_total += current_batch_size
self.log_level(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
Expand All @@ -137,7 +139,7 @@ def start(self) -> BulkMetrics:
statement = sa.text(operation.statement)

# Submit operation to CrateDB, using `bulk_args`.
self.progress_bar and self.progress_bar.set_description("SUBMIT ")
self.progress_bar and self.progress_bar.set_description("WRITE")
try:
cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
self.connection.commit()
Expand All @@ -158,7 +160,7 @@ def start(self) -> BulkMetrics:
# in order to relay proper error messages to the user.
if failed_records:
logger.warning(
f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
f"Incomplete batch #{batch_count}. Records processed: {count_success_local}/{current_batch_size}. "
f"Falling back to per-record operations."
)
for record in failed_records:
Expand All @@ -167,8 +169,8 @@ def start(self) -> BulkMetrics:
self.connection.commit()
self._metrics.count_success_total += 1
except Exception as ex:
logger.warning(f"Operation failed: {ex}")
logger.debug(f"Failing record: {record}")
logger.error(f"Operation failed: {ex}")
logger.debug(f"Invalid record:\n{json.dumps(record, indent=2)}")
self._metrics.count_error_total += 1
self._metrics.bytes_error_total += asizeof(record)
if self.on_error == "raise":
Expand Down
2 changes: 1 addition & 1 deletion cratedb_toolkit/util/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def boot_click(ctx: click.Context, verbose: bool = False, debug: bool = False):
log_level = logging.DEBUG

# Setup logging, according to `verbose` / `debug` flags.
setup_logging(level=log_level, verbose=verbose)
setup_logging(level=log_level, verbose=verbose, debug=debug)


def split_list(value: str, delimiter: str = ",") -> t.List[str]:
Expand Down
24 changes: 19 additions & 5 deletions cratedb_toolkit/util/common.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# Copyright (c) 2023, Crate.io Inc.
# Distributed under the terms of the AGPLv3 license, see LICENSE.
import logging
import os

import colorlog
from colorlog.escape_codes import escape_codes

from cratedb_toolkit.util.data import asbool

def setup_logging(level=logging.INFO, verbose: bool = False, width: int = 36):

def setup_logging(level=logging.INFO, verbose: bool = False, debug: bool = False, width: int = 36):
reset = escape_codes["reset"]
log_format = f"%(asctime)-15s [%(name)-{width}s] %(log_color)s%(levelname)-8s:{reset} %(message)s"

Expand All @@ -15,14 +18,25 @@ def setup_logging(level=logging.INFO, verbose: bool = False, width: int = 36):

logging.basicConfig(format=log_format, level=level, handlers=[handler])

# Enable SQLAlchemy logging.
if verbose:
logging.getLogger("sqlalchemy").setLevel(level)

logging.getLogger("crate.client").setLevel(level)
logging.getLogger("sqlalchemy_cratedb").setLevel(level)
logging.getLogger("urllib3.connectionpool").setLevel(level)

# Enable SQLAlchemy logging.
if verbose:
logging.getLogger("cratedb_toolkit").setLevel(logging.DEBUG)

if debug:
# Optionally tame SQLAlchemy and PyMongo.
if asbool(os.environ.get("DEBUG_SQLALCHEMY")):
logging.getLogger("sqlalchemy").setLevel(level)
else:
logging.getLogger("sqlalchemy").setLevel(logging.INFO)
if asbool(os.environ.get("DEBUG_PYMONGO")):
logging.getLogger("pymongo").setLevel(level)
else:
logging.getLogger("pymongo").setLevel(logging.INFO)

# logging.getLogger("docker.auth").setLevel(logging.INFO) # noqa: ERA001

# Tame Faker spamming the logs.
Expand Down

0 comments on commit 795003f

Please sign in to comment.