Skip to content

Commit

Permalink
Parallelize petab.get_optimization_to_simulation_parameter_mapping
Browse files Browse the repository at this point in the history
Run multi-threaded, controlled by environment variable. Closes #205
  • Loading branch information
dweindl authored Jan 17, 2020
1 parent a79e444 commit 1db777c
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 31 deletions.
11 changes: 10 additions & 1 deletion petab/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
"""PEtab exports"""
"""PEtab exports
Attributes:
ENV_NUM_THREADS:
Name of environment variable to set number of threads or processes
PEtab should use for operations that can be performed in parallel.
By default, all operations are performed sequentially.
"""

ENV_NUM_THREADS = "PETAB_NUM_THREADS"

from .composite_problem import * # noqa: F403, F401
from .conditions import * # noqa: F403, F401
Expand Down
119 changes: 89 additions & 30 deletions petab/parameter_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
import numbers
import os
import re
from typing import Tuple, Dict, Union, Any, List, Optional, Iterable

Expand All @@ -11,6 +12,8 @@
import pandas as pd

from . import lint, measurements, sbml
from . import ENV_NUM_THREADS


logger = logging.getLogger(__name__)

Expand All @@ -34,6 +37,9 @@ def get_optimization_to_simulation_parameter_mapping(
"""
Create list of mapping dicts from PEtab-problem to SBML parameters.
Mapping can be performed in parallel. The number of threads is controlled
by the environment variable with the name of petab.ENV_NUM_THREADS.
Parameters:
condition_df, measurement_df, parameter_df:
The dataframes in the PEtab format.
Expand Down Expand Up @@ -63,36 +69,77 @@ def get_optimization_to_simulation_parameter_mapping(
simulation_conditions = measurements.get_simulation_conditions(
measurement_df)

mapping = []
for condition_ix, condition in simulation_conditions.iterrows():
cur_measurement_df = measurements.get_rows_for_condition(
measurement_df, condition)

if 'preequilibrationConditionId' not in condition \
or not isinstance(condition.preequilibrationConditionId, str) \
or not condition.preequilibrationConditionId:
preeq_map = {}
else:
preeq_map = get_parameter_mapping_for_condition(
condition_id=condition.preequilibrationConditionId,
is_preeq=True,
cur_measurement_df=cur_measurement_df,
condition_df=condition_df,
parameter_df=parameter_df, sbml_model=sbml_model,
warn_unmapped=warn_unmapped
)

sim_map = get_parameter_mapping_for_condition(
condition_id=condition.simulationConditionId,
is_preeq=False,
simulation_parameter_ids = sbml.get_model_parameters(sbml_model)

num_threads = int(os.environ.get(ENV_NUM_THREADS, 1))

# If sequential execution is requested, let's not create any
# thread-allocation overhead
if num_threads == 1:
mapping = map(
_map_condition,
_map_condition_arg_packer(
simulation_conditions, measurement_df, condition_df,
parameter_df, simulation_parameter_ids, warn_unmapped))
return list(mapping)

# Run multi-threaded
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=num_threads) as executor:
mapping = executor.map(
_map_condition,
_map_condition_arg_packer(
simulation_conditions, measurement_df, condition_df,
parameter_df, simulation_parameter_ids, warn_unmapped))
return list(mapping)


def _map_condition_arg_packer(simulation_conditions, measurement_df,
                              condition_df, parameter_df,
                              simulation_parameter_ids, warn_unmapped):
    """Lazily build the packed argument tuples consumed by _map_condition.

    One tuple is yielded per row of ``simulation_conditions``. The row is
    the first entry of the tuple; the remaining entries are the other
    arguments, passed through unchanged and in the order in which
    _map_condition unpacks them.
    """
    # Everything except the condition row is the same for every tuple.
    shared_args = (measurement_df, condition_df, parameter_df,
                   simulation_parameter_ids, warn_unmapped)

    for _row_label, condition_row in simulation_conditions.iterrows():
        yield (condition_row,) + shared_args


def _map_condition(packed_args):
    """Helper function for parallel condition mapping.

    Takes all inputs as one tuple so that it can be passed directly to
    ``map`` / ``Executor.map``. For the meaning of the individual entries
    see get_optimization_to_simulation_parameter_mapping.

    Parameters:
        packed_args:
            Tuple ``(condition, measurement_df, condition_df, parameter_df,
            simulation_parameter_ids, warn_unmapped)`` as produced by
            _map_condition_arg_packer.

    Returns:
        Tuple ``(preeq_map, sim_map)`` with the parameter mapping for the
        preequilibration condition and for the simulation condition.
        ``preeq_map`` is an empty dict if the condition has no
        preequilibration condition ID.
    """

    (condition, measurement_df, condition_df, parameter_df,
     simulation_parameter_ids, warn_unmapped) = packed_args

    cur_measurement_df = measurements.get_rows_for_condition(
        measurement_df, condition)

    # A missing column, a non-string entry (e.g. NaN) and an empty string
    # all mean that there is no preequilibration for this condition.
    if 'preequilibrationConditionId' not in condition \
            or not isinstance(condition.preequilibrationConditionId, str) \
            or not condition.preequilibrationConditionId:
        preeq_map = {}
    else:
        preeq_map = get_parameter_mapping_for_condition(
            condition_id=condition.preequilibrationConditionId,
            is_preeq=True,
            cur_measurement_df=cur_measurement_df,
            condition_df=condition_df,
            parameter_df=parameter_df,
            simulation_parameter_ids=simulation_parameter_ids,
            warn_unmapped=warn_unmapped
        )

    sim_map = get_parameter_mapping_for_condition(
        condition_id=condition.simulationConditionId,
        is_preeq=False,
        cur_measurement_df=cur_measurement_df,
        condition_df=condition_df,
        parameter_df=parameter_df,
        simulation_parameter_ids=simulation_parameter_ids,
        warn_unmapped=warn_unmapped
    )

    return preeq_map, sim_map


def get_parameter_mapping_for_condition(
Expand All @@ -101,7 +148,8 @@ def get_parameter_mapping_for_condition(
cur_measurement_df: pd.DataFrame,
condition_df: pd.DataFrame,
parameter_df: pd.DataFrame = None,
sbml_model: libsbml.Model = None,
sbml_model: Optional[libsbml.Model] = None,
simulation_parameter_ids: Optional[List[str]] = None,
warn_unmapped: bool = True) -> ParMappingDict:
"""
Create dictionary of mappings from PEtab-problem to SBML parameters for the
Expand All @@ -122,7 +170,13 @@ def get_parameter_mapping_for_condition(
sbml_model:
The sbml model with observables and noise specified according to
the PEtab format.
the PEtab format used to retrieve simulation parameter IDs.
Mutually exclusive with ``simulation_parameter_ids``.
simulation_parameter_ids:
Simulation parameter IDs used for mapping (output of
``petab.sbml.get_model_parameters``). Mutually exclusive with
``sbml_model``.
warn_unmapped:
If ``True``, log warning regarding unmapped parameters
Expand All @@ -134,19 +188,24 @@ def get_parameter_mapping_for_condition(
"""
_perform_mapping_checks(cur_measurement_df)

par_sim_ids = sbml.get_model_parameters(sbml_model)
if simulation_parameter_ids is not None and sbml_model is None:
pass
elif simulation_parameter_ids is None and sbml_model is not None:
simulation_parameter_ids = sbml.get_model_parameters(sbml_model)
else:
raise ValueError("Must provide exactly one of `sbml_model` and "
"`simulation_parameter_ids`.")

# initialize mapping dict
# for the case of matching simulation and optimization parameter vector
mapping = {par: par for par in par_sim_ids}
mapping = {par: par for par in simulation_parameter_ids}

_apply_dynamic_parameter_overrides(mapping, condition_id, condition_df)

if not is_preeq:
_apply_output_parameter_overrides(mapping, cur_measurement_df)

fill_in_nominal_values(mapping, parameter_df)

# TODO fill in fixed parameters (#103)

handle_missing_overrides(mapping, warn=warn_unmapped)
Expand Down
17 changes: 17 additions & 0 deletions tests/test_parameter_mapping.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import numpy as np
import os
import pandas as pd
import petab
from petab.sbml import add_global_parameter
Expand Down Expand Up @@ -95,6 +96,15 @@ def test_all_override(condition_df_2_conditions,

assert actual == expected

# For one case we test parallel execution, which must yield the same
# result
os.environ[petab.ENV_NUM_THREADS] = "4"
actual = petab.get_optimization_to_simulation_parameter_mapping(
measurement_df=measurement_df,
condition_df=condition_df,
sbml_model=sbml_model)
assert actual == expected

@staticmethod
def test_partial_override(condition_df_2_conditions,
minimal_sbml_model):
Expand Down Expand Up @@ -138,6 +148,13 @@ def test_partial_override(condition_df_2_conditions,
sbml_model=sbml_model
)

# Comparison with NaN-containing expected results fails after pickling!
# Need to test first for correct NaNs, then for the rest.
assert np.isnan(expected[0][1]['observableParameter1_obs2'])
assert np.isnan(actual[0][1]['observableParameter1_obs2'])
expected[0][1]['observableParameter1_obs2'] = 0.0
actual[0][1]['observableParameter1_obs2'] = 0.0

assert actual == expected

@staticmethod
Expand Down

0 comments on commit 1db777c

Please sign in to comment.