1
1
import multiprocessing
2
2
import shutil
3
+ import sys
3
4
from operator import attrgetter
4
5
from pathlib import Path
5
6
from typing import Iterable , List , Optional , Sequence , Set , Tuple , Type , Union
9
10
import plotext as plt
10
11
from structlog import get_logger
11
12
from unblob_native import math_tools as mt
13
+ from unblob_native .sandbox import AccessFS , restrict_access # type: ignore
12
14
13
15
from unblob .handlers import BUILTIN_DIR_HANDLERS , BUILTIN_HANDLERS , Handlers
14
16
@@ -112,6 +114,29 @@ def get_extract_dir_for(self, path: Path) -> Path:
112
114
return extract_dir .expanduser ().resolve ()
113
115
114
116
117
+ def sandbox (extract_dir : Path , report_file : Optional [Path ]):
118
+ restrictions = [
119
+ AccessFS .read ("/" ),
120
+ AccessFS .read_write ("/dev/shm" ), # noqa: S108
121
+ AccessFS .read_write (extract_dir .as_posix ()),
122
+ AccessFS .make_dir (extract_dir .parent .as_posix ()),
123
+ ]
124
+
125
+ if report_file :
126
+ restrictions += [
127
+ AccessFS .read_write (report_file ),
128
+ AccessFS .make_reg (report_file .parent ),
129
+ ]
130
+
131
+ if "pytest" in sys .modules :
132
+ restrictions += [
133
+ AccessFS .read_write ("/tmp" ), # noqa: S108
134
+ AccessFS .read_write (Path (__file__ ).parent .parent .resolve ().as_posix ()),
135
+ ]
136
+
137
+ restrict_access (* restrictions )
138
+
139
+
115
140
@terminate_gracefully
116
141
def process_file (
117
142
config : ExtractionConfig , input_path : Path , report_file : Optional [Path ] = None
@@ -136,6 +161,10 @@ def process_file(
136
161
)
137
162
return ProcessResult ()
138
163
164
+ if not hasattr (process_file , "_sandboxed" ):
165
+ sandbox (extract_dir , report_file )
166
+ process_file ._sandboxed = True # noqa: SLF001
167
+
139
168
process_result = _process_task (config , task )
140
169
141
170
if not config .skip_extraction :
0 commit comments