
Commit 5adc607

Merge pull request #84 from franchuterivera/refactor_development_loggermsg
Reduce Deadlock Probability
2 parents e7ff3f1 + e336373

File tree

23 files changed: +368 −97 lines


.github/workflows/pytest.yml

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ jobs:
     - name: Run tests
       run: |
         if [ ${{ matrix.code-cov }} ]; then codecov='--cov=autoPyTorch --cov-report=xml'; fi
-        python -m pytest --durations=20 --timeout=500 --timeout-method=thread -v $codecov test
+        python -m pytest --durations=20 --timeout=300 --timeout-method=thread -v $codecov test
     - name: Check for files left behind by test
       if: ${{ always() }}
       run: |

autoPyTorch/api/base_task.py

Lines changed: 59 additions & 12 deletions
@@ -4,6 +4,7 @@
 import math
 import multiprocessing
 import os
+import platform
 import sys
 import tempfile
 import time
@@ -186,6 +187,8 @@ def __init__(
 
         self.stop_logging_server = None  # type: Optional[multiprocessing.synchronize.Event]
 
+        self._dask_client = None
+
         self.search_space_updates = search_space_updates
         if search_space_updates is not None:
             if not isinstance(self.search_space_updates,
@@ -504,7 +507,7 @@ def _do_dummy_prediction(self, num_run: int) -> None:
             backend=self._backend,
             seed=self.seed,
             metric=self._metric,
-            logger=self._logger,
+            logger_port=self._logger_port,
             cost_for_crash=get_cost_of_crash(self._metric),
             abort_on_first_run_crash=False,
             initial_num_run=num_run,
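The change above (repeated below for _do_traditional_prediction) hands the execution backend a logger_port instead of a live logger object. A logger's internal locks do not survive being shipped across process boundaries and can be mid-acquisition at fork time, which is exactly the deadlock risk this PR reduces. Below is a minimal sketch of the stdlib TCP-logging pattern this relies on; make_client_logger is a hypothetical helper for illustration, not autoPyTorch's exact implementation.

import logging
import logging.handlers

def make_client_logger(name: str, port: int) -> logging.Logger:
    # Rebuild a logger inside the worker process: SocketHandler pickles
    # each LogRecord and sends it to the logging server on localhost:port,
    # so no logger object (and none of its locks) ever crosses a process
    # boundary -- only the integer port does.
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:
        logger.addHandler(logging.handlers.SocketHandler('localhost', port))
    return logger

worker_logger = make_client_logger('worker', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
worker_logger.info('rebuilt from logger_port, not pickled')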
@@ -550,8 +553,11 @@ def _do_dummy_prediction(self, num_run: int) -> None:
 
     def _do_traditional_prediction(self, num_run: int, time_for_traditional: int) -> int:
 
+        # Mypy checks -- traditional prediction is only called from search,
+        # where the following objects are created
         assert self._metric is not None
         assert self._logger is not None
+        assert self._dask_client is not None
 
         self._logger.info("Starting to create dummy predictions.")
 
@@ -573,7 +579,7 @@ def _do_traditional_prediction(self, num_run: int, time_for_traditional: int) -> int:
             backend=self._backend,
             seed=self.seed,
             metric=self._metric,
-            logger=self._logger,
+            logger_port=self._logger_port,
             cost_for_crash=get_cost_of_crash(self._metric),
             abort_on_first_run_crash=False,
             initial_num_run=num_run,
@@ -720,6 +726,9 @@ def _search(
 
         self._backend.save_datamanager(dataset)
 
+        # Print debug information to log
+        self._print_debug_info_to_log()
+
         self._metric = get_metrics(
             names=[optimize_metric], dataset_properties=dataset_properties)[0]
 
@@ -737,7 +746,14 @@ def _search(
         if self.task_type is None:
             raise ValueError("Cannot interpret task type from the dataset")
 
-        self._create_dask_client()
+        # If no dask client was provided, we create one, so that we can
+        # start an ensemble process in parallel to the SMBO optimization
+        if (
+            self._dask_client is None and (self.ensemble_size > 0 or self.n_jobs is not None and self.n_jobs > 1)
+        ):
+            self._create_dask_client()
+        else:
+            self._is_dask_client_internally_created = False
 
         # ============> Run dummy predictions
         num_run = 1
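With the guard above, a dask client is only created internally when it is actually needed (ensembling enabled or more than one job), and a user-supplied client is left alone. A sketch of the decision logic with plain arguments, for illustration only:

from typing import Optional

import dask.distributed

def needs_internal_dask_client(
    dask_client: Optional[dask.distributed.Client],
    ensemble_size: int,
    n_jobs: Optional[int],
) -> bool:
    # Mirrors the condition in the hunk above: create a client only if none
    # was provided and parallel work (ensemble or multiple jobs) is requested.
    return dask_client is None and (
        ensemble_size > 0 or (n_jobs is not None and n_jobs > 1)
    )

# Single-job runs without ensembling now skip dask entirely,
# removing one multiprocessing moving part.
assert needs_internal_dask_client(None, ensemble_size=0, n_jobs=1) is False
assert needs_internal_dask_client(None, ensemble_size=1, n_jobs=1) is True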
@@ -794,7 +810,7 @@ def _search(
             ensemble_memory_limit=self._memory_limit,
             random_state=self.seed,
             precision=precision,
-            logger_port=self._logger_port
+            logger_port=self._logger_port,
         )
         self._stopwatch.stop_task(ensemble_task_name)
 
@@ -854,19 +870,21 @@ def _search(
         if proc_ensemble is not None:
             self.ensemble_performance_history = list(proc_ensemble.history)
 
+            if len(proc_ensemble.futures) > 0:
+                # Add ensemble runs that did not finish within the SMAC time
+                # into the ensemble history
+                self._logger.info("Ensemble script still running, waiting for it to finish.")
+                result = proc_ensemble.futures.pop().result()
+                if result:
+                    ensemble_history, _, _, _ = result
+                    self.ensemble_performance_history.extend(ensemble_history)
+                self._logger.info("Ensemble script finished, continue shutdown.")
+
             # save the ensemble performance history file
             if len(self.ensemble_performance_history) > 0:
                 pd.DataFrame(self.ensemble_performance_history).to_json(
                     os.path.join(self._backend.internals_directory, 'ensemble_history.json'))
 
-            if len(proc_ensemble.futures) > 0:
-                future = proc_ensemble.futures.pop()
-                # Now we need to wait for the future to return as it cannot be cancelled while it
-                # is running: https://stackoverflow.com/a/49203129
-                self._logger.info("Ensemble script still running, waiting for it to finish.")
-                future.result()
-                self._logger.info("Ensemble script finished, continue shutdown.")
-
         self._logger.info("Closing the dask infrastructure")
         self._close_dask_client()
         self._logger.info("Finished closing the dask infrastructure")
@@ -1123,3 +1141,32 @@ def get_incumbent_config(
         self
     ):
         pass
+
+    def get_models_with_weights(self) -> List:
+        if self.models_ is None or len(self.models_) == 0 or \
+                self.ensemble_ is None:
+            self._load_models()
+
+        assert self.ensemble_ is not None
+        return self.ensemble_.get_models_with_weights(self.models_)
+
+    def show_models(self) -> str:
+        df = []
+        for weight, model in self.get_models_with_weights():
+            representation = model.get_pipeline_representation()
+            representation.update({'Weight': weight})
+            df.append(representation)
+        return pd.DataFrame(df).to_markdown()
+
+    def _print_debug_info_to_log(self) -> None:
+        """
+        Prints to the log file debug information about the current estimator
+        """
+        assert self._logger is not None
+        self._logger.debug("Starting to print environment information")
+        self._logger.debug(' Python version: %s', sys.version.split('\n'))
+        self._logger.debug(' System: %s', platform.system())
+        self._logger.debug(' Machine: %s', platform.machine())
+        self._logger.debug(' Platform: %s', platform.platform())
+        for key, value in vars(self).items():
+            self._logger.debug(f"\t{key}->{value}")
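get_models_with_weights and show_models are new public inspection helpers for the final ensemble. A hypothetical usage sketch -- the search arguments are illustrative, and DataFrame.to_markdown used by show_models needs the optional tabulate package:

import numpy as np

from autoPyTorch.api.tabular_classification import TabularClassificationTask

X_train = np.random.rand(100, 4)   # toy data, just for the sketch
y_train = np.random.randint(0, 2, 100)

api = TabularClassificationTask()
api.search(X_train=X_train, y_train=y_train,
           optimize_metric='accuracy', total_walltime_limit=60)

for weight, model in api.get_models_with_weights():
    print(f'{weight:.2f} -> {model}')
print(api.show_models())  # markdown table, one row per weighted pipeline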

autoPyTorch/configs/default_pipeline_options.json

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
     "runtime": 3600,
     "torch_num_threads": 1,
     "early_stopping": 20,
-    "use_tensorboard_logger": "True",
+    "use_tensorboard_logger": "False",
     "use_pynisher": "False",
     "metrics_during_training": "True"
 }

autoPyTorch/ensemble/abstract_ensemble.py

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
 from abc import ABCMeta, abstractmethod
-from typing import List, Tuple, Union
+from typing import Any, Dict, List, Tuple, Union
 
 import numpy as np
 
@@ -49,7 +49,7 @@ def predict(self, base_models_predictions: Union[np.ndarray, List[np.ndarray]])
         self
 
     @abstractmethod
-    def get_models_with_weights(self, models: BasePipeline) -> List[Tuple[float, BasePipeline]]:
+    def get_models_with_weights(self, models: Dict[Any, BasePipeline]) -> List[Tuple[float, BasePipeline]]:
         """Return a list of (weight, model) pairs
 
         Args:
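The corrected annotation reflects how implementations actually use the argument: pipelines are looked up by run identifier, which only works on a mapping (the old BasePipeline annotation was simply wrong). A toy sketch of the contract; ToyPipeline and the tuple keys are illustrative stand-ins:

from typing import Any, Dict, List, Tuple

class ToyPipeline:
    def __init__(self, name: str) -> None:
        self.name = name
    def __repr__(self) -> str:
        return f'ToyPipeline({self.name})'

def get_models_with_weights(
    weights: List[float],
    identifiers: List[Any],
    models: Dict[Any, ToyPipeline],
) -> List[Tuple[float, ToyPipeline]]:
    # As in EnsembleSelection: pair each nonzero weight with the model
    # stored under the matching identifier.
    return [(w, models[i]) for w, i in zip(weights, identifiers) if w > 0]

models = {(1, 2, 5.0): ToyPipeline('mlp'), (1, 3, 5.0): ToyPipeline('resnet')}
print(get_models_with_weights([0.75, 0.25], list(models), models))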

autoPyTorch/ensemble/ensemble_builder.py

Lines changed: 61 additions & 26 deletions
@@ -4,6 +4,7 @@
 import logging
 import logging.handlers
 import math
+import multiprocessing
 import numbers
 import os
 import pickle
@@ -157,7 +158,12 @@ def __call__(
     ) -> None:
         self.build_ensemble(smbo.tae_runner.client)
 
-    def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
+    def build_ensemble(
+        self,
+        dask_client: dask.distributed.Client,
+        pynisher_context: str = 'spawn',
+        unit_test: bool = False
+    ) -> None:
 
         # The second criterion is elapsed time
         elapsed_time = time.time() - self.start_time
@@ -227,11 +233,13 @@ def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
             memory_limit=self.ensemble_memory_limit,
             read_at_most=self.read_at_most,
             random_state=self.seed,
-            logger_port=self.logger_port,
             end_at=self.start_time + self.time_left_for_ensembles,
             iteration=self.iteration,
             return_predictions=False,
             priority=100,
+            pynisher_context=pynisher_context,
+            logger_port=self.logger_port,
+            unit_test=unit_test,
         ))
 
         logger.info(
@@ -267,10 +275,12 @@ def fit_and_return_ensemble(
     memory_limit: Optional[int],
     read_at_most: int,
     random_state: int,
-    logger_port: int,
     end_at: float,
     iteration: int,
     return_predictions: bool,
+    pynisher_context: str,
+    logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
+    unit_test: bool = False,
 ) -> Tuple[
     List[Dict[str, float]],
     int,
@@ -317,13 +327,20 @@ def fit_and_return_ensemble(
         memory limit in mb. If ``None``, no memory limit is enforced.
     read_at_most: int
         read at most n new prediction files in each iteration
-    logger_port: int
-        port in localhost where to publish msg
     end_at: float
         At what time the job must finish. Needs to be the endtime and not the time left
         because we do not know when dask schedules the job.
     iteration: int
         The current iteration
+    pynisher_context: str
+        Context to use for multiprocessing, can be either fork, spawn or forkserver.
+    logger_port: int
+        The port on which the logging server is listening.
+    unit_test: bool
+        Turn on unit testing mode. This currently makes fit_ensemble raise a MemoryError.
+        Having this is very bad coding style, but I did not find a way to make
+        unittest.mock work through the pynisher with all spawn contexts. If you know a
+        better solution, please let us know by opening an issue.
     Returns
     -------
     List[Tuple[int, float, float, float]]
@@ -346,33 +363,36 @@ def fit_and_return_ensemble(
         read_at_most=read_at_most,
         random_state=random_state,
         logger_port=logger_port,
+        unit_test=unit_test,
     ).run(
         end_at=end_at,
         iteration=iteration,
         return_predictions=return_predictions,
+        pynisher_context=pynisher_context,
     )
     return result
 
 
 class EnsembleBuilder(object):
     def __init__(
-            self,
-            backend: Backend,
-            dataset_name: str,
-            task_type: int,
-            output_type: int,
-            metrics: List[autoPyTorchMetric],
-            opt_metric: str,
-            ensemble_size: int = 10,
-            ensemble_nbest: int = 100,
-            max_models_on_disc: Union[float, int] = 100,
-            performance_range_threshold: float = 0,
-            seed: int = 1,
-            precision: int = 32,
-            memory_limit: Optional[int] = 1024,
-            read_at_most: int = 5,
-            random_state: Optional[Union[int, np.random.RandomState]] = None,
-            logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
+        self,
+        backend: Backend,
+        dataset_name: str,
+        task_type: int,
+        output_type: int,
+        metrics: List[autoPyTorchMetric],
+        opt_metric: str,
+        ensemble_size: int = 10,
+        ensemble_nbest: int = 100,
+        max_models_on_disc: Union[float, int] = 100,
+        performance_range_threshold: float = 0,
+        seed: int = 1,
+        precision: int = 32,
+        memory_limit: Optional[int] = 1024,
+        read_at_most: int = 5,
+        random_state: Optional[Union[int, np.random.RandomState]] = None,
+        logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
+        unit_test: bool = False,
    ):
        """
        Constructor
@@ -420,7 +440,12 @@ def __init__(
         read_at_most: int
             read at most n new prediction files in each iteration
         logger_port: int
-            port where to publish messages
+            port that receives logging records
+        unit_test: bool
+            Turn on unit testing mode. This currently makes fit_ensemble raise a MemoryError.
+            Having this is very bad coding style, but I did not find a way to make
+            unittest.mock work through the pynisher with all spawn contexts. If you know a
+            better solution, please let us know by opening an issue.
         """
 
         super(EnsembleBuilder, self).__init__()
@@ -461,6 +486,7 @@ def __init__(
         self.memory_limit = memory_limit
         self.read_at_most = read_at_most
         self.random_state = check_random_state(random_state)
+        self.unit_test = unit_test
 
         # Setup the logger
         self.logger_port = logger_port
@@ -564,6 +590,7 @@ def run(
         end_at: Optional[float] = None,
         time_buffer: int = 5,
         return_predictions: bool = False,
+        pynisher_context: str = 'spawn',  # only change for unit testing!
     ) -> Tuple[
         List[Dict[str, float]],
         int,
@@ -625,12 +652,16 @@ def run(
             else:
                 raise NotImplementedError()
 
-            if time_left - time_buffer < 1:
+            wall_time_in_s = int(time_left - time_buffer)
+            if wall_time_in_s < 1:
                 break
+            context = multiprocessing.get_context(pynisher_context)
+
             safe_ensemble_script = pynisher.enforce_limits(
-                wall_time_in_s=int(time_left - time_buffer),
+                wall_time_in_s=wall_time_in_s,
                 mem_in_mb=self.memory_limit,
-                logger=self.logger
+                logger=self.logger,
+                context=context,
             )(self.main)
             safe_ensemble_script(time_left, iteration, return_predictions)
             if safe_ensemble_script.exit_status is pynisher.MemorylimitException:
@@ -1216,6 +1247,10 @@ def fit_ensemble(self, selected_keys: List[str]) -> Optional[EnsembleSelection]:
             ensemble: EnsembleSelection
                 trained Ensemble
         """
+
+        if self.unit_test:
+            raise MemoryError()
+
         predictions_train = [self.read_preds[k][Y_ENSEMBLE] for k in selected_keys]
         include_num_runs = [
             (
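Most of this file's changes funnel a pynisher_context down to pynisher.enforce_limits. With the default fork start method, the child inherits whatever locks the parent's other threads hold (logging, dask communication) and can deadlock on them; spawn starts a clean interpreter instead. A minimal sketch, assuming a pynisher version that accepts the context argument shown in the diff:

import multiprocessing

import pynisher

def build_ensemble_step(iteration: int) -> str:
    return f'iteration {iteration} done'

# 'spawn' re-imports the module in a fresh process instead of fork()ing a
# potentially lock-holding one -- the deadlock source this PR reduces.
context = multiprocessing.get_context('spawn')
limited = pynisher.enforce_limits(
    wall_time_in_s=60,
    mem_in_mb=1024,
    context=context,
)(build_ensemble_step)
print(limited(1))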

autoPyTorch/ensemble/ensemble_selection.py

Lines changed: 1 addition & 1 deletion
@@ -189,7 +189,7 @@ def __str__(self) -> str:
 
     def get_models_with_weights(
         self,
-        models: BasePipeline
+        models: Dict[Any, BasePipeline]
     ) -> List[Tuple[float, BasePipeline]]:
         output = []
         for i, weight in enumerate(self.weights_):

autoPyTorch/ensemble/singlebest_ensemble.py

Lines changed: 2 additions & 2 deletions
@@ -1,5 +1,5 @@
 import os
-from typing import List, Tuple, Union
+from typing import Any, Dict, List, Tuple, Union
 
 import numpy as np
 
@@ -97,7 +97,7 @@ def __str__(self) -> str:
                 enumerate(self.identifiers_)
                 if self.weights_[idx] > 0]))
 
-    def get_models_with_weights(self, models: BasePipeline
+    def get_models_with_weights(self, models: Dict[Any, BasePipeline]
                                 ) -> List[Tuple[float, BasePipeline]]:
         output = []
         for i, weight in enumerate(self.weights_):
