3
3
import queue
4
4
import re
5
5
from concurrent .futures import Future
6
+ from typing import List
6
7
7
8
import cloudpickle
8
9
9
10
10
11
def deserialize (funct_dict : dict ) -> dict :
12
+ """
13
+ Deserialize a dictionary of serialized functions.
14
+
15
+ Args:
16
+ funct_dict (dict): A dictionary containing serialized functions.
17
+
18
+ Returns:
19
+ dict: A dictionary with deserialized functions.
20
+
21
+ """
11
22
try :
12
23
return {k : cloudpickle .loads (v ) for k , v in funct_dict .items ()}
13
24
except EOFError :
14
25
return {}
15
26
16
27
17
- def find_executed_tasks (future_queue : queue .Queue , cache_directory : str ):
28
+ def find_executed_tasks (future_queue : queue .Queue , cache_directory : str ) -> None :
29
+ """
30
+ Find executed tasks from the future queue and update the task memory dictionary.
31
+
32
+ Args:
33
+ future_queue (queue.Queue): The queue containing the futures of executed tasks.
34
+ cache_directory (str): The directory where the task cache is stored.
35
+
36
+ Returns:
37
+ None
38
+
39
+ """
18
40
task_memory_dict = {}
19
41
while True :
20
42
task_dict = {}
@@ -33,14 +55,36 @@ def find_executed_tasks(future_queue: queue.Queue, cache_directory: str):
33
55
34
56
35
57
def read_from_file (file_name : str ) -> dict :
58
+ """
59
+ Read the contents of a file and return it as a dictionary.
60
+
61
+ Args:
62
+ file_name (str): The name of the file to read.
63
+
64
+ Returns:
65
+ dict: A dictionary containing the contents of the file, with the file name as the key.
66
+
67
+ """
36
68
name = file_name .split ("/" )[- 1 ].split ("." )[0 ]
37
69
with open (file_name , "rb" ) as f :
38
70
return {name : f .read ()}
39
71
40
72
41
73
def reload_previous_futures (
42
74
future_queue : queue .Queue , future_dict : dict , cache_directory : str
43
- ):
75
+ ) -> None :
76
+ """
77
+ Reload previous futures from the cache directory and update the future dictionary.
78
+
79
+ Args:
80
+ future_queue (queue.Queue): The queue containing the futures of executed tasks.
81
+ future_dict (dict): A dictionary containing the current futures.
82
+ cache_directory (str): The directory where the task cache is stored.
83
+
84
+ Returns:
85
+ None
86
+
87
+ """
44
88
file_lst = os .listdir (cache_directory )
45
89
for f in file_lst :
46
90
if f .endswith (".in.pl" ):
@@ -56,16 +100,50 @@ def reload_previous_futures(
56
100
future_queue .put ({key : future_dict [key ]})
57
101
58
102
59
- def serialize_result (result_dict : dict ):
103
+ def serialize_result (result_dict : dict ) -> dict :
104
+ """
105
+ Serialize the values in a dictionary using cloudpickle.
106
+
107
+ Args:
108
+ result_dict (dict): A dictionary containing the values to be serialized.
109
+
110
+ Returns:
111
+ dict: A dictionary with serialized values.
112
+
113
+ """
60
114
return {k : cloudpickle .dumps (v ) for k , v in result_dict .items ()}
61
115
62
116
63
- def serialize_funct (fn : callable , * args , ** kwargs ):
117
+ def serialize_funct (fn : callable , * args , ** kwargs ) -> dict :
118
+ """
119
+ Serialize a function along with its arguments and keyword arguments.
120
+
121
+ Args:
122
+ fn (callable): The function to be serialized.
123
+ *args: The arguments to be passed to the function.
124
+ **kwargs: The keyword arguments to be passed to the function.
125
+
126
+ Returns:
127
+ dict: A dictionary containing the serialized function.
128
+
129
+ """
64
130
binary = cloudpickle .dumps ({"fn" : fn , "args" : args , "kwargs" : kwargs })
65
131
return {fn .__name__ + _get_hash (binary = binary ): binary }
66
132
67
133
68
- def write_to_file (funct_dict : dict , state , cache_directory : str ):
134
+ def write_to_file (funct_dict : dict , state : str , cache_directory : str ) -> List [str ]:
135
+ """
136
+ Write the contents of a dictionary to files in the cache directory.
137
+
138
+ Args:
139
+ funct_dict (dict): A dictionary containing the contents to be written.
140
+ state (str): The state of the files to be written.
141
+ cache_directory (str): The directory where the files will be written.
142
+
143
+ Returns:
144
+ List[str]: A list of file names that were written.
145
+
146
+ """
69
147
file_name_lst = []
70
148
for k , v in funct_dict .items ():
71
149
file_name = _get_file_name (name = k , state = state )
@@ -75,23 +153,69 @@ def write_to_file(funct_dict: dict, state, cache_directory: str):
75
153
return file_name_lst
76
154
77
155
78
- def _get_file_name (name , state ):
156
+ def _get_file_name (name : str , state : str ) -> str :
157
+ """
158
+ Generate a file name based on the given name and state.
159
+
160
+ Args:
161
+ name (str): The name to be included in the file name.
162
+ state (str): The state of the file.
163
+
164
+ Returns:
165
+ str: The generated file name.
166
+
167
+ """
79
168
return name + "." + state + ".pl"
80
169
81
170
82
- def _get_hash (binary ):
171
+ def _get_hash (binary : bytes ) -> str :
172
+ """
173
+ Get the hash of a binary using MD5 algorithm.
174
+
175
+ Args:
176
+ binary (bytes): The binary data to be hashed.
177
+
178
+ Returns:
179
+ str: The hexadecimal representation of the hash.
180
+
181
+ """
83
182
# Remove specification of jupyter kernel from hash to be deterministic
84
183
binary_no_ipykernel = re .sub (b"(?<=/ipykernel_)(.*)(?=/)" , b"" , binary )
85
184
return str (hashlib .md5 (binary_no_ipykernel ).hexdigest ())
86
185
87
186
88
- def _set_future (file_name , future ):
187
+ def _set_future (file_name : str , future : Future ) -> None :
188
+ """
189
+ Set the result of a future based on the contents of a file.
190
+
191
+ Args:
192
+ file_name (str): The name of the file containing the result.
193
+ future (Future): The future to set the result for.
194
+
195
+ Returns:
196
+ None
197
+
198
+ """
89
199
values = deserialize (funct_dict = read_from_file (file_name = file_name )).values ()
90
200
if len (values ) == 1 :
91
201
future .set_result (list (values )[0 ])
92
202
93
203
94
- def _update_task_dict (task_dict , task_memory_dict , cache_directory ):
204
+ def _update_task_dict (
205
+ task_dict : dict , task_memory_dict : dict , cache_directory : str
206
+ ) -> None :
207
+ """
208
+ Update the task memory dictionary with the futures from the task dictionary.
209
+
210
+ Args:
211
+ task_dict (dict): A dictionary containing the futures of tasks.
212
+ task_memory_dict (dict): The dictionary to store the task memory.
213
+ cache_directory (str): The directory where the task cache is stored.
214
+
215
+ Returns:
216
+ None
217
+
218
+ """
95
219
file_lst = os .listdir (cache_directory )
96
220
for key , future in task_dict .items ():
97
221
task_memory_dict [key ] = future
0 commit comments