Commit eac5062

Yuanjing Shi authored and ylc committed

[Meta Schedule][M4a] Local runner (apache#9153)

* [Meta Schedule][M3a] Local runner (apache#479)
* localrunner
* localrunner init
* linting
* address comments
* exception handling
* single run testcase
* two more cases added
* add exception case
* one case with AddModule added
* address comments
* address comments
* remove unused dependency
* optional arguments
* linting
* add utils
* linting
* address comments
* remove non-ascii comment
* add sanity check
* address comments

1 parent dc30704 · commit eac5062

File tree: 5 files changed, +797 −53 lines changed

python/tvm/meta_schedule/runner/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -20,4 +20,5 @@
 """
 from .config import EvaluatorConfig, RPCConfig
 from .rpc_runner import RPCRunner
+from .local_runner import LocalRunner, LocalRunnerFuture
 from .runner import PyRunner, Runner, RunnerFuture, RunnerInput, RunnerResult
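
With this re-export in place, the new runner becomes importable from the package root; a one-line sketch (not part of the diff):

    from tvm.meta_schedule.runner import LocalRunner, LocalRunnerFuture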
python/tvm/meta_schedule/runner/local_runner.py (new file)

Lines changed: 359 additions & 0 deletions
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Local Runner"""
from contextlib import contextmanager
from typing import Callable, List, Optional, Union
import tvm

from ...contrib.popen_pool import PopenPoolExecutor
from ...runtime import Device, Module
from ..utils import get_global_func_with_default_on_worker
from .config import EvaluatorConfig
from .runner import PyRunner, RunnerFuture, RunnerInput, RunnerResult
from .utils import (
    T_ARG_INFO_JSON_OBJ_LIST,
    T_ARGUMENT_LIST,
    alloc_argument_common,
    run_evaluator_common,
)


class LocalRunnerFuture(RunnerFuture):
    """Local based runner future

    Parameters
    ----------
    res: Optional[List[float]]
        The optional result as a list of float.
    error_message: Optional[str]
        The optional error message.

    Note
    ----
    Only one of the parameters should be None upon the creation
    of a LocalRunnerFuture object.
    """

    res: Optional[List[float]]
    error_message: Optional[str]

    def __init__(
        self, res: Optional[List[float]] = None, error_message: Optional[str] = None
    ) -> None:
        """Constructor

        Parameters
        ----------
        res: Optional[List[float]]
            The result of this LocalRunnerFuture
        error_message: Optional[str]
            The stringified error message of any exception during execution

        """
        super().__init__()
        self.res = res
        self.error_message = error_message

        # sanity check upon the creation of LocalRunnerFuture object
        if (res is None and error_message is None) or (
            res is not None and error_message is not None
        ):
            raise AttributeError(
                "Only one of the two parameters should be None upon the creation "
                "of LocalRunnerFuture object."
            )

    def done(self) -> bool:
        return True

    def result(self) -> RunnerResult:
        return RunnerResult(self.res, self.error_message)


class LocalRunner(PyRunner):
    """Local runner

    Parameters
    ----------
    timeout_sec: float
        The timeout setting in seconds.
    evaluator_config: EvaluatorConfig
        The evaluator configuration.
    cooldown_sec: float
        The cooldown in seconds.
    alloc_repeat: int
        The number of times to repeat the allocation.
    f_alloc_argument: Union[str, Callable, None]
        The function name to allocate the arguments or the function itself.
    f_run_evaluator: Union[str, Callable, None]
        The function name to run the evaluator or the function itself.
    f_cleanup: Union[str, Callable, None]
        The function name to clean up the session or the function itself.
    pool: PopenPoolExecutor
        The popen pool executor.

    Attributes
    ----------
    T_ALLOC_ARGUMENT : typing._GenericAlias
        The signature of the function `f_alloc_argument`, which is:

        .. code-block:: python

            def default_alloc_argument(
                device: Device,
                args_info: T_ARG_INFO_JSON_OBJ_LIST,
                alloc_repeat: int,
            ) -> List[T_ARGUMENT_LIST]:
                ...

    T_RUN_EVALUATOR : typing._GenericAlias
        The signature of the function `f_run_evaluator`, which is:

        .. code-block:: python

            def default_run_evaluator(
                rt_mod: Module,
                device: Device,
                evaluator_config: EvaluatorConfig,
                repeated_args: List[T_ARGUMENT_LIST],
            ) -> List[float]:
                ...

    T_CLEANUP : typing._GenericAlias
        The signature of the function `f_cleanup`, which is:

        .. code-block:: python

            def default_cleanup() -> None:
                ...
    """

    T_ALLOC_ARGUMENT = Callable[
        [
            Device,  # The device on the remote
            T_ARG_INFO_JSON_OBJ_LIST,  # The metadata information of the arguments to be allocated
            int,  # The number of repeated allocations to be done
        ],
        List[T_ARGUMENT_LIST],  # A list of argument lists
    ]
    T_RUN_EVALUATOR = Callable[
        [
            Module,  # The Module opened on the remote
            Device,  # The device on the remote
            EvaluatorConfig,  # The evaluator configuration
            List[T_ARGUMENT_LIST],  # A list of argument lists
        ],
        List[float],  # A list of running time
    ]
    T_CLEANUP = Callable[
        [],
        None,
    ]

    timeout_sec: float
    evaluator_config: EvaluatorConfig
    cooldown_sec: float
    alloc_repeat: int

    f_alloc_argument: Union[T_ALLOC_ARGUMENT, str, None]
    f_run_evaluator: Union[T_RUN_EVALUATOR, str, None]
    f_cleanup: Union[T_CLEANUP, str, None]

    pool: PopenPoolExecutor

    def __init__(
        self,
        timeout_sec: float,
        evaluator_config: Optional[EvaluatorConfig] = None,
        cooldown_sec: float = 0.0,
        alloc_repeat: int = 1,
        f_alloc_argument: Optional[str] = None,
        f_run_evaluator: Optional[str] = None,
        f_cleanup: Optional[str] = None,
        initializer: Optional[Callable[[], None]] = None,
    ) -> None:
        super().__init__()
        self.timeout_sec = timeout_sec
        self.evaluator_config = EvaluatorConfig._normalized(evaluator_config)
        self.cooldown_sec = cooldown_sec
        self.alloc_repeat = alloc_repeat
        self.f_alloc_argument = f_alloc_argument
        self.f_run_evaluator = f_run_evaluator
        self.f_cleanup = f_cleanup

        self.pool = PopenPoolExecutor(
            max_workers=1,  # one local worker
            timeout=timeout_sec,
            initializer=initializer,
        )
        self._sanity_check()

    def run(self, runner_inputs: List[RunnerInput]) -> List[RunnerFuture]:
        results: List[RunnerFuture] = []
        for runner_input in runner_inputs:
            future = self.pool.submit(
                LocalRunner._worker_func,
                self.f_alloc_argument,
                self.f_run_evaluator,
                self.f_cleanup,
                self.evaluator_config,
                self.alloc_repeat,
                str(runner_input.artifact_path),
                str(runner_input.device_type),
                tuple(arg_info.as_json() for arg_info in runner_input.args_info),
            )
            try:
                result: List[float] = future.result()
                error_message: str = None
            except TimeoutError as exception:
                result: List[float] = None
                error_message: str = (
                    f"LocalRunner: Timeout, killed after {self.timeout_sec} seconds\n"
                )
            except Exception as exception:  # pylint: disable=broad-except
                result: List[float] = None
                error_message: str = "LocalRunner: An exception occurred\n" + str(exception)
            local_future = LocalRunnerFuture(res=result, error_message=error_message)
            results.append(local_future)
        return results

    def _sanity_check(self) -> None:
        def _check(
            f_alloc_argument,
            f_run_evaluator,
            f_cleanup,
        ) -> None:
            get_global_func_with_default_on_worker(name=f_alloc_argument, default=None)
            get_global_func_with_default_on_worker(name=f_run_evaluator, default=None)
            get_global_func_with_default_on_worker(name=f_cleanup, default=None)
            get_global_func_with_default_on_worker(
                name="tvm.contrib.random.random_fill", default=None
            )

        value = self.pool.submit(
            _check,
            self.f_alloc_argument,
            self.f_run_evaluator,
            self.f_cleanup,
        )
        value.result()

    @staticmethod
    def _worker_func(
        _f_alloc_argument: Optional[str],
        _f_run_evaluator: Optional[str],
        _f_cleanup: Optional[str],
        evaluator_config: EvaluatorConfig,
        alloc_repeat: int,
        artifact_path: str,
        device_type: str,
        args_info: T_ARG_INFO_JSON_OBJ_LIST,
    ) -> List[float]:
        f_alloc_argument: LocalRunner.T_ALLOC_ARGUMENT = get_global_func_with_default_on_worker(
            _f_alloc_argument, default_alloc_argument
        )
        f_run_evaluator: LocalRunner.T_RUN_EVALUATOR = get_global_func_with_default_on_worker(
            _f_run_evaluator, default_run_evaluator
        )
        f_cleanup: LocalRunner.T_CLEANUP = get_global_func_with_default_on_worker(
            _f_cleanup, default_cleanup
        )

        @contextmanager
        def resource_handler():
            try:
                yield
            finally:
                # Final step. Always clean up
                f_cleanup()

        with resource_handler():
            # Step 1: create the local runtime module
            rt_mod = tvm.runtime.load_module(artifact_path)
            # Step 2: create the local device
            device = tvm.runtime.device(dev_type=device_type, dev_id=0)
            # Step 3: Allocate input arguments
            repeated_args: List[T_ARGUMENT_LIST] = f_alloc_argument(
                device,
                args_info,
                alloc_repeat,
            )
            # Step 4: Run time_evaluator
            costs: List[float] = f_run_evaluator(
                rt_mod,
                device,
                evaluator_config,
                repeated_args,
            )
        return costs


def default_alloc_argument(
    device: Device,
    args_info: T_ARG_INFO_JSON_OBJ_LIST,
    alloc_repeat: int,
) -> List[T_ARGUMENT_LIST]:
    """Default function to allocate the arguments

    Parameters
    ----------
    device: Device
        The device to allocate the arguments
    args_info: T_ARG_INFO_JSON_OBJ_LIST
        The arguments info
    alloc_repeat: int
        The number of times to repeat the allocation

    Returns
    -------
    repeated_args: List[T_ARGUMENT_LIST]
        The allocation args
    """
    f_random_fill = get_global_func_with_default_on_worker(
        name="tvm.contrib.random.random_fill", default=None
    )
    return alloc_argument_common(f_random_fill, device, args_info, alloc_repeat)


def default_run_evaluator(
    rt_mod: Module,
    device: Device,
    evaluator_config: EvaluatorConfig,
    repeated_args: List[T_ARGUMENT_LIST],
) -> List[float]:
    """Default function to run the evaluator

    Parameters
    ----------
    rt_mod: Module
        The runtime module
    device: Device
        The device to run the evaluator
    evaluator_config: EvaluatorConfig
        The evaluator config
    repeated_args: List[T_ARGUMENT_LIST]
        The repeated arguments

    Returns
    -------
    costs: List[float]
        The evaluator results
    """
    return run_evaluator_common(rt_mod, device, evaluator_config, repeated_args)


def default_cleanup() -> None:
    """Default function to clean up the session"""
    pass  # pylint: disable=unnecessary-pass
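
A rough usage sketch of the runner added by this commit, not part of the diff. It builds a trivial workload, exports it, and times it through LocalRunner. Several names are assumptions rather than facts shown here: that TensorInfo lives in tvm.meta_schedule.arg_info, that RunnerInput is constructed as RunnerInput(artifact_path, device_type, args_info), that RunnerResult exposes run_secs and error_msg, and that TVM is built with USE_RANDOM so the default argument filler can find tvm.contrib.random.random_fill.

    # Sketch only; the names marked below as assumptions come from the broader
    # meta_schedule package, not from this commit.
    import os.path as osp
    import tempfile

    import tvm
    from tvm import te
    from tvm.meta_schedule.arg_info import TensorInfo  # assumed location of the arg-info helper
    from tvm.meta_schedule.runner import LocalRunner, RunnerInput

    # A trivial workload: elementwise C = A + B over 128 float32 values.
    n = 128
    A = te.placeholder((n,), dtype="float32", name="A")
    B = te.placeholder((n,), dtype="float32", name="B")
    C = te.compute((n,), lambda i: A[i] + B[i], name="C")
    built = tvm.build(te.create_schedule(C.op), [A, B, C], target="llvm")

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Export the artifact that the worker process will load via tvm.runtime.load_module().
        artifact_path = osp.join(tmp_dir, "workload.so")
        built.export_library(artifact_path)

        runner_input = RunnerInput(
            artifact_path,
            "llvm",  # device_type, resolved by tvm.runtime.device() on the worker
            [TensorInfo("float32", [n]) for _ in range(3)],  # one entry per kernel argument
        )
        runner = LocalRunner(timeout_sec=30)  # evaluator_config=None falls back to defaults
        (runner_future,) = runner.run([runner_input])  # one LocalRunnerFuture per input
        runner_result = runner_future.result()
        print(runner_result.run_secs, runner_result.error_msg)  # assumed RunnerResult fields

Note that f_alloc_argument, f_run_evaluator, and f_cleanup can be left as None to use the default_* functions defined above, or given as the names of functions registered in TVM's global registry, which the worker process resolves through get_global_func_with_default_on_worker.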
