From 7fd26adf49e0adcf93d560376a2ce90d6e1b823c Mon Sep 17 00:00:00 2001 From: NicolasGensollen Date: Mon, 25 Sep 2023 12:33:24 +0200 Subject: [PATCH] refactor... --- .../pipelines/statistics_surface/__init__.py | 2 +- .../pipelines/statistics_surface/_model.py | 1099 ----------------- .../pipelines/statistics_surface/_utils.py | 322 +++++ .../{statistics_surface_cli.py => cli.py} | 46 +- ...istics_surface_pipeline.py => pipeline.py} | 137 +- .../statistics_surface_utils.py | 276 ----- .../statistics_surface/surfstat/__init__.py | 4 + .../_surfstat.py} | 63 +- .../{_inputs.py => surfstat/_utils.py} | 25 +- .../surfstat/models/__init__.py | 3 + .../surfstat/models/_base.py | 234 ++++ .../surfstat/models/_contrast.py | 87 ++ .../surfstat/models/_correlation.py | 61 + .../surfstat/models/_factory.py | 99 ++ .../surfstat/models/_group.py | 118 ++ .../surfstat/models/_utils.py | 152 +++ .../surfstat/models/results/__init__.py | 9 + .../surfstat/models/results/_base.py | 77 ++ .../surfstat/models/results/_plot.py | 89 ++ .../surfstat/models/results/_serialize.py | 110 ++ .../surfstat/models/results/_statistics.py | 187 +++ .../pipelines/test_run_pipelines_stats.py | 26 +- .../statistics_surface/test_inputs.py | 18 +- .../statistics_surface/test_model.py | 163 ++- 24 files changed, 1794 insertions(+), 1613 deletions(-) delete mode 100644 clinica/pipelines/statistics_surface/_model.py create mode 100644 clinica/pipelines/statistics_surface/_utils.py rename clinica/pipelines/statistics_surface/{statistics_surface_cli.py => cli.py} (76%) rename clinica/pipelines/statistics_surface/{statistics_surface_pipeline.py => pipeline.py} (66%) delete mode 100644 clinica/pipelines/statistics_surface/statistics_surface_utils.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/__init__.py rename clinica/pipelines/statistics_surface/{clinica_surfstat.py => surfstat/_surfstat.py} (78%) rename clinica/pipelines/statistics_surface/{_inputs.py => surfstat/_utils.py} (88%) create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/__init__.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/_base.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/_contrast.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/_correlation.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/_factory.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/_group.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/_utils.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/results/__init__.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/results/_base.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/results/_plot.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/results/_serialize.py create mode 100644 clinica/pipelines/statistics_surface/surfstat/models/results/_statistics.py diff --git a/clinica/pipelines/statistics_surface/__init__.py b/clinica/pipelines/statistics_surface/__init__.py index 0cdbd511ed..f00831fce2 100644 --- a/clinica/pipelines/statistics_surface/__init__.py +++ b/clinica/pipelines/statistics_surface/__init__.py @@ -1 +1 @@ -from . import statistics_surface_cli +from . import cli diff --git a/clinica/pipelines/statistics_surface/_model.py b/clinica/pipelines/statistics_surface/_model.py deleted file mode 100644 index ffa32e07a2..0000000000 --- a/clinica/pipelines/statistics_surface/_model.py +++ /dev/null @@ -1,1099 +0,0 @@ -import abc -import warnings -from dataclasses import dataclass -from functools import reduce -from os import PathLike -from pathlib import Path -from string import Template -from typing import Callable, Dict, List, Optional, Union - -import numpy as np -import pandas as pd -from brainstat.stats.SLM import SLM -from brainstat.stats.terms import FixedEffect -from nilearn.surface import Mesh - -from clinica.utils.stream import cprint - -MISSING_TERM_ERROR_MSG = Template( - "Term ${term} from the design matrix is not in the columns of the " - "provided TSV file. Please make sure that there is no typo." -) - - -def _print_clusters(model: SLM, threshold: float) -> None: - """This function prints the results related to total number - of clusters, as well as the significative clusters. - - Parameters - ---------- - model : brainstat.stats.SLM - Fitted SLM model. - - threshold : float - Cluster defining threshold. - """ - cprint("#" * 40) - cprint("After correction (Cluster-wise Correction for Multiple Comparisons): ") - df = model.P["clus"][0] - cprint(df) - cprint(f"Clusters found: {len(df)}") - cprint( - f"Significative clusters (after correction): {len(df[df['P'] <= threshold])}" - ) - - -def _check_column_in_df(df: pd.DataFrame, column: str) -> None: - """Checks if the provided column name is in the provided DataFrame. - Raises a ValueError if not. - - Parameters - ---------- - df : pd.DataFrame - DataFrame to analyze. - - column : str - Name of the column to check. - """ - if column not in df.columns: - raise ValueError(MISSING_TERM_ERROR_MSG.safe_substitute(term=column)) - - -def _categorical_column(df: pd.DataFrame, column: str) -> bool: - """Returns `True` if the column is categorical and `False` otherwise. - - Parameters - ---------- - df : pd.DataFrame - The DataFrame to analyze. - - column : str - The name of the column to check. - - Returns - ------- - bool : - `True` if the column contains categorical values, `False` otherwise. - """ - return not df[column].dtype.name.startswith("float") - - -def _build_model(design_matrix: str, df: pd.DataFrame) -> FixedEffect: - """Build a brainstat model from the design matrix in - string format. - - This function assumes that the design matrix is formatted - in the following way: - - 1 + factor_1 + factor_2 + ... - - Or: - - factor_1 + factor_2 + ... - - in the latter case the intercept will be added automatically. - - Parameters - ---------- - design_matrix : str - Design matrix specified as a string. - - df : pd.DataFrame - Subjects DataFrame. - - Returns - ------- - model : FixedEffect - BrainStats model. - """ - if len(design_matrix) == 0: - raise ValueError("Design matrix cannot be empty.") - if "+" in design_matrix: - terms = [_.strip() for _ in design_matrix.split("+")] - else: - terms = [design_matrix.strip()] - model = [] - for term in terms: - # Intercept is automatically included in brainstat - if term == "1": - continue - # Handles the interaction effects - if "*" in term: - sub_terms = [_.strip() for _ in term.split("*")] - model_term = reduce( - lambda x, y: x * y, [_build_model_term(_, df) for _ in sub_terms] - ) - else: - model_term = _build_model_term(term, df) - model.append(model_term) - return reduce(lambda x, y: x + y, model) - - -def _build_model_term( - term: str, - df: pd.DataFrame, - add_intercept: Optional[bool] = True, -) -> FixedEffect: - """Builds a BrainStats model term from the subjects - DataFrame and a column name. - - Parameters - ---------- - term : str - The name of the column of the DataFrame to be used. - - df : pd.DataFrame - The subjects DataFrame. - - add_intercept : bool - If `True`, adds an intercept term. - - Returns - ------- - FixedEffect : - BrainStats model term. - """ - return FixedEffect(df[term], add_intercept=add_intercept) - - -class GLM: - """This class implements the functionalities common to all GLM models - used in the Clinica SurfaceStatistics pipeline. - - Attributes - ---------- - design : str - The design matrix specified in string format. - If this contains a "*", it will be interpreted as an interaction effect. - - df : pd.DataFrame - The subjects DataFrame. - - feature_label : str - The label used for building output filenames. - - contrast : str - The contrast specified in string format. - - fwhm : int, optional - The smoothing FWHM. This is used in the output file names. - Default=20. - - threshold_uncorrected_pvalue : float, optional - The threshold to be used with uncorrected P-values. Default=0.001. - - threshold_corrected_pvalue : float, optional - The threshold to be used with corrected P-values. Default=0.05. - - cluster_threshold : float, optional - The threshold to be used to declare clusters as significant. Default=0.001. - """ - - def __init__( - self, - design: str, - df: pd.DataFrame, - feature_label: str, - contrast: str, - fwhm: Optional[int] = 20, - threshold_uncorrected_pvalue: Optional[float] = 0.001, - threshold_corrected_pvalue: Optional[float] = 0.05, - cluster_threshold: Optional[float] = 0.001, - ): - self._two_tailed = False - self._correction = ["fdr", "rft"] - self.df = df - self.feature_label = feature_label - self.fwhm = fwhm - self.threshold_uncorrected_pvalue = threshold_uncorrected_pvalue - self.threshold_corrected_pvalue = threshold_corrected_pvalue - self.cluster_threshold = cluster_threshold - self.results_ = None - self.slm_models_ = None - self.contrasts = dict() - self.filenames = dict() - self.model = _build_model(design, df) - self.build_contrasts(contrast) - - @property - def contrast_names(self) -> List[str]: - if self.contrasts is not None: - return list(self.contrasts.keys()) - return list() - - @property - def results(self): - if self._is_fitted(): - return self.results_ - - @abc.abstractmethod - def build_contrasts(self, contrast: str): - """Build the contrasts from the provided contrast in string format. - - .. note:: - This method needs to be implemented in subclasses. - - Parameters - ---------- - contrast : str - Contrast in string format. - """ - pass - - @abc.abstractmethod - def filename_root(self, contrast: str): - """Returns the output file name root for the provided contrast. - - .. note:: - This method needs to be implemented in subclasses. - - Parameters - ---------- - contrast : str - Contrast for which to get the output filename. - """ - pass - - def _is_fitted(self) -> bool: - return self.results_ is not None - - def fit( - self, data: np.ndarray, surface: Dict, mask: Optional[np.ndarray] = None - ) -> None: - """Fit the GLM model instance. - - Parameters - ---------- - data : np.ndarray - The data on which to fit the GLM model. - - surface : dict - The Brainstat surface on which to fit the GLM model. - - mask : np.ndarray, optional - The mask to be used to mask the data. Default=None. - """ - if mask is None: - mask = data[0, :] > 0 - self.results_ = dict() - self.slm_models_ = dict() - for contrast_name, contrast in self.contrasts.items(): - slm_model = SLM( - self.model, - contrast=contrast, - surf=surface, - mask=mask, - two_tailed=self._two_tailed, - correction=self._correction, - cluster_threshold=self.cluster_threshold, - ) - cprint( - msg=f"Fitting the GLM model with contrast {contrast_name}...", - lvl="info", - ) - slm_model.fit(data) - _print_clusters(slm_model, self.threshold_corrected_pvalue) - self.results_[contrast_name] = StatisticsResults.from_slm_model( - slm_model, - mask, - self.threshold_uncorrected_pvalue, - self.threshold_corrected_pvalue, - ) - self.slm_models_[contrast_name] = slm_model - - def save_results(self, output_dir: PathLike, method: Union[str, List[str]]) -> None: - """Save results to the provided output directory. - - Parameters - ---------- - output_dir : PathLike - The output directory in which to write the results. - - method : str or List[str] - The method(s) to write the results. - """ - if not self._is_fitted(): - raise ValueError( - "GLM model needs to be fitted before accessing the results." - ) - if isinstance(method, str): - method = [method] - for contrast, result in self.results_.items(): - result_serializer = StatisticsResultsSerializer( - Path(output_dir) / Path(self.filename_root(contrast)) - ) - for meth in method: - result_serializer.save(result, meth) - - def plot_results( - self, - output_dir: PathLike, - method: Union[str, List[str]], - mesh: Mesh, - ) -> None: - """Plot results to the provided directory. - - Parameters - ---------- - output_dir : PathLike - The output directory in which to write the plot files. - - method : str or List[str] - The method(s) to make the plots. - - mesh : nilearn.surface.Mesh - The mesh on which to plot the result data. - """ - if not self._is_fitted(): - raise ValueError( - "GLM model needs to be fitted before accessing the results." - ) - if isinstance(method, str): - method = [method] - for contrast, result in self.results_.items(): - plotter = StatisticsResultsPlotter( - Path(output_dir) / Path(self.filename_root(contrast)), mesh - ) - for meth in method: - plotter.plot(result, meth) - - -class CorrelationGLM(GLM): - """Class implementing the correlation type GLM model. - - Attributes - ---------- - See documentation for `GLM` class. - - group_label : str, optional - The label to use for group GLM models. Default=None. - """ - - def __init__( - self, - design: str, - df: pd.DataFrame, - feature_label: str, - contrast: str, - group_label: Optional[str], - fwhm: Optional[int] = 20, - threshold_uncorrected_pvalue: Optional[float] = 0.001, - threshold_corrected_pvalue: Optional[float] = 0.05, - cluster_threshold: Optional[float] = 0.001, - ): - self.with_interaction = False - self.absolute_contrast_name = None - self.contrast_sign = None - self.group_label = group_label - super().__init__( - design, - df, - feature_label, - contrast, - fwhm, - threshold_uncorrected_pvalue, - threshold_corrected_pvalue, - cluster_threshold, - ) - - def build_contrasts(self, contrast: str): - """Build the contrast from the string specification. - - Parameters - ---------- - contrast : str - The contrast to build. - """ - absolute_contrast_name = contrast - contrast_sign = "positive" - if contrast.startswith("-"): - absolute_contrast_name = contrast[1:].lstrip() - contrast_sign = "negative" - built_contrast = self.df[absolute_contrast_name] - if contrast_sign == "negative": - built_contrast *= -1 - self.contrasts[contrast] = built_contrast - self.absolute_contrast_name = absolute_contrast_name - self.contrast_sign = contrast_sign - - def filename_root(self, contrast: str): - """Build the filename root part from class attributes and provided contrast. - - Parameters - ---------- - contrast : str - The contrast to use for building the filename. - """ - if contrast not in self.contrasts: - raise ValueError(f"Unknown contrast {contrast}.") - return ( - f"group-{self.group_label}_correlation-{self.absolute_contrast_name}" - f"-{self.contrast_sign}_measure-{self.feature_label}_fwhm-{self.fwhm}" - ) - - -class GroupGLM(GLM): - """Class implementing group GLM models. - - Attributes - ---------- - See documentation for `GLM` class. - - group_label : str, optional - The Label to use for group GLM models. Default="group". - """ - - def __init__( - self, - design: str, - df: pd.DataFrame, - feature_label: str, - contrast: str, - group_label: Optional[str] = "group", - fwhm: Optional[int] = 20, - threshold_uncorrected_pvalue: Optional[float] = 0.001, - threshold_corrected_pvalue: Optional[float] = 0.05, - cluster_threshold: Optional[float] = 0.001, - ): - self.with_interaction = False - self.group_label = group_label - super().__init__( - design, - df, - feature_label, - contrast, - fwhm, - threshold_uncorrected_pvalue, - threshold_corrected_pvalue, - cluster_threshold, - ) - - def build_contrasts(self, contrast: str): - """Build the contrast from the string specification. - - Parameters - ---------- - contrast : str - The contrast to build. - """ - _check_column_in_df(self.df, contrast) - if not _categorical_column(self.df, contrast): - raise ValueError( - "Contrast should refer to a categorical variable for group comparison. " - "Please select 'correlation' for 'glm_type' otherwise." - ) - group_values = np.unique(self.df[contrast]) - for contrast_type, (i, j) in zip(["positive", "negative"], [(0, 1), (1, 0)]): - contrast_name = f"{group_values[j]}-lt-{group_values[i]}" - self.contrasts[contrast_name] = ( - self.df[contrast] == group_values[i] - ).astype(int) - (self.df[contrast] == group_values[j]).astype(int) - - def filename_root(self, contrast: str): - """Build the filename root part from class attributes and provided contrast. - - Parameters - ---------- - contrast : str - The contrast to use for building the filename. - """ - if contrast not in self.contrasts: - raise ValueError(f"Unknown contrast {contrast}.") - return f"group-{self.group_label}_{contrast}_measure-{self.feature_label}_fwhm-{self.fwhm}" - - -class GroupGLMWithInteraction(GroupGLM): - """This class implements a GLM model for group comparison with - interaction effects. - - Attributes - ---------- - See attributes of parent class `GroupGLM`. - """ - - def __init__( - self, - design: str, - df: pd.DataFrame, - feature_label: str, - contrast: str, - group_label: Optional[str] = "group", - fwhm: Optional[int] = 20, - threshold_uncorrected_pvalue: Optional[float] = 0.001, - threshold_corrected_pvalue: Optional[float] = 0.05, - cluster_threshold: Optional[float] = 0.001, - ): - super().__init__( - design, - df, - feature_label, - contrast, - group_label, - fwhm, - threshold_uncorrected_pvalue, - threshold_corrected_pvalue, - cluster_threshold, - ) - self.with_interaction = True - warnings.warn( - "You included interaction as covariate in your model, " - "please carefully check the format of your tsv files." - ) - - def build_contrasts(self, contrast: str): - """Build the contrast from the string specification. - - Parameters - ---------- - contrast : str - The contrast to build. - """ - contrast_elements = [_.strip() for _ in contrast.split("*")] - for contrast_element in contrast_elements: - _check_column_in_df(self.df, contrast_element) - categorical = [_categorical_column(self.df, _) for _ in contrast_elements] - if len(contrast_elements) != 2 or sum(categorical) != 1: - raise ValueError( - "The contrast must be an interaction between one continuous " - "variable and one categorical variable. Your contrast contains " - f"the following variables : {contrast_elements}" - ) - idx = 0 if categorical[0] else 1 - categorical_contrast = contrast_elements[idx] - continue_contrast = contrast_elements[(idx + 1) % 2] - group_values = np.unique(self.df[categorical_contrast]) - built_contrast = self.df[continue_contrast].where( - self.df[categorical_contrast] == group_values[0], 0 - ) - self.df[continue_contrast].where( - self.df[categorical_contrast] == group_values[1], 0 - ) - self.contrasts[contrast] = built_contrast - - def filename_root(self, contrast: str): - """Build the filename root part from class attributes and provided contrast. - - Parameters - ---------- - contrast : str - The contrast to use for building the filename. - """ - if contrast not in self.contrasts: - raise ValueError(f"Unknown contrast {contrast}.") - return f"interaction-{contrast}_measure-{self.feature_label}_fwhm-{self.fwhm}" - - -def create_glm_model( - glm_type: str, - design: str, - df: pd.DataFrame, - contrast: str, - feature_label: str, - group_label: Optional[str] = "group", - fwhm: Optional[int] = 20, - threshold_uncorrected_pvalue: Optional[float] = 0.001, - threshold_corrected_pvalue: Optional[float] = 0.05, - cluster_threshold: Optional[float] = 0.001, -) -> GLM: - """Factory method for building a GLM model instance corresponding to the - provided type and design matrix. - - Parameters - ---------- - glm_type : str - The type of GLM to be created. Either "correlation" or "group_comparison". - - design : str - The design matrix specified in string format. - If this contains a "*", it will be interpreted as an interaction effect. - - df : pd.DataFrame - The subjects DataFrame. - - contrast : str - The contrast specified in string format. - - feature_label : str - The label used for building output filenames. - - group_label : str, optional - The label to use for group GLM models. Default="group". - - fwhm : int, optional - The smoothing FWHM. This is used in the output file names. - Default=20. - - threshold_uncorrected_pvalue : float, optional - The threshold to be used with uncorrected P-values. Default=0.001. - - threshold_corrected_pvalue : float, optional - The threshold to be used with corrected P-values. Default=0.05. - - cluster_threshold : float, optional - The threshold to be used to declare clusters as significant. Default=0.001. - - Returns - ------- - model : GLM - An instance of the `GLM` class. - - Raises - ------ - ValueError - If the glm_type is not supported. - """ - cprint( - msg=f"The GLM model is: {design} and the GLM type is: {glm_type}", - lvl="info", - ) - params = { - "group_label": group_label, - "fwhm": fwhm, - "threshold_uncorrected_pvalue": threshold_uncorrected_pvalue, - "threshold_corrected_pvalue": threshold_corrected_pvalue, - "cluster_threshold": cluster_threshold, - } - if glm_type == "correlation": - return CorrelationGLM(design, df, feature_label, contrast, **params) - elif glm_type == "group_comparison": - if "*" in design: - return GroupGLMWithInteraction( - design, df, feature_label, contrast, **params - ) - return GroupGLM(design, df, feature_label, contrast, **params) - raise ValueError( - f"create_glm_model received an unknown GLM type: {glm_type}." - f"Only 'correlation' and 'group_comparison' are supported." - ) - - -def _convert_arrays_to_lists(data: dict) -> dict: - """If the input dictionary contains numpy arrays, this function will - cast them to lists and return the same dictionary with the lists instead - of the numpy arrays. - - Parameters - ---------- - data : dict - The dictionary to clean. - - Returns - ------- - new_data : dict - The dictionary with arrays casted to lists. - """ - new_data = dict() - for k, v in data.items(): - if isinstance(v, dict): - new_data[k] = _convert_arrays_to_lists(v) - elif isinstance(v, np.ndarray): - new_data[k] = v.tolist() - else: - new_data[k] = v - return new_data - - -class Results: - """Common class for GLM results.""" - - def to_dict(self) -> dict: - """Returns the `Results` instance in dict format. - - Private attributes and all methods are not returned. - - This function does not perform any casting. - - Returns - ------- - data : dict - Resulting dictionary. - """ - import inspect - - data = dict() - for attribute in inspect.getmembers(self): - name, value = attribute - if not name.startswith("_"): - if not inspect.ismethod(value): - if hasattr(value, "to_dict"): - data[name] = value.to_dict() - else: - data[name] = value - return data - - def to_json(self, indent: Optional[int] = 4) -> str: - """Returns the json of the `Results` instance. - - Parameters - ---------- - indent : int, optional - Indent to use. Default=4. - - Returns - ------- - str : - The JSON dumps of the results. - """ - import json - - return json.dumps(_convert_arrays_to_lists(self.to_dict()), indent=indent) - - -@dataclass -class PValueResults(Results): - """This class implements a container for raw (uncorrected) - P-value results obtained with a GLM model. - - Attributes - ---------- - pvalues : np.ndarray - Array of uncorrected P-values. - - mask : np.ndarray - The binary mask. - - threshold : float - The threshold used. - """ - - pvalues: np.ndarray - mask: np.ndarray - threshold: float - - @property - def thresh(self): - """For compatibility with previous Matlab implementation.""" - return self.threshold - - @property - def P(self): - """For compatibility with previous Matlab implementation.""" - return self.pvalues - - @classmethod - def from_t_statistics( - cls, - tstats: np.ndarray, - df: pd.DataFrame, - mask: np.ndarray, - threshold: float, - ): - """Instantiate the class from an array of T-statistics. - - Parameters - ---------- - tstats : np.ndarray - Array of T-statistics. - - df : pd.DataFrame - The subjects DataFrame. - - mask : np.ndarray - The binary mask. - - threshold : float - The threshold to be used. - """ - from scipy.stats import t - - return cls(1 - t.cdf(tstats, df), mask, threshold) - - -@dataclass -class CorrectedPValueResults(PValueResults): - """This class implements a container for corrected P-value - results obtained with a GLM model. - - Attributes - ---------- - cluster_pvalues : np.ndarray - The cluster P-values. - """ - - cluster_pvalues: np.ndarray - - @property - def C(self): - """For compatibility with previous Matlab implementation.""" - return self.cluster_pvalues - - -@dataclass -class StatisticsResults(Results): - """This class implements a container for results obtained with - the GLM model classes. It holds information relative to a GLM - run with one specific contrast. - - Attributes - ---------- - coefficients : np.ndarray - The beta coefficients of the fitted GLM model. - - tstats : np.ndarray - The corresponding T-statistics. - - uncorrected_p_value : PValueResults - The corresponding uncorrected p values, stored in a `PValueResults` instance. - - fdr : np.ndarray - The corresponding False Discovery Rate. - - corrected_p_value : CorrectedPValueResults - The corresponding corrected p values, stored in a `CorrectedPValueResults` instance. - """ - - coefficients: np.ndarray - tstats: np.ndarray - uncorrected_p_values: PValueResults - fdr: np.ndarray - corrected_p_values: CorrectedPValueResults - - @property - def TStatistics(self): - """Needed for compatibility with previous implementation in Matlab.""" - return self.tstats - - @property - def uncorrectedPValue(self): - """Needed for compatibility with previous implementation in Matlab.""" - return self.uncorrected_p_values - - @property - def correctedPValue(self): - """Needed for compatibility with previous implementation in Matlab.""" - return self.corrected_p_values - - @property - def FDR(self): - """Needed for compatibility with previous implementation in Matlab.""" - return self.fdr - - @classmethod - def from_slm_model( - cls, - model: SLM, - mask: np.ndarray, - threshold_uncorrected_p_value: float, - threshold_corrected_p_value: float, - ): - """Instantiate from a SLM model. - - Parameters - ---------- - model : brainstat.stats.SLM - SLM model instance to use. - - mask : np.ndarray - The binary mask to use. - - threshold_uncorrected_p_value : float - The threshold to use with uncorrected P-values. - - threshold_corrected_p_value : float - The threshold to use with corrected P-values. - """ - idx = np.argwhere(np.isnan(model.t)) - corrected_pvals = model.P["pval"]["P"] - corrected_pvals[idx] = 1.0 - tstats = np.nan_to_num(model.t) - uncorrected_p_values = PValueResults.from_t_statistics( - tstats, - model.df, - mask, - threshold_uncorrected_p_value, - ) - corrected_p_values = CorrectedPValueResults( - corrected_pvals, - model.P["pval"]["C"], - mask, - threshold_corrected_p_value, - ) - return cls( - np.nan_to_num(model.coef), - tstats, - uncorrected_p_values, - model.Q, - corrected_p_values, - ) - - -class StatisticsResultsPlotter: - """Class responsible to plotting results of GLM fit. - - Attributes - ---------- - output_file : PathLike - Path to the output file. - - mesh : nilearn.surface.Mesh - The mesh to be used for plotting results. - """ - - def __init__(self, output_file: PathLike, mesh: Mesh): - self.output_file = output_file - self.mesh = mesh - self.plotting_extension = ".png" - self.no_plot = {"coefficients"} # Elements which should not be plotted - - def plot(self, result: StatisticsResults, method: str) -> None: - """Plot the results. - - Parameters - ---------- - result : StatisticsResults - The results to be plotted. - - method : str - The plotting method to use. - """ - plotter = self._get_plotter(method) - plotter(result) - - def _get_plotter(self, method: str) -> Callable[[StatisticsResults], None]: - """Returns the plotting method from its name. - - Parameters - ---------- - method : str - Name of the plotting method to use. - - Returns - ------- - Callable : - Plotting method. - """ - if method == "nilearn_plot_surf_stat_map": - return self._plot_stat_maps - else: - raise NotImplementedError(f"Plotting method {method} is not implemented.") - - def _plot_stat_maps(self, result: StatisticsResults) -> None: - """Wrapper around the `nilearn.plotting.plot_surf_stat_map` method. - - Parameters - ---------- - result : StatisticsResults - The results to plot. - """ - from nilearn.plotting import plot_surf_stat_map - - for name, res in result.to_dict().items(): - if name not in self.no_plot: - texture = res - threshold = None - plot_filename = ( - str(self.output_file) + "_" + name + self.plotting_extension - ) - if isinstance(res, dict): - texture = res["P"] - threshold = res["thresh"] - cprint(msg=f"Saving plot to {plot_filename}", lvl="info") - plot_surf_stat_map( - self.mesh, - texture, - threshold=threshold, - output_file=plot_filename, - title=name, - ) - - -class StatisticsResultsSerializer: - """This class is responsible for writing instances of `StatisticsResults` - to disk through different methods. - - Attributes - ---------- - output_file : PathLike - Path and filename root to be used. - """ - - def __init__(self, output_file: PathLike): - self.output_file = output_file - self.json_extension = "_results.json" - self.json_indent = 4 - self.mat_extension = ".mat" - - def save(self, result: StatisticsResults, method: str) -> None: - """Save provided `StatisticsResults` to disk with provided method. - - Parameters - ---------- - result : StatisticsResults - Results to be saved. - - method : str - Name of the saving method to use. - """ - writer = self._get_writer(method) - writer(result) - - def _get_writer(self, method: str) -> Callable[[StatisticsResults], None]: - """Returns a writer method from its name. - - Parameters - ---------- - method : str - The name of the writing method to use. - - Returns - ------- - Callable : - The writing method. - """ - if method.lower() == "json": - return self._write_to_json - elif method.lower() == "mat": - return self._write_to_mat - else: - raise NotImplementedError( - f"Serializing method {method} is not implemented." - ) - - def _write_to_json(self, results: StatisticsResults) -> None: - """Write the provided `StatisticsResults` to JSON format. - - Parameters - ---------- - results : StatisticsResults - The results to write to disk in JSON format. - """ - import json - import os - - out_json_file = Path(str(self.output_file) + self.json_extension) - if not os.path.exists(out_json_file.parents[0]): - os.makedirs(out_json_file.parents[0]) - cprint( - msg=f"Writing results to JSON in {out_json_file}...", - lvl="info", - ) - with open(out_json_file, "w") as fp: - json.dump(results.to_json(indent=self.json_indent), fp) - - def _write_to_mat(self, results: StatisticsResults) -> None: - """Write the provided `StatisticsResults` to MAT format. - - Parameters - ---------- - results : StatisticsResults - The results to write to disk in MAT format. - """ - from scipy.io import savemat - - # These labels are used for compatibility with the previous - # MATLAB implementation of the Statistics Surface Pipeline - # of Clinica. - struct_labels = { - "coefficients": "coef", - "TStatistics": "tvaluewithmask", - "uncorrectedPValue": "uncorrectedpvaluesstruct", - "correctedPValue": "correctedpvaluesstruct", - "FDR": "FDR", - } - for name, res in results.to_dict().items(): - if name in struct_labels: - mat_filename = str(self.output_file) + "_" + name + self.mat_extension - cprint( - msg=f"Writing {name} results to MAT in {mat_filename}", - lvl="info", - ) - savemat(mat_filename, {struct_labels[name]: res}) diff --git a/clinica/pipelines/statistics_surface/_utils.py b/clinica/pipelines/statistics_surface/_utils.py new file mode 100644 index 0000000000..3bc627fd6d --- /dev/null +++ b/clinica/pipelines/statistics_surface/_utils.py @@ -0,0 +1,322 @@ +from pathlib import Path +from typing import Optional + +__all__ = [ + "init_input_node", + "run_clinica_surfstat", + "save_to_caps", + "get_t1_freesurfer_custom_file", + "get_pet_surface_custom_file", + "create_glm_info_dictionary", + "build_design_matrix", +] + + +def get_t1_freesurfer_custom_file() -> str: + import os + + return os.path.join( + "%(subject)s", + "%(session)s", + "t1", + "freesurfer_cross_sectional", + "%(subject)s_%(session)s", + "surf", + "%(hemi)s.thickness.fwhm%(fwhm)s.fsaverage.mgh", + ) + + +def get_pet_surface_custom_file(acq_label: str, suvr_reference_region: str) -> str: + import os + + return os.path.join( + "%(subject)s", + "%(session)s", + "pet", + "surface", + f"%(subject)s_%(session)s_trc-{acq_label}_pet" + f"_space-fsaverage_suvr-{suvr_reference_region}_pvc-iy_hemi-%(hemi)s_fwhm-%(fwhm)s_projection.mgh", + ) + + +def init_input_node(parameters: dict, base_dir, subjects_visits_tsv): + """Initialize the pipeline. + + This function will: + - Create `surfstat_results_dir` in `base_dir`/ for SurfStat; + - Save pipeline parameters in JSON file; + - Copy TSV file with covariates; + - Print begin execution message. + + Parameters + ---------- + parameters : dict + The pipeline's parameters. + + base_dir : Path + The path to the pipeline's base directory. + This is a pathlib Path. No type hints because of Nipype. + + subjects_visits_tsv : Path + The path to the subjects TSV file. + This is a pathlib Path. No type hints because of Nipype. + + Returns + ------- + group_label : str + The group label. + + surfstat_results_dir : Path + The folder which will contain the results for SurfStat. + """ + import json + import shutil + + from clinica.pipelines.statistics_surface._utils import create_glm_info_dictionary + from clinica.utils.ux import print_begin_image + + group_id = "group-" + parameters["group_label"] + surfstat_results_dir = base_dir / group_id + surfstat_results_dir.mkdir(parents=True, exist_ok=True) + + # Save pipeline parameters in JSON file + glm_dict = create_glm_info_dictionary(subjects_visits_tsv, parameters) + with open(surfstat_results_dir / f"{group_id}_glm.json", "w") as json_file: + json.dump(glm_dict, json_file, indent=4) + + # Copy TSV file with covariates + tsv_filename = surfstat_results_dir / f"{group_id}_covariates.tsv" + shutil.copyfile(subjects_visits_tsv, tsv_filename) + + # Print begin message + list_keys = ["AnalysisType", "Covariates", "Contrast", "FWHM", "ClusterThreshold"] + list_values = [ + parameters["glm_type"], + parameters["covariates"], + parameters["contrast"], + str(parameters["full_width_at_half_maximum"]), + str(parameters["cluster_threshold"]), + ] + group_id = "group-" + parameters["group_label"] + print_begin_image(group_id, list_keys, list_values) + + return parameters["group_label"], surfstat_results_dir + + +def _get_string_format_from_tsv(tsv_file: Path) -> str: + """Determine string format from TSV file. + + If the TSV file is like: + + participant_id session_id sex group age + sub-CLNC0001 ses-M000 Female CN 71.1 + sub-CLNC0002 ses-M000 Male CN 81.3 + sub-CLNC0003 ses-M000 Male CN 75.4 + + The columns of the TSV file contains consecutively strings, strings, + strings, strings and float. The string_format is therefore "%s %s %s %s %f". + + Parameters + ---------- + tsv_file : Path + The path to the TSV file. + + Returns + ------- + str : + The string formatting of the TSV file (e.g. "%s %s %s %s %f") + """ + import pandas as pd + + demographics_df = pd.read_csv(tsv_file, sep="\t") + + return " ".join( + [ + _convert_dtype_to_str_format(demographics_df[column].dtype) + for column in demographics_df.columns + ] + ) + + +def _convert_dtype_to_str_format(dtype) -> str: + """Convert pandas dtypes (e.g. int64) to string format (e.g. %d)""" + import numpy as np + + if dtype == np.int64: + return "%d" + if dtype == np.float64: + return "%f" + if dtype == np.object: + return "%s" + raise ValueError(f"Unknown dtype (given: {dtype})") + + +def build_design_matrix(contrast: str, covariates: Optional[str] = None) -> str: + """Generate the design matrix for SurfStat based on the contrast and the optional list of covariates. + + Design matrix "1 + + + ... + " + + Example + ------- + >>> from clinica.pipelines.statistics_surface.statistics_surface_utils import _build_design_matrix + >>> _build_design_matrix('group', 'age sex group') + 1 + group + age + sex + >>> _build_design_matrix('group', 'age') + 1 + group + age + >>> _build_design_matrix('group', None) + 1 + group + """ + if covariates: + # Convert string to list while handling case where several spaces are present + list_covariates = list(covariates) + try: + list_covariates.remove("") + except ValueError: + pass + if contrast in list_covariates: + return "1 + " + " + ".join(covariate for covariate in list_covariates) + return ( + "1 + " + + contrast + + " + " + + " + ".join(covariate for covariate in list_covariates) + ) + return "1 + " + contrast + + +def run_clinica_surfstat( + caps_dir, # Path. no type hint because of Nipype self-contained requirement + output_dir, # Path. no type hint because of Nipype self-contained requirement + subjects_visits_tsv, # Path. no type hint because of Nipype self-contained requirement + pipeline_parameters: dict, +): + """Call clinica_surfstat function. + + Parameters + ---------- + caps_dir : Path + The path to CAPS directory containing surface-based features. + + output_dir : Path + The path to output directory that will contain outputs. + + subjects_visits_tsv : Path + The path to TSV file containing the GLM information. + + pipeline_parameters : dict + Parameters of StatisticsSurface pipeline. + + Returns + ------- + output_dir : Path + The path to the output directory. + """ + from pathlib import Path + + from clinica.pipelines.statistics_surface._utils import build_design_matrix + from clinica.pipelines.statistics_surface.surfstat import clinica_surfstat + from clinica.utils.check_dependency import check_environment_variable + + freesurfer_home = Path(check_environment_variable("FREESURFER_HOME", "FreeSurfer")) + + clinica_surfstat( + caps_dir / "subjects", + output_dir, + subjects_visits_tsv, + build_design_matrix( + pipeline_parameters["contrast"], pipeline_parameters["covariates"] + ), + pipeline_parameters["contrast"], + pipeline_parameters["glm_type"], + pipeline_parameters["group_label"], + freesurfer_home, + pipeline_parameters["measure_label"], + surface_file=pipeline_parameters["custom_file"], + fwhm=pipeline_parameters["full_width_at_half_maximum"], + cluster_threshold=pipeline_parameters["cluster_threshold"], + ) + return output_dir + + +def create_glm_info_dictionary(tsv_file: Path, pipeline_parameters: dict) -> dict: + """Create dictionary containing the GLM information that will be stored in a JSON file.""" + glm_info = { + # Clinica compulsory arguments + "AnalysisType": pipeline_parameters["glm_type"], + "DesignMatrix": build_design_matrix( + pipeline_parameters["contrast"], + pipeline_parameters["covariates"], + ), + "StringFormatTSV": _get_string_format_from_tsv(tsv_file), + "Contrast": pipeline_parameters["contrast"], + "GroupLabel": pipeline_parameters["group_label"], + # Optional arguments + "Covariates": pipeline_parameters["covariates"], + "FWHM": pipeline_parameters["full_width_at_half_maximum"], + # Optional arguments for custom pipeline + "custom_file": pipeline_parameters["custom_file"], + "measure_label": pipeline_parameters["measure_label"], + # Advanced arguments (i.e. tricky parameters) + "ThresholdUncorrectedPvalue": 0.001, + "ThresholdCorrectedPvalue": 0.05, + "ClusterThreshold": pipeline_parameters["cluster_threshold"], + } + # Optional arguments for inputs from pet-surface pipeline + if ( + pipeline_parameters["acq_label"] + and pipeline_parameters["suvr_reference_region"] + ): + glm_info["acq_label"] = pipeline_parameters["acq_label"] + glm_info["suvr_reference_region"] = pipeline_parameters["suvr_reference_region"] + + return glm_info + + +def save_to_caps( + source_dir, # Path. no type hint because of Nipype self-contained requirement + caps_dir, # Path. no type hint because of Nipype self-contained requirement + overwrite_caps: bool, + group_label: str, + glm_type: str, +) -> None: + """Save `source_dir`/ to CAPS folder. + + This function copies outputs of `source_dir`/ to + `caps_dir`/groups///surfstat_/ + + The `source_dir`/ folder should contain the following elements: + - group-_-lt-_measure-_fwhm-