Commit def05d2

Merge branch 'data-export' into 'develop'

CLI utility to export archive data

See merge request nomad-lab/nomad-FAIR!1452

2 parents: 9d907b5 + 88c59a9

File tree: 4 files changed (+86 −12 lines)

nomad/archive/query_reader.py

+1 −2

@@ -713,8 +713,7 @@ def load_archive(self, upload_id: str, entry_id: str) -> ArchiveDict:
         self.upload_pool[upload_id] = upload.upload_files

         try:
-            with self.upload_pool[upload_id].read_archive(entry_id) as archive:
-                return archive[entry_id]
+            return self.upload_pool[upload_id].read_archive(entry_id)[entry_id]
         except KeyError:
             raise ArchiveError(f'Archive {entry_id} does not exist in upload {entry_id}.')

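Note on this change: the files object is kept in self.upload_pool and the reader returned by read_archive is lazy, so wrapping it in a with-block presumably closed the underlying file before the returned ArchiveDict was consumed. A minimal sketch of the pattern, where open_upload_files is a hypothetical stand-in for the real upload-files lookup:

    upload_pool: dict = {}

    def load_archive(upload_id: str, entry_id: str):
        if upload_id not in upload_pool:
            upload_pool[upload_id] = open_upload_files(upload_id)  # hypothetical helper
        try:
            # no with-block: closing the reader here would invalidate the
            # lazily evaluated ArchiveDict handed back to the caller
            return upload_pool[upload_id].read_archive(entry_id)[entry_id]
        except KeyError:
            raise ArchiveError(f'Archive {entry_id} does not exist in upload {upload_id}.')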
nomad/archive/required.py

+7 −3

@@ -30,6 +30,7 @@
     Package
 from .query import ArchiveQueryError, to_json, _query_archive_key_pattern, _extract_key_and_index, _extract_child
 from .storage import ArchiveReader, ArchiveList, ArchiveError, ArchiveDict
+from .storage_v2 import ArchiveDict as NewArchiveDict
 from ..datamodel.context import parse_path, ServerContext


@@ -443,10 +444,13 @@ def _apply_required(
         if archive_item is None:
             return None  # type: ignore

-        archive_item = to_json(archive_item)
         result: dict = {}

-        if isinstance(archive_item, dict) and 'm_def' in archive_item:
+        # avoid the bug in the old reader that primitive key-value is not included in toc
+        if isinstance(archive_item, ArchiveDict):
+            archive_item = to_json(archive_item)
+
+        if isinstance(archive_item, (dict, NewArchiveDict)) and 'm_def' in archive_item:
             dataset = dataset.replace(definition=self._resolve_definition(
                 dataset.upload_id, archive_item['m_def'].split('@')[0], dataset.archive_root))
             result['m_def'] = archive_item['m_def']
@@ -459,7 +463,7 @@ def _apply_required(
             return self._resolve_refs(dataset.definition, archive_item, dataset)

         if directive in ['*', 'include']:
-            return archive_item
+            return to_json(archive_item)

         raise ArchiveQueryError(f'unknown directive {required}')


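Net effect: to_json is no longer applied eagerly to every archive item. Old-format containers (storage.ArchiveDict) are still converted up front to sidestep the toc bug mentioned in the comment, while new-format containers (storage_v2.ArchiveDict) stay lazy during traversal and are only materialised when a '*' or 'include' directive returns them. A toy illustration of why laziness pays off for required-based reads (all names below are stand-ins, not the real implementation):

    from collections.abc import Mapping

    class LazyDict(Mapping):
        # stand-in for storage_v2.ArchiveDict: children decoded on access
        def __init__(self, raw: dict):
            self._raw = raw

        def __getitem__(self, key):
            return self._raw[key]  # decodes a single child

        def __iter__(self):
            return iter(self._raw)

        def __len__(self):
            return len(self._raw)

    def to_json(item):
        # recursively materialise, as the real to_json is assumed to do
        if isinstance(item, Mapping):
            return {k: to_json(v) for k, v in item.items()}
        return item

    archive = LazyDict({'run': LazyDict({'system': [1, 2, 3]}), 'workflow': LazyDict({})})
    subtree = archive['run']   # traversal stays lazy; 'workflow' is never decoded
    print(to_json(subtree))    # only the requested subtree is materialised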
nomad/archive/storage.py

+1 −7

@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import os
 from typing import Iterable, Any, Tuple, Dict, BinaryIO, Union, List, cast
 from io import BytesIO, BufferedReader
 from collections.abc import Mapping, Sequence
@@ -26,7 +25,7 @@
 import struct
 import json

-from nomad import utils, config
+from nomad import utils
 from nomad.config import archive

 __packer = msgpack.Packer(autoreset=True, use_bin_type=True)
@@ -552,11 +551,6 @@ def read_archive(file_or_path: Union[str, BytesIO], **kwargs) -> ArchiveReader:
     '''
     from .storage_v2 import ArchiveWriter as ArchiveWriterNew, ArchiveReader as ArchiveReaderNew

-    # if the file is smaller than the threshold, just read it into memory to avoid multiple small reads
-    if isinstance(file_or_path, str) and os.path.getsize(file_or_path) < 4 * config.archive.read_buffer_size:
-        with open(file_or_path, 'rb') as f:
-            file_or_path = BytesIO(f.read())
-
     if isinstance(file_or_path, str):
         with open(file_or_path, 'rb') as f:
             magic = f.read(ArchiveWriterNew.magic_len)

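The deleted block was a read-time optimisation: archives smaller than four read buffers were slurped into a BytesIO so the parser issued one large read instead of many small ones. With it gone, read_archive always works from the path or stream it is given, and the os and config imports become unused, hence the first two hunks. Callers that still want the old behaviour can buffer on their side; a sketch, using read_archive from this module and an illustrative threshold:

    import os
    from io import BytesIO

    from nomad.archive.storage import read_archive

    # illustrative; the removed code used 4 * config.archive.read_buffer_size
    SMALL_ARCHIVE_THRESHOLD = 4 * 1024 * 1024

    def read_archive_buffered(path: str):
        # reproduce the removed optimisation on the caller's side
        if os.path.getsize(path) < SMALL_ARCHIVE_THRESHOLD:
            with open(path, 'rb') as f:
                return read_archive(BytesIO(f.read()))
        return read_archive(path)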
nomad/cli/admin/uploads.py

+77

@@ -268,6 +268,83 @@ def _query_uploads(
     return final_query, proc.Upload.objects(final_query)


+@uploads.command(help='Export archive data of selected uploads')
+@click.argument('UPLOADS', nargs=-1)
+@click.option('--required', type=str, help='The required specification in JSON format')
+@click.option('-o', '--output', type=str, help='The file to write data to')
+@click.pass_context
+def export(ctx, uploads, required, output: str):
+    import sys
+    from nomad.processing import Entry
+    from nomad.utils import get_logger
+    from nomad.files import UploadFiles
+    from nomad.archive import ArchiveQueryError, RequiredReader
+    import time
+    import zipfile
+
+    logger = get_logger(__name__)
+
+    if not output:
+        logger.error('no output given')
+        sys.exit(1)
+
+    if not output.endswith('.zip'):
+        logger.error('only zip output is supported')
+        sys.exit(1)
+
+    output_file = zipfile.ZipFile(output, 'w', allowZip64=True)
+
+    def write(entry_id, archive_data):
+        archive_json = json.dumps(archive_data)
+        output_file.writestr(f'{entry_id}.json', archive_json, compress_type=zipfile.ZIP_DEFLATED)
+
+    _, uploads = _query_uploads(uploads, **ctx.obj.uploads_kwargs)
+
+    try:
+        required_data = json.loads(required)
+    except Exception as e:
+        logger.error('could not parse required', exc_info=e)
+        sys.exit(1)
+
+    try:
+        required_reader = RequiredReader(required_data)
+    except Exception as e:
+        logger.error('could not validate required', exc_info=e)
+        sys.exit(1)
+
+    def get_rss():
+        return time.time()
+
+    start_time = get_rss()
+
+    upload_count = 0
+    total_count = 0
+    for upload in uploads:
+        upload_id = upload.upload_id
+        upload_files = UploadFiles.get(upload_id)
+        upload_count += 1
+        entry_ids = list(entry.entry_id for entry in Entry.objects(upload_id=upload_id))
+        entry_count = 0
+        for entry_id in entry_ids:
+            entry_count += 1
+            total_count += 1
+            try:
+                archive = upload_files.read_archive(entry_id, use_blocked_toc=False)
+                archive_data = required_reader.read(archive, entry_id, upload_id)
+                write(entry_id, archive_data)
+            except ArchiveQueryError as e:
+                logger.error('could not read archive', exc_info=e, entry_id=entry_id)
+            except KeyError as e:
+                logger.error('missing archive', exc_info=e, entry_id=entry_id)
+
+            if total_count % 100 == 0:
+                print(f'{upload_count:5}/{len(uploads)} {entry_count:5}/{len(entry_ids)} {total_count:5} {get_rss() - start_time:8.1f} {upload_id}')
+
+        upload_files.close()
+
+    output_file.close()
+
+
 @uploads.command(help='List selected uploads')
 @click.argument('UPLOADS', nargs=-1)
 @click.option('-e', '--entries', is_flag=True, help='Show details about entries.')

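Wired into the admin CLI, the command takes upload ids plus a required specification and writes one <entry_id>.json member per entry into the given zip. Assuming the standard nomad admin uploads entry point and an illustrative required spec (the exact shape is defined by RequiredReader, not shown in this diff):

    nomad admin uploads export --required '{"run": {"system": "*"}}' -o export.zip <upload_id>

Reading the export back is plain zipfile work, mirroring the write() helper above:

    import json
    import zipfile

    with zipfile.ZipFile('export.zip') as zf:  # the file passed via -o/--output
        for name in zf.namelist():  # one member per exported entry
            entry_id = name.removesuffix('.json')
            archive_data = json.loads(zf.read(name))
            print(entry_id, list(archive_data))  # top-level archive sections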