forked from RasaHQ/rasa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: test_memory_leak.py
228 lines (182 loc) · 7.18 KB
/
test_memory_leak.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import abc
import json
import subprocess
import sys
import time
from pathlib import Path
from typing import Text, List, Tuple, Optional, Union
import memory_profiler
import psutil
import pytest
import rasa
import rasa.shared.utils.io
from rasa.utils.common import TempDirectoryPath, get_temp_dir_name
# Sampling interval in seconds passed to `memory_profiler.memory_usage`.
PROFILING_INTERVAL = 0.1

# Enable this to plot the results locally
# (when True, each test also dumps an mprof-compatible plot file and a raw
# JSON file of the (memory, timestamp) samples via `_write_results`).
WRITE_RESULTS_TO_DISK = False
def _custom_default_config(
    tmp_path: Union[Path, Text], epochs: int, max_history: Optional[int] = -1
) -> Text:
    """Write a copy of Rasa's default model config with overridden training knobs.

    Args:
        tmp_path: Directory in which the patched config file is written.
        epochs: Value written to every pipeline/policy item that has an
            ``epochs`` key.
        max_history: Value written to every item that has a ``max_history``
            key. The default ``-1`` is a sentinel meaning "leave
            ``max_history`` untouched".

    Returns:
        Path (as a string) of the written ``test_config.yml``.
    """
    # Override default config to use custom amount of epochs
    default_config = Path("rasa", "shared", "importers", "default_config.yml")
    config = rasa.shared.utils.io.read_yaml_file(default_config)

    # Only the item lists are needed; the section names are irrelevant here.
    for items in config.values():
        for item in items:
            if "epochs" in item:
                item["epochs"] = epochs
            # Fix: previously this always wrote `None` regardless of the value
            # passed in; now the caller-supplied `max_history` is respected
            # (existing callers pass `None`, so their behavior is unchanged).
            if "max_history" in item and max_history != -1:
                item["max_history"] = max_history

    config_for_test = Path(tmp_path) / "test_config.yml"
    rasa.shared.utils.io.write_yaml(config, config_for_test)

    return str(config_for_test)
class MemoryLeakTest(abc.ABC):
    """Generic template for memory leak tests.

    Subclasses implement `function_to_profile` (the workload whose memory is
    monitored) and the `name_for_dumped_files` fixture (base name for the
    optional result dumps). The inherited `test_for_memory_leak` runs the
    workload in a child process, samples its memory usage, and asserts the
    peak stays below `max_memory_threshold_mb`.
    """

    @property
    def max_memory_threshold_mb(self) -> float:
        # Default peak-memory budget in MB; subclasses override as needed.
        return 1000

    @pytest.fixture
    @abc.abstractmethod
    def name_for_dumped_files(self) -> Text:
        # Base file name used by `_write_results` when WRITE_RESULTS_TO_DISK.
        raise NotImplementedError

    @abc.abstractmethod
    def function_to_profile(self) -> None:
        # The workload to profile; executed in a separate Python process.
        raise NotImplementedError

    @pytest.mark.timeout(720, func_only=True)
    def test_for_memory_leak(self, name_for_dumped_files: Text, tmp_path: Path) -> None:
        """Profile `function_to_profile` in a subprocess and check its peak memory."""
        # Run as separate process to avoid other things affecting the memory usage.
        # Unfortunately `memory-profiler` doesn't work properly with
        # `multiprocessing.Process` as it can't handle the process exit
        # The child re-imports this module and calls the concrete subclass's
        # `function_to_profile`, which is why `__name__` / the class name are
        # interpolated into the command string.
        process = subprocess.Popen(
            [
                sys.executable,
                "-c",
                (
                    f"from {__name__} import {self.__class__.__name__}; "
                    f"t = {self.__class__.__name__}();"
                    f"t.function_to_profile()"
                ),
            ],
            # Force TensorFlow to use CPU so we can track the memory usage
            # NOTE(review): passing `env` like this REPLACES the child's whole
            # environment (PATH, PYTHONPATH, TMPDIR are all dropped) rather
            # than augmenting it — confirm this is intended.
            env={"CUDA_VISIBLE_DEVICES": "-1"},
        )
        # Wait until process is running to avoid race conditions with the memory
        # profiling
        while not psutil.pid_exists(process.pid):
            time.sleep(0.01)
        # Sample the child (and its children) every PROFILING_INTERVAL seconds
        # until it exits; with `timestamps=True` each sample is a
        # (memory_mb, timestamp) tuple.
        results = memory_profiler.memory_usage(
            process, interval=PROFILING_INTERVAL, include_children=True, timestamps=True
        )
        # `memory-profiler` sometimes adds `None` values at the end which we don't need
        results = [
            memory_timestamp
            for memory_timestamp in results
            if memory_timestamp is not None
        ]
        if WRITE_RESULTS_TO_DISK:
            self._write_results(name_for_dumped_files, results)
        # Peak memory over the whole run must stay under the class's budget.
        max_memory_usage = max(results, key=lambda memory_time: memory_time[0])[0]
        assert max_memory_usage < self.max_memory_threshold_mb

    @staticmethod
    def _write_results(base_name: Text, results: List[Tuple[float, float]]) -> None:
        """Dump the (memory, timestamp) samples for offline analysis.

        Writes an mprof-compatible text file and a raw JSON dump next to the
        current working directory.
        """
        mprof_plot = Path(f"{base_name}_plot.txt")
        mprof_results = Path(f"{base_name}_raw.json")
        # plot this via `mprof plot mprof_result.txt`
        with open(mprof_plot, "w") as f:
            for memory, timestamp in results:
                f.write(f"MEM {memory:.6f} {timestamp:.4f}\n")
        # dump result as json to be able analyze them without re-running the test
        with open(mprof_results, "w") as f:
            f.write(json.dumps(results))
