@@ -99,42 +99,27 @@ def _copy_hopsfs_model(self, model_path, dataset_model_version_path):
99
99
_ , file_name = os .path .split (path )
100
100
self ._dataset_api .copy (path , dataset_model_version_path + "/" + file_name )
101
101
102
- def _upload_local_model_folder (self , model_path , dataset_model_version_path ):
102
+ def _upload_local_model_folder (
103
+ self , local_model_path , model_version , dataset_model_name_path
104
+ ):
103
105
archive_out_dir = None
106
+ uploaded_archive_path = None
104
107
try :
105
108
archive_out_dir = tempfile .TemporaryDirectory (dir = os .getcwd ())
106
- archive_path = util .compress (archive_out_dir .name , model_path )
107
- self ._dataset_api .upload (archive_path , dataset_model_version_path )
109
+ archive_path = util .compress (
110
+ archive_out_dir .name , str (model_version ), local_model_path
111
+ )
112
+ uploaded_archive_path = (
113
+ dataset_model_name_path + "/" + os .path .basename (archive_path )
114
+ )
115
+ self ._dataset_api .upload (archive_path , dataset_model_name_path )
116
+ self ._dataset_api .unzip (uploaded_archive_path , block = True , timeout = 600 )
108
117
except RestAPIError :
109
118
raise
110
119
finally :
111
120
if archive_out_dir is not None :
112
121
archive_out_dir .cleanup ()
113
-
114
- extracted_archive_path = (
115
- dataset_model_version_path + "/" + os .path .basename (archive_path )
116
- )
117
-
118
- self ._dataset_api .unzip (extracted_archive_path , block = True , timeout = 480 )
119
-
120
- self ._dataset_api .rm (extracted_archive_path )
121
-
122
- extracted_model_dir = (
123
- dataset_model_version_path
124
- + "/"
125
- + os .path .basename (archive_path [: archive_path .index ("." )])
126
- )
127
-
128
- # Observed that when decompressing a large folder and directly moving the files sometimes caused filesystem exceptions
129
- time .sleep (5 )
130
-
131
- for artifact in os .listdir (model_path ):
132
- _ , file_name = os .path .split (artifact )
133
- self ._dataset_api .move (
134
- extracted_model_dir + "/" + file_name ,
135
- dataset_model_version_path + "/" + file_name ,
136
- )
137
- self ._dataset_api .rm (extracted_model_dir )
122
+ self ._dataset_api .rm (uploaded_archive_path )
138
123
139
124
def _set_model_version (
140
125
self , model_instance , dataset_models_root_path , dataset_model_path
@@ -147,7 +132,10 @@ def _set_model_version(
147
132
]:
148
133
_ , file_name = os .path .split (item ["attributes" ]["path" ])
149
134
try :
150
- current_version = int (file_name )
135
+ try :
136
+ current_version = int (file_name )
137
+ except ValueError :
138
+ continue
151
139
if current_version > current_highest_version :
152
140
current_highest_version = current_version
153
141
except RestAPIError :
@@ -199,20 +187,16 @@ def save(self, model_instance, model_path, await_registration=480):
199
187
)
200
188
201
189
# Create /Models/{model_instance._name} folder
202
- dataset_model_path = dataset_models_root_path + "/" + model_instance ._name
203
- if not self ._dataset_api .path_exists (dataset_model_path ):
204
- self ._dataset_api .mkdir (dataset_model_path )
190
+ dataset_model_name_path = dataset_models_root_path + "/" + model_instance ._name
191
+ if not self ._dataset_api .path_exists (dataset_model_name_path ):
192
+ self ._dataset_api .mkdir (dataset_model_name_path )
205
193
206
194
model_instance = self ._set_model_version (
207
- model_instance , dataset_models_root_path , dataset_model_path
195
+ model_instance , dataset_models_root_path , dataset_model_name_path
208
196
)
209
197
210
198
dataset_model_version_path = (
211
- dataset_models_root_path
212
- + "/"
213
- + model_instance ._name
214
- + "/"
215
- + str (model_instance ._version )
199
+ dataset_model_name_path + "/" + str (model_instance ._version )
216
200
)
217
201
218
202
# Attach model summary xattr to /Models/{model_instance._name}/{model_instance._version}
@@ -264,17 +248,23 @@ def save(self, model_instance, model_path, await_registration=480):
264
248
)
265
249
if step ["id" ] == 2 :
266
250
# Upload Model files from local path to /Models/{model_instance._name}/{model_instance._version}
267
- if os .path .exists (model_path ): # check local absolute
251
+ # check local absolute
252
+ if os .path .exists (model_path ):
268
253
self ._upload_local_model_folder (
269
- model_path , dataset_model_version_path
254
+ model_path ,
255
+ model_instance .version ,
256
+ dataset_model_name_path ,
270
257
)
258
+ # check local relative
271
259
elif os .path .exists (
272
260
os .path .join (os .getcwd (), model_path )
273
261
): # check local relative
274
262
self ._upload_local_model_folder (
275
263
os .path .join (os .getcwd (), model_path ),
276
- dataset_model_version_path ,
264
+ model_instance .version ,
265
+ dataset_model_name_path ,
277
266
)
267
+ # check project relative
278
268
elif self ._dataset_api .path_exists (
279
269
model_path
280
270
): # check hdfs relative and absolute
0 commit comments