diff --git a/petab/__init__.py b/petab/__init__.py
index 792ad8fa..cbf32b58 100644
--- a/petab/__init__.py
+++ b/petab/__init__.py
@@ -1,4 +1,13 @@
-"""PEtab exports"""
+"""PEtab exports
+
+Attributes:
+    ENV_NUM_THREADS:
+        Name of environment variable to set number of threads or processes
+        PEtab should use for operations that can be performed in parallel.
+        By default, all operations are performed sequentially.
+"""
+
+ENV_NUM_THREADS = "PETAB_NUM_THREADS"
 
 from .composite_problem import *  # noqa: F403, F401
 from .conditions import *  # noqa: F403, F401
diff --git a/petab/parameter_mapping.py b/petab/parameter_mapping.py
index 3fb3da75..f2de9356 100644
--- a/petab/parameter_mapping.py
+++ b/petab/parameter_mapping.py
@@ -3,6 +3,7 @@
 
 import logging
 import numbers
+import os
 import re
 from typing import Tuple, Dict, Union, Any, List, Optional, Iterable
 
@@ -11,6 +12,8 @@
 import pandas as pd
 
 from . import lint, measurements, sbml
+from . import ENV_NUM_THREADS
+
 
 logger = logging.getLogger(__name__)
 
@@ -34,6 +37,9 @@ def get_optimization_to_simulation_parameter_mapping(
     """
     Create list of mapping dicts from PEtab-problem to SBML parameters.
 
+    Mapping can be performed in parallel. The number of threads is controlled
+    by the environment variable named by ``petab.ENV_NUM_THREADS``.
+
     Parameters:
         condition_df, measurement_df, parameter_df:
             The dataframes in the PEtab format.
@@ -63,36 +69,77 @@ def get_optimization_to_simulation_parameter_mapping(
     simulation_conditions = measurements.get_simulation_conditions(
         measurement_df)
 
-    mapping = []
-    for condition_ix, condition in simulation_conditions.iterrows():
-        cur_measurement_df = measurements.get_rows_for_condition(
-            measurement_df, condition)
-
-        if 'preequilibrationConditionId' not in condition \
-                or not isinstance(condition.preequilibrationConditionId, str) \
-                or not condition.preequilibrationConditionId:
-            preeq_map = {}
-        else:
-            preeq_map = get_parameter_mapping_for_condition(
-                condition_id=condition.preequilibrationConditionId,
-                is_preeq=True,
-                cur_measurement_df=cur_measurement_df,
-                condition_df=condition_df,
-                parameter_df=parameter_df, sbml_model=sbml_model,
-                warn_unmapped=warn_unmapped
-            )
-
-        sim_map = get_parameter_mapping_for_condition(
-            condition_id=condition.simulationConditionId,
-            is_preeq=False,
+    simulation_parameter_ids = sbml.get_model_parameters(sbml_model)
+
+    num_threads = int(os.environ.get(ENV_NUM_THREADS, 1))
+
+    # If sequential execution is requested, let's not create any
+    # thread-allocation overhead
+    if num_threads == 1:
+        mapping = map(
+            _map_condition,
+            _map_condition_arg_packer(
+                simulation_conditions, measurement_df, condition_df,
+                parameter_df, simulation_parameter_ids, warn_unmapped))
+        return list(mapping)
+
+    # Run multi-threaded
+    from concurrent.futures import ThreadPoolExecutor
+    with ThreadPoolExecutor(max_workers=num_threads) as executor:
+        mapping = executor.map(
+            _map_condition,
+            _map_condition_arg_packer(
+                simulation_conditions, measurement_df, condition_df,
+                parameter_df, simulation_parameter_ids, warn_unmapped))
+    return list(mapping)
+
+
+def _map_condition_arg_packer(simulation_conditions, measurement_df,
+                              condition_df, parameter_df,
+                              simulation_parameter_ids, warn_unmapped):
+    """Helper function to pack extra arguments for _map_condition"""
+    for _, condition in simulation_conditions.iterrows():
+        yield (condition, measurement_df, condition_df, parameter_df,
+               simulation_parameter_ids, warn_unmapped)
+
+
+def _map_condition(packed_args):
+    """Helper function for parallel condition mapping.
+
+    For arguments see get_optimization_to_simulation_parameter_mapping"""
+
+    (condition, measurement_df, condition_df, parameter_df,
+     simulation_parameter_ids, warn_unmapped) = packed_args
+
+    cur_measurement_df = measurements.get_rows_for_condition(
+        measurement_df, condition)
+
+    if 'preequilibrationConditionId' not in condition \
+            or not isinstance(condition.preequilibrationConditionId, str) \
+            or not condition.preequilibrationConditionId:
+        preeq_map = {}
+    else:
+        preeq_map = get_parameter_mapping_for_condition(
+            condition_id=condition.preequilibrationConditionId,
+            is_preeq=True,
             cur_measurement_df=cur_measurement_df,
             condition_df=condition_df,
-            parameter_df=parameter_df, sbml_model=sbml_model,
+            parameter_df=parameter_df,
+            simulation_parameter_ids=simulation_parameter_ids,
             warn_unmapped=warn_unmapped
         )
 
-        mapping.append((preeq_map, sim_map),)
-
-    return mapping
+    sim_map = get_parameter_mapping_for_condition(
+        condition_id=condition.simulationConditionId,
+        is_preeq=False,
+        cur_measurement_df=cur_measurement_df,
+        condition_df=condition_df,
+        parameter_df=parameter_df,
+        simulation_parameter_ids=simulation_parameter_ids,
+        warn_unmapped=warn_unmapped
+    )
+
+    return preeq_map, sim_map
 
 
 def get_parameter_mapping_for_condition(
@@ -101,7 +148,8 @@
         cur_measurement_df: pd.DataFrame,
         condition_df: pd.DataFrame,
         parameter_df: pd.DataFrame = None,
-        sbml_model: libsbml.Model = None,
+        sbml_model: Optional[libsbml.Model] = None,
+        simulation_parameter_ids: Optional[List[str]] = None,
        warn_unmapped: bool = True) -> ParMappingDict:
     """
     Create dictionary of mappings from PEtab-problem to SBML parameters for the
@@ -122,7 +170,13 @@
 
         sbml_model:
             The sbml model with observables and noise specified according to
-            the PEtab format.
+            the PEtab format. Used to retrieve simulation parameter IDs.
+            Mutually exclusive with ``simulation_parameter_ids``.
+
+        simulation_parameter_ids:
+            Simulation parameter IDs used for mapping (output of
+            ``petab.sbml.get_model_parameters``). Mutually exclusive with
+            ``sbml_model``.
 
         warn_unmapped:
             If ``True``, log warning regarding unmapped parameters
@@ -134,11 +188,17 @@
     """
     _perform_mapping_checks(cur_measurement_df)
 
-    par_sim_ids = sbml.get_model_parameters(sbml_model)
+    if simulation_parameter_ids is not None and sbml_model is None:
+        pass
+    elif simulation_parameter_ids is None and sbml_model is not None:
+        simulation_parameter_ids = sbml.get_model_parameters(sbml_model)
+    else:
+        raise ValueError("Must provide exactly one of `sbml_model` and "
+                         "`simulation_parameter_ids`.")
 
     # initialize mapping dict
     # for the case of matching simulation and optimization parameter vector
-    mapping = {par: par for par in par_sim_ids}
+    mapping = {par: par for par in simulation_parameter_ids}
 
     _apply_dynamic_parameter_overrides(mapping, condition_id, condition_df)
 
@@ -146,7 +206,6 @@
     _apply_output_parameter_overrides(mapping, cur_measurement_df)
 
     fill_in_nominal_values(mapping, parameter_df)
-    # TODO fill in fixed parameters (#103)
 
     handle_missing_overrides(mapping, warn=warn_unmapped)
 
diff --git a/tests/test_parameter_mapping.py b/tests/test_parameter_mapping.py
index d717825b..3cc3d7d0 100644
--- a/tests/test_parameter_mapping.py
+++ b/tests/test_parameter_mapping.py
@@ -1,4 +1,5 @@
 import numpy as np
+import os
 import pandas as pd
 import petab
 from petab.sbml import add_global_parameter
@@ -95,6 +96,15 @@ def test_all_override(condition_df_2_conditions,
 
         assert actual == expected
 
+        # For one case we test parallel execution, which must yield the same
+        # result
+        os.environ[petab.ENV_NUM_THREADS] = "4"
+        actual = petab.get_optimization_to_simulation_parameter_mapping(
+            measurement_df=measurement_df,
+            condition_df=condition_df,
+            sbml_model=sbml_model)
+        assert actual == expected
+
     @staticmethod
     def test_partial_override(condition_df_2_conditions,
                               minimal_sbml_model):
@@ -138,6 +148,13 @@
             sbml_model=sbml_model
         )
 
+        # Comparison with NaN-containing expected results fails after
+        # pickling! Need to test first for correct NaNs, then for the rest.
+        assert np.isnan(expected[0][1]['observableParameter1_obs2'])
+        assert np.isnan(actual[0][1]['observableParameter1_obs2'])
+        expected[0][1]['observableParameter1_obs2'] = 0.0
+        actual[0][1]['observableParameter1_obs2'] = 0.0
+
         assert actual == expected
 
     @staticmethod
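Usage sketch (not part of the patch): how a caller would opt in to the parallel code path added above. Only `petab.ENV_NUM_THREADS` and `petab.get_optimization_to_simulation_parameter_mapping` are taken from this diff; the dataframes and the SBML model are placeholders for an already loaded PEtab problem.

```python
import os

import petab

# Opt in to parallel mapping; leaving the variable unset (or setting it to
# "1") keeps the sequential code path without any thread-pool overhead.
os.environ[petab.ENV_NUM_THREADS] = "4"

# condition_df, measurement_df, parameter_df and sbml_model are placeholders
# for an already loaded PEtab problem.
mapping = petab.get_optimization_to_simulation_parameter_mapping(
    condition_df=condition_df,
    measurement_df=measurement_df,
    parameter_df=parameter_df,
    sbml_model=sbml_model)

# One (preequilibration, simulation) mapping tuple per simulation condition.
preeq_map, sim_map = mapping[0]
```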
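A second sketch for the new mutually exclusive arguments of `get_parameter_mapping_for_condition`: the model parameter IDs can be computed once via `petab.sbml.get_model_parameters` and passed as `simulation_parameter_ids`, instead of handing the SBML model to every per-condition call (passing both, or neither, raises the `ValueError` added above). Again, `measurement_df`, `condition_df`, `parameter_df` and `sbml_model` are placeholders.

```python
from petab import measurements, parameter_mapping, sbml

# Compute the simulation parameter IDs once and reuse them for all conditions.
simulation_parameter_ids = sbml.get_model_parameters(sbml_model)

for _, condition in measurements.get_simulation_conditions(
        measurement_df).iterrows():
    cur_measurement_df = measurements.get_rows_for_condition(
        measurement_df, condition)
    sim_map = parameter_mapping.get_parameter_mapping_for_condition(
        condition_id=condition.simulationConditionId,
        is_preeq=False,
        cur_measurement_df=cur_measurement_df,
        condition_df=condition_df,
        parameter_df=parameter_df,
        simulation_parameter_ids=simulation_parameter_ids)
```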