-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpython_executor.py
50 lines (41 loc) · 1.5 KB
/
python_executor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import logging
import sys
from io import StringIO
from typing import Any, AnyStr
from .function import Function
class PythonExecutor(Function):
specification = {
"name": "execute_python_code",
"description": "Execute Python code and return the output.",
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The Python code to execute.",
},
"continue": {
"type": "string",
"enum": ["true", "false"],
"description": "The boolean flag that determines whether to process output from the function. If set to true, the function's output will be processed. If set to false, the function's output will not be processed. ",
},
},
"required": ["code", "continue"],
},
}
def get_name(self) -> AnyStr:
return self.specification.get("name", "")
def func(self, data, **kwargs) -> Any:
code = data if isinstance(data, str) else data.get("code")
logging.warning(f">>> {code}")
# redirect stdout to a string
old_stdout = sys.stdout
redirected_output = sys.stdout = StringIO()
try:
exec(code)
except Exception as e:
logging.error(e)
finally:
# reset stdout
sys.stdout = old_stdout
return redirected_output.getvalue()