class TestNLULeakManyEpochs(MemoryLeakTest):
    """Memory-leak check for NLU training with a large number of epochs."""

    @property
    def epochs(self) -> int:
        # Epoch count written into the training config.
        return 30

    @property
    def max_memory_threshold_mb(self) -> float:
        # Peak-memory budget (MB) for this workload.
        return 2200

    def function_to_profile(self) -> None:
        """Run one NLU training inside a throwaway output directory."""
        import rasa.model_training

        with TempDirectoryPath(get_temp_dir_name()) as output_dir:
            config_path = _custom_default_config(output_dir, epochs=self.epochs)
            data_path = Path("data", "test_nlu_no_responses", "sara_nlu_data.yml")
            rasa.model_training.train_nlu(
                config_path,
                data_path,
                output=output_dir,
            )

    @pytest.fixture()
    def name_for_dumped_files(self) -> Text:
        # Same string as before, assembled from one prefix plus the suffix.
        prefix = f"memory_usage_rasa_nlu_{rasa.__version__}_"
        return prefix + f"epochs{self.epochs}_training_runs1"
class TestCoreLeakManyEpochs(MemoryLeakTest):
    """Memory-leak check for Core policy training with many epochs."""

    @property
    def epochs(self) -> int:
        # Epoch count written into the training config.
        return 200

    @property
    def max_memory_threshold_mb(self) -> float:
        # Peak-memory budget (MB) for this workload.
        return 2000

    def function_to_profile(self) -> None:
        """Run one Core training inside a throwaway output directory."""
        import rasa.model_training

        with TempDirectoryPath(get_temp_dir_name()) as output_dir:
            config_path = _custom_default_config(
                output_dir, epochs=self.epochs, max_history=None
            )
            rasa.model_training.train_core(
                "data/test_domains/default_with_slots.yml",
                config_path,
                "data/test_yaml_stories/stories_defaultdomain.yml",
                output=output_dir,
                additional_arguments={"augmentation_factor": 20},
            )

    @pytest.fixture()
    def name_for_dumped_files(self) -> Text:
        # Same string as before, assembled from one prefix plus the suffix.
        prefix = f"memory_usage_rasa_core_{rasa.__version__}_"
        return prefix + f"epochs{self.epochs}_training_runs1"
class TestCRFDenseFeaturesLeak(MemoryLeakTest):
    """Memory-leak check for the CRF entity extractor fed with dense features."""

    @property
    def epochs(self) -> int:
        # One epoch is enough to exercise the dense-feature code path.
        return 1

    @property
    def max_memory_threshold_mb(self) -> float:
        # Peak-memory budget (MB) for this workload.
        return 1600

    def function_to_profile(self) -> None:
        """Train an NLU pipeline whose CRF consumes spaCy dense features."""
        import rasa.model_training

        # Feature template for the CRF: previous token, current token, next
        # token. The middle slot includes `text_dense_features`.
        crf_features = [
            ["pos", "pos2"],
            [
                "bias",
                "prefix5",
                "prefix2",
                "suffix5",
                "suffix3",
                "suffix2",
                "pos",
                "pos2",
                "digit",
                "text_dense_features",
            ],
            ["pos", "pos2"],
        ]
        config = {
            "pipeline": [
                {"name": "SpacyNLP"},
                {"name": "SpacyTokenizer"},
                {"name": "SpacyFeaturizer"},
                {"name": "CRFEntityExtractor", "features": crf_features},
            ]
        }

        with TempDirectoryPath(get_temp_dir_name()) as output_dir:
            config_file = Path(output_dir) / "test_config.yml"
            rasa.shared.utils.io.write_yaml(config, config_file)

            rasa.model_training.train_nlu(
                str(config_file),
                str(Path("data", "test_nlu_no_responses", "sara_nlu_data.yml")),
                output=output_dir,
            )

    @pytest.fixture()
    def name_for_dumped_files(self) -> Text:
        return f"memory_usage_rasa_nlu_crf_dense_{rasa.__version__}_"