Skip to content

Commit

Permalink
[ENH] Add function log_and_raise (#1238)
Browse files Browse the repository at this point in the history
* add log_and_raise function

* use it in adni_json
  • Loading branch information
NicolasGensollen authored Jul 18, 2024
1 parent 993733a commit ca77bab
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 19 deletions.
47 changes: 28 additions & 19 deletions clinica/iotools/converters/adni_to_bids/adni_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@
import pandas as pd

from clinica.utils.exceptions import ClinicaXMLParserError
from clinica.utils.stream import cprint
from clinica.utils.stream import cprint, log_and_raise

__all__ = ["create_json_metadata"]


LOGGING_HEADER = "[ADNI JSON]"


def _log_and_raise(message: str):
cprint(f"{LOGGING_HEADER} {message}", lvl="error")
raise ClinicaXMLParserError(f"{LOGGING_HEADER} {message}")
def _log_and_raise_xml_parser_error(message: str):
log_and_raise(f"{LOGGING_HEADER} {message}", ClinicaXMLParserError)


# Map from names of extracted metadata to their proper BIDS names.
Expand Down Expand Up @@ -65,7 +64,9 @@ def _read_xml_files(
def _check_xml_tag(element_tag: str, expected_tag: str):
"""Check that the XML element tagged matches the expected tag."""
if element_tag != expected_tag:
_log_and_raise(f"Bad tag: expected {expected_tag}, got {element_tag}")
_log_and_raise_xml_parser_error(
f"Bad tag: expected {expected_tag}, got {element_tag}"
)


def _check_xml_nb_children(
Expand All @@ -78,13 +79,13 @@ def _check_xml_nb_children(
nb_children = len(xml_el)
if isinstance(expected_nb_children, int):
if nb_children != expected_nb_children:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"Bad number of children for <{xml_el.tag}>: "
f"got {nb_children} != {expected_nb_children}"
)
else:
if nb_children not in expected_nb_children:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"Bad number of children for <{xml_el.tag}>: "
f"got {nb_children}, not in {expected_nb_children}"
)
Expand Down Expand Up @@ -112,8 +113,8 @@ def _parse_project(
Check that the project identifier contains ADNI.
"""
if len(root) != 1:
_log_and_raise(
"XML root should have only one child. " f"{len(root)} children found."
_log_and_raise_xml_parser_error(
"XML root should have only one child. " f"{len(root)} children found.",
)
project = _check_xml(root[0], "project", 4)
_check_xml_project_identifier(project, "ADNI")
Expand All @@ -138,7 +139,7 @@ def _get_text(xml_element: xml.etree.ElementTree.Element, cast=None) -> str:
def _check_derived_image_xml(derived: xml.etree.ElementTree.Element) -> None:
"""Perform sanity checks on derived images."""
if len(derived) < 9:
_log_and_raise("Derived image does not have enough field.")
_log_and_raise_xml_parser_error("Derived image does not have enough field.")
for idx, field, expected in zip(
[2, 3, 4, 5, 6],
["imageType", "tissue", "hemisphere", "anatomicStructure", "registration"],
Expand All @@ -152,15 +153,17 @@ def _check_derived_image_xml(derived: xml.etree.ElementTree.Element) -> None:
try:
_validate_date_iso_format(image_create_date)
except ValueError as e:
_log_and_raise(f"The creationDate for the derived image is not valid: {e}")
_log_and_raise_xml_parser_error(
f"The creationDate for the derived image is not valid: {e}"
)


def _check_xml_field(
xml_element: xml.etree.ElementTree.Element, field: str, expected_value: str
) -> None:
"""Check that the given field of the given XML element has the expected value."""
if (found_value := _check_xml_and_get_text(xml_element, field)) != expected_value:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"The {field} for the derived image should be '{expected_value}'. "
f"{found_value} was found instead."
)
Expand Down Expand Up @@ -193,7 +196,7 @@ def _check_xml_project_identifier(
):
"""Check the project identifier."""
if _check_xml_and_get_text(project[0], "projectIdentifier") != expected:
_log_and_raise(f"Not {expected} cohort")
_log_and_raise_xml_parser_error(f"Not {expected} cohort")


def _get_original_image_metadata(original_image: xml.etree.ElementTree.Element) -> dict:
Expand Down Expand Up @@ -248,7 +251,9 @@ def _get_root_from_xml_path(xml_path: Path) -> xml.etree.ElementTree.Element:
root = tree.getroot()
return root
except Exception as e:
_log_and_raise(f"Error parsing XML file {xml_path.name} :\n{e}")
_log_and_raise_xml_parser_error(
f"Error parsing XML file {xml_path.name} :\n{e}"
)


def _parse_series(
Expand Down Expand Up @@ -294,13 +299,13 @@ def _check_image(img: xml.etree.ElementTree.Element):
"imagingProtocol", # for original image metadata
"originalRelatedImage", # for processed image
}:
_log_and_raise(
_log_and_raise_xml_parser_error(
f"Bad image tag <{img.tag}>. "
"Should be either 'imagingProtocol' or 'originalRelatedImage'."
)
if len(img) not in {3, 4}:
_log_and_raise(
f"Image XML element has {len(img)} children. " "Expected either 3 or 4."
_log_and_raise_xml_parser_error(
f"Image XML element has {len(img)} children. " "Expected either 3 or 4.",
)


Expand Down Expand Up @@ -361,7 +366,9 @@ def _check_modality(study: xml.etree.ElementTree.Element, expected_modality: str
series = _parse_series(study)
modality = _check_xml_and_get_text(series[1], "modality")
if modality != expected_modality:
_log_and_raise(f"Unexpected modality {modality}, expected {expected_modality}.")
_log_and_raise_xml_parser_error(
f"Unexpected modality {modality}, expected {expected_modality}."
)


def _parse_subject(
Expand Down Expand Up @@ -418,7 +425,9 @@ def _parse_xml_file(xml_path: Path) -> dict:
**derived_image_metadata,
}
if "image_proc_id" not in scan_metadata and "image_orig_id" not in scan_metadata:
_log_and_raise(f"Scan metadata for subject {subject_id} has no image ID.")
_log_and_raise_xml_parser_error(
f"Scan metadata for subject {subject_id} has no image ID."
)
return scan_metadata


Expand Down
16 changes: 16 additions & 0 deletions clinica/utils/stream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module handles stream and log redirection."""

from enum import Enum
from typing import Type


class LoggingLevel(str, Enum):
Expand Down Expand Up @@ -38,3 +39,18 @@ def cprint(msg: str, lvl: str = "info") -> None:
logger.critical(msg=msg)
else:
pass


def log_and_raise(message: str, error_type: Type[Exception]):
"""Log the error message using cprint and raise.
Parameters
----------
message : str
The error message.
error_type : Exception
The error type to raise.
"""
cprint(message, lvl="error")
raise error_type(message)

0 comments on commit ca77bab

Please sign in to comment.