# diff --git a/src/scippneutron/io/cif.py b/src/scippneutron/io/cif.py
# new file mode 100644
# index 000000000..ff947a1d5
# --- /dev/null
# +++ b/src/scippneutron/io/cif.py
# @@ -0,0 +1,654 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
"""CIF file writer.

This module contains tools for writing `CIF <https://www.iucr.org/resources/cif>`_
files with diffraction data.
It does not support reading CIF files.

Examples
--------
Make mockup powder diffraction data:

  >>> import scipp as sc
  >>> tof = sc.array(dims=['tof'], values=[1.2, 1.4, 2.3], unit='us')
  >>> intensity = sc.array(
  ...     dims=['tof'],
  ...     values=[13.6, 26.0, 9.7],
  ...     variances=[0.7, 1.1, 0.5],
  ... )

Wrap the data in a ``Loop`` to write them together as columns.
(Note that this particular example could more easily be done with
:meth:`scippneutron.io.cif.Block.add_reduced_powder_data`.)

  >>> from scippneutron.io import cif
  >>> tof_loop = cif.Loop({
  ...     'pd_meas.time_of_flight': tof,
  ...     'pd_meas.intensity_total': sc.values(intensity),
  ...     'pd_meas.intensity_total_su': sc.stddevs(intensity),
  ... })

Write the data to file along with some metadata:

  >>> block = cif.Block('example', [
  ...     {
  ...         'diffrn_radiation.probe': 'neutron',
  ...         'diffrn_source.beamline': 'some-beamline',
  ...     },
  ...     tof_loop,
  ... ])
  >>> cif.save_cif('example.cif', block)

This results in a file containing

.. code-block::

  #\\#CIF_1.1
  data_example

  _diffrn_radiation.probe neutron
  _diffrn_source.beamline some-beamline

  loop_
  _pd_meas.time_of_flight
  _pd_meas.intensity_total
  _pd_meas.intensity_total_su
  1.2 13.6 0.8366600265340756
  1.4 26.0 1.0488088481701516
  2.3 9.7 0.7071067811865476
"""

from __future__ import annotations

import io
import warnings
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Mapping, Optional, Union

import scipp as sc


@dataclass(frozen=True)
class CIFSchema:
    """Name, version, and location of a CIF dictionary (schema)."""

    name: str
    version: str
    location: str


CORE_SCHEMA = CIFSchema(
    name='coreCIF',
    version='3.3.0',
    location='https://github.com/COMCIFS/cif_core/blob/fc3d75a298fd7c0c3cde43633f2a8616e826bfd5/cif_core.dic',  # noqa: E501
)
PD_SCHEMA = CIFSchema(
    name='pdCIF',
    version='2.5.0',
    location='https://github.com/COMCIFS/Powder_Dictionary/blob/7608b92165f58f968f054344e67662e01d4b401a/cif_pow.dic',  # noqa: E501
)

# Words that CIF 1.1 reserves; a value equal to / starting with one of these
# must be quoted to be read back as a plain string.
_RESERVED_WORDS = ('loop_', 'stop_', 'global_')
_RESERVED_PREFIXES = ('data_', 'save_')
# Characters that must not start an unquoted CIF value.
_FORBIDDEN_LEADING_CHARS = '_#$[];'


def save_cif(
    fname: Union[str, Path, io.TextIOBase], blocks: Union[Block, Iterable[Block]]
) -> None:
    """Save data blocks to a CIF file.

    To use, first create :class:`scippneutron.io.cif.Block` objects to collect and
    structure data for the file, then use this function to write the file.

    Parameters
    ----------
    fname:
        Path or file handle for the output file.
    blocks:
        One or more CIF data blocks to write to the file.
    """
    if isinstance(blocks, Block):
        blocks = (blocks,)
    with _open(fname) as f:
        _write_file_heading(f)
        _write_multi(f, blocks)


class Chunk:
    """A group of CIF key-value pairs.

    Chunks contain one or more key-value pairs where values are scalars,
    i.e., not arrays.
    Chunks are represented in files as a group of pairs separated from
    other chunks and loops by empty lines.

    Note that CIF has no concept of chunks; they are only used for organizing
    data in ScippNeutron.
    """

    def __init__(
        self,
        pairs: Union[Mapping[str, Any], Iterable[tuple[str, Any]], None],
        /,
        *,
        comment: str = '',
        schema: Optional[Union[CIFSchema, Iterable[CIFSchema]]] = None,
    ) -> None:
        """Create a new CIF chunk.

        Parameters
        ----------
        pairs:
            Defines a mapping from keys (a.k.a. tags) to values.
        comment:
            Optional comment that can be written above the chunk in the file.
        schema:
            CIF Schema used for the chunk.
            Content is not checked against the schema, but the schema is written
            to the file.
        """
        self._pairs = dict(pairs) if pairs is not None else {}
        self._comment = ''
        self.comment = comment
        self._schema = _preprocess_schema(schema)

    @property
    def comment(self) -> str:
        """Optional comment that can be written above the chunk in the file."""
        return self._comment

    @comment.setter
    def comment(self, comment: str) -> None:
        self._comment = _encode_non_ascii(comment)

    @property
    def schema(self) -> set[CIFSchema]:
        """CIF Schema used for the chunk."""
        return self._schema

    def write(self, f: io.TextIOBase) -> None:
        """Write this chunk to a file.

        Used mainly internally, use :func:`scippneutron.io.cif.save_cif` instead.

        Parameters
        ----------
        f:
            File handle.
        """
        _write_comment(f, self.comment)
        for key, val in self._pairs.items():
            v = _format_value(val)
            if _is_text_field(v):
                # Text fields must start at the beginning of a line.
                f.write(f'_{key}\n{v}\n')
            else:
                f.write(f'_{key} {v}\n')


class Loop:
    """A CIF loop.

    Contains a mapping from strings to Scipp variables.
    The strings are arbitrary and ``Loop`` can merge items from different categories
    into a single loop.
    All variables must have the same length.
    """

    def __init__(
        self,
        columns: Union[
            Mapping[str, sc.Variable], Iterable[tuple[str, sc.Variable]], None
        ],
        *,
        comment: str = '',
        schema: Optional[Union[CIFSchema, Iterable[CIFSchema]]] = None,
    ) -> None:
        """Create a new CIF loop.

        Parameters
        ----------
        columns:
            Defines a mapping from column names (including their category)
            to column values as Scipp variables.
            May be a mapping, an iterable of ``(name, variable)`` pairs,
            or ``None`` for an initially empty loop.
        comment:
            Optional comment that can be written above the loop in the file.
        schema:
            CIF Schema used for the loop.
            Content is not checked against the schema, but the schema is written
            to the file.
        """
        self._columns = {}
        # ``dict`` accepts both mappings and iterables of pairs, matching the
        # accepted input types; ``None`` means "no columns yet".
        initial = dict(columns) if columns is not None else {}
        for key, column in initial.items():
            self[key] = column
        self._comment = ''
        self.comment = comment
        self._schema = _preprocess_schema(schema)

    @property
    def comment(self) -> str:
        """Optional comment that can be written above the loop in the file."""
        return self._comment

    @comment.setter
    def comment(self, comment: str) -> None:
        self._comment = _encode_non_ascii(comment)

    @property
    def schema(self) -> set[CIFSchema]:
        """CIF Schema used for the loop."""
        return self._schema

    def __setitem__(self, name: str, value: sc.Variable) -> None:
        """Add or replace a column.

        Raises
        ------
        scipp.DimensionError
            If ``value`` is not 1-dimensional or its sizes differ from the
            columns already in the loop.
        """
        if value.ndim != 1:
            raise sc.DimensionError(
                "CIF loops can only contain 1d variables, got " f"{value.ndim} dims"
            )
        if self._columns:
            existing = next(iter(self._columns.values())).sizes
            if existing != value.sizes:
                raise sc.DimensionError(
                    f"Inconsistent dims in CIF loop: {value.sizes} "
                    f"loop dims: {existing}"
                )

        self._columns[name] = value

    def write(self, f: io.TextIOBase) -> None:
        """Write this loop to a file.

        Used mainly internally, use :func:`scippneutron.io.cif.save_cif` instead.
        A loop without columns writes nothing because a bare ``loop_``
        without tags is not valid CIF.

        Parameters
        ----------
        f:
            File handle.
        """
        if not self._columns:
            return
        _write_comment(f, self.comment)
        f.write('loop_\n')
        for key in self._columns:
            f.write(f'_{key}\n')
        formatted_values = [
            tuple(map(_format_value, row)) for row in zip(*self._columns.values())
        ]
        # If any value is a multi-line text field, lay out elements as a flat
        # vertical list, otherwise use a 2d table.
        sep = (
            '\n'
            if any(_is_text_field(item) for row in formatted_values for item in row)
            else ' '
        )
        for row in formatted_values:
            f.write(sep.join(row))
            f.write('\n')


class Block:
    """A CIF data block.

    A block contains an ordered sequence of loops
    and chunks (groups of key-value-pairs).
    The contents are written to file in the order specified in the block.
    """

    def __init__(
        self,
        name: str,
        content: Optional[Iterable[Union[Mapping[str, Any], Loop, Chunk]]] = None,
        *,
        comment: str = '',
        schema: Optional[Union[CIFSchema, Iterable[CIFSchema]]] = None,
    ) -> None:
        """Create a new CIF data block.

        Parameters
        ----------
        name:
            Name of the block.
            Can contain any non-whitespace characters.
            Can be at most 75 characters long.
        content:
            Initial loops and chunks.
            ``dicts`` are converted to :class:`scippneutron.io.cif.Chunk`.
        comment:
            Optional comment that can be written above the block in the file.
        schema:
            CIF Schema used for the block.
            Content is not checked against the schema, but the schema is written
            to the file.
        """
        self._name = ''
        self.name = name
        self._content = _convert_input_content(content) if content is not None else []
        self._comment = ''
        self.comment = comment
        self._schema = _preprocess_schema(schema)

    @property
    def name(self) -> str:
        """Name of the block."""
        return self._name

    @name.setter
    def name(self, name: str) -> None:
        encoded = _encode_non_ascii(name)
        # Validate before assigning so that a rejected name does not leave
        # the block in an invalid state.
        if any(char.isspace() for char in encoded):
            raise ValueError(
                "Block name must not contain spaces or line breaks, "
                f"got: '{encoded}'"
            )
        if len(encoded) > 75:
            warnings.warn(
                "cif.Block name should not be longer than 75 characters, got "
                f"{len(encoded)} characters ('{encoded}')",
                UserWarning,
                stacklevel=2,
            )
        self._name = encoded

    @property
    def comment(self) -> str:
        """Optional comment that can be written above the block in the file."""
        return self._comment

    @comment.setter
    def comment(self, comment: str) -> None:
        self._comment = _encode_non_ascii(comment)

    @property
    def schema(self) -> set[CIFSchema]:
        """CIF Schema used for the block.

        This is the union of the block's own schema and the schemas
        of all chunks and loops it contains.
        """
        merged = set(self._schema)
        for item in self._content:
            merged.update(item.schema)
        return merged

    def add(
        self,
        content: Union[Mapping[str, Any], Iterable[tuple[str, Any]], Chunk, Loop],
        /,
        comment: str = '',
    ) -> None:
        """Add a chunk or loop to the end of the block.

        Parameters
        ----------
        content:
            A loop, chunk, or mapping to add.
            Mappings get converted to chunks.
        comment:
            Optional comment that can be written above the chunk or loop in the file.
            If ``content`` is already a chunk or loop and ``comment`` is
            non-empty, it replaces the comment of ``content``.
        """
        if isinstance(content, (Chunk, Loop)):
            if comment:
                content.comment = comment
        else:
            content = Chunk(content, comment=comment)
        self._content.append(content)

    def add_reduced_powder_data(self, data: sc.DataArray, *, comment: str = '') -> None:
        """Add a loop with reduced powder data.

        The input must be 1-dimensional with a dimension name in
        ``('tof', 'dspacing')``.
        The data array may also have a name in
        ``('intensity_net', 'intensity_norm', 'intensity_total')``.
        If the name is not set, it defaults to ``'intensity_net'``.

        The data gets written as intensity along a single coord whose
        name matches the dimension name.
        Standard uncertainties are also written if present.

        The unit of the coordinate must match the requirement of pdCIF.

        Parameters
        ----------
        data:
            1-dimensional data array with a recognized dimension name
        comment:
            Optional comment that can be written above the data in the file.

        Examples
        --------
        Make mockup powder diffraction data:

          >>> import scipp as sc
          >>> tof = sc.array(dims=['tof'], values=[1.2, 1.4, 2.3], unit='us')
          >>> intensity = sc.array(
          ...     dims=['tof'],
          ...     values=[13.6, 26.0, 9.7],
          ...     variances=[0.7, 1.1, 0.5],
          ... )

        Add to a block:

          >>> from scippneutron.io import cif
          >>> block = cif.Block('reduced-data')
          >>> da = sc.DataArray(intensity, coords={'tof': tof})
          >>> block.add_reduced_powder_data(da)
        """
        self.add(_make_reduced_powder_loop(data, comment=comment))

    def add_powder_calibration(self, cal: sc.DataArray, *, comment: str = '') -> None:
        r"""Add a powder calibration table.

        The calibration data encode the following transformation from
        d-spacing to time-of-flight:

        .. math::

            t = \sum_{i=0}^N\, c_i d^{p_i}

        where :math:`c_i` is the i-th element of ``cal`` and :math:`p_i`
        is the i-th element of ``cal.coords['power']``.

        Parameters
        ----------
        cal:
            The data are the calibration coefficients (possibly with variances).
            Must have a coordinate called ``'power'`` defining :math:`p` in the
            equation above.
        comment:
            Optional comment that can be written above the data in the file.

        Examples
        --------
        Add a mockup calibration table:

          >>> import scipp as sc
          >>> from scippneutron.io import cif
          >>> cal = sc.DataArray(
          ...     sc.array(dims=['cal'], values=[3.4, 0.2]),
          ...     coords={'power': sc.array(dims=['cal'], values=[0, 1])},
          ... )
          >>> block = cif.Block('powder-calibration')
          >>> block.add_powder_calibration(cal)
        """
        self.add(_make_powder_calibration_loop(cal, comment=comment))

    def write(self, f: io.TextIOBase) -> None:
        """Write this block to a file.

        Used mainly internally, use :func:`scippneutron.io.cif.save_cif` instead.

        Parameters
        ----------
        f:
            File handle.
        """
        schema_loop = _make_schema_loop(self.schema)

        _write_comment(f, self.comment)
        f.write(f'data_{self.name}\n\n')
        if schema_loop is not None:
            schema_loop.write(f)
            f.write('\n')
        _write_multi(f, self._content)


def _convert_input_content(
    content: Iterable[Union[Mapping[str, Any], Loop, Chunk]]
) -> list[Union[Loop, Chunk]]:
    """Wrap every item that is not a loop or chunk in a ``Chunk``."""
    return [
        item if isinstance(item, (Loop, Chunk)) else Chunk(item) for item in content
    ]


@contextmanager
def _open(fname: Union[str, Path, io.TextIOBase]):
    """Yield a writable text handle for ``fname``.

    An already open text handle is passed through and left open;
    a path is opened for writing and closed on exit.
    """
    if isinstance(fname, io.TextIOBase):
        yield fname
    else:
        # Explicit encoding so the output does not depend on the locale.
        with open(fname, 'w', encoding='utf-8') as f:
            yield f


def _preprocess_schema(
    schema: Optional[Union[CIFSchema, Iterable[CIFSchema]]]
) -> set[CIFSchema]:
    """Normalize a schema argument into a set of schemas."""
    if schema is None:
        return set()
    if isinstance(schema, CIFSchema):
        res = {schema}
    else:
        res = set(schema)
    res.add(CORE_SCHEMA)  # needed to encode schema itself
    return res


def _make_schema_loop(schema: set[CIFSchema]) -> Optional[Loop]:
    """Build the ``audit_conform`` loop for ``schema`` or ``None`` if it is empty."""
    if not schema:
        return None
    # Sets have no defined order; sort to make the output reproducible.
    ordered = sorted(schema, key=lambda s: (s.name, s.version, s.location))
    columns = {
        'audit_conform.dict_name': [s.name for s in ordered],
        'audit_conform.dict_version': [s.version for s in ordered],
        'audit_conform.dict_location': [s.location for s in ordered],
    }
    return Loop(
        {key: sc.array(dims=['schema'], values=val) for key, val in columns.items()}
    )


def _quotes_for_string_value(value: str) -> Optional[str]:
    """Return the delimiter needed to write ``value`` or ``None`` if none is needed.

    ``';'`` requests a semicolon-delimited text field, ``"'"`` and ``'"'``
    request a quoted string.
    """
    if '\n' in value or '\r' in value:
        return ';'
    if "'" in value:
        if '"' in value:
            return ';'
        return '"'
    if '"' in value:
        return "'"
    if not value:
        # An empty value would otherwise vanish from the file.
        return "'"
    if value[0] in _FORBIDDEN_LEADING_CHARS:
        return "'"
    if any(char.isspace() for char in value):
        return "'"
    lowered = value.lower()
    if lowered in _RESERVED_WORDS or lowered.startswith(_RESERVED_PREFIXES):
        return "'"
    return None


def _is_text_field(formatted: str) -> bool:
    """Return True if ``formatted`` is a multi-line text field from ``_format_value``.

    Only text fields contain a line feed after formatting; all other values
    are written on a single line.
    """
    return '\n' in formatted


def _encode_non_ascii(s: str) -> str:
    """Replace non-ASCII characters by backslash escape sequences."""
    return s.encode('ascii', 'backslashreplace').decode('ascii')


def _format_value(value: Any) -> str:
    """Convert ``value`` to its textual CIF representation.

    Variables with a variance are written in compact ``value(uncertainty)``
    notation without unit; other variables are written as their bare value.
    Strings are quoted or turned into text fields when required.
    """
    if isinstance(value, sc.Variable):
        if value.variance is not None:
            without_unit = sc.scalar(value.value, variance=value.variance)
            s = f'{without_unit:c}'
        else:
            s = str(value.value)
    elif isinstance(value, datetime):
        s = value.isoformat()
    else:
        s = str(value)

    s = _encode_non_ascii(s)

    # NOTE: a value containing a line that starts with ';' would terminate
    # the text field early; such values are not escaped here.
    if (quotes := _quotes_for_string_value(s)) == ';':
        return f'; {s}\n;'
    elif quotes is not None:
        return quotes + s + quotes
    return s


def _write_comment(f: io.TextIOBase, comment: str) -> None:
    """Write ``comment`` with every line prefixed by ``'# '``; no-op if empty."""
    if comment:
        f.write('# ')
        f.write('\n# '.join(comment.splitlines()))
        f.write('\n')


def _write_multi(f: io.TextIOBase, to_write: Iterable[Any]) -> None:
    """Write all items separated by one empty line."""
    first = True
    for item in to_write:
        if not first:
            f.write('\n')
        first = False
        item.write(f)


def _write_file_heading(f: io.TextIOBase) -> None:
    """Write the CIF 1.1 magic code that starts every file."""
    f.write('#\\#CIF_1.1\n')


def _reduced_powder_coord(data) -> tuple[str, sc.Variable]:
    """Return the CIF tag and coordinate for 1d reduced powder data.

    Raises
    ------
    scipp.DimensionError
        If ``data`` is not 1-dimensional.
    scipp.CoordError
        If the dimension is not one of the recognized names.
    scipp.UnitError
        If the coordinate does not have the expected unit.
    """
    if data.ndim != 1:
        raise sc.DimensionError(f'Can only save 1d powder data, got {data.ndim} dims.')
    known_coords = {
        'tof': ('pd_meas.time_of_flight', 'us'),
        'dspacing': ('pd_proc.d_spacing', 'Å'),
    }
    try:
        name, unit = known_coords[data.dim]
    except KeyError:
        raise sc.CoordError(
            f'Unrecognized dim: {data.dim}. Must be one of {list(known_coords)}'
        ) from None

    coord = data.coords[data.dim]
    if coord.unit != unit:
        raise sc.UnitError(
            f'Incorrect unit for powder coordinate {name}: {coord.unit} '
            f'expected {unit}'
        )
    return name, coord


def _normalize_reduced_powder_name(name: str) -> str:
    """Return the ``pd_proc`` tag for an intensity name or raise ``ValueError``."""
    if name not in ('intensity_net', 'intensity_norm', 'intensity_total'):
        raise ValueError(f'Unrecognized name for reduced powder data: {name}')
    return f'pd_proc.{name}'


def _make_reduced_powder_loop(data: sc.DataArray, comment: str) -> Loop:
    """Build a loop with the coordinate and intensity of reduced powder data.

    If the intensity has a unit other than ``'one'``, a note with the unit is
    appended to ``comment`` because the loop itself cannot encode it.
    """
    coord_name, coord = _reduced_powder_coord(data)
    data_name = _normalize_reduced_powder_name(data.name or 'intensity_net')

    res = Loop({coord_name: sc.values(coord)}, comment=comment, schema=PD_SCHEMA)
    if coord.variances is not None:
        res[coord_name + '_su'] = sc.stddevs(coord)
    res[data_name] = sc.values(data.data)
    if data.variances is not None:
        res[data_name + '_su'] = sc.stddevs(data.data)

    if data.unit != 'one':
        unit_note = f'Unit of intensity: [{data.unit}]'
        # Keep the caller's comment instead of replacing it.
        res.comment = f'{comment}\n{unit_note}' if comment else unit_note

    return res


def _make_powder_calibration_loop(data: sc.DataArray, comment: str) -> Loop:
    """Build a ``pd_calib_d_to_tof`` loop from calibration coefficients.

    Powers 0, 1, 2, and -1 get the ids ``tzero``, ``DIFC``, ``DIFA``, and
    ``DIFB``; any other power uses its string representation as id.
    """
    id_by_power = {0: 'tzero', 1: 'DIFC', 2: 'DIFA', -1: 'DIFB'}
    ids = sc.array(
        dims=[data.dim],
        values=[
            id_by_power.get(power, str(power)) for power in data.coords['power'].values
        ],
    )
    res = Loop(
        {
            'pd_calib_d_to_tof.id': ids,
            'pd_calib_d_to_tof.power': data.coords['power'],
            'pd_calib_d_to_tof.coeff': sc.values(data.data),
        },
        comment=comment,
        schema=PD_SCHEMA,
    )
    if data.variances is not None:
        res['pd_calib_d_to_tof.coeff_su'] = sc.stddevs(data.data)
    return res


# diff --git a/tests/io/cif_test.py b/tests/io/cif_test.py
# new file mode 100644
# index 000000000..4c65812a7
# --- /dev/null
# +++ b/tests/io/cif_test.py
# @@ -0,0 +1,903 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)

import io
from datetime import datetime, timezone
from pathlib import Path
+import pytest +import scipp as sc + +from scippneutron.io import cif + + +def write_to_str(block: cif.Block) -> str: + buffer = io.StringIO() + block.write(buffer) + buffer.seek(0) + return buffer.getvalue() + + +def test_write_block_empty(): + block = cif.Block('a-block-name') + res = write_to_str(block) + assert res == 'data_a-block-name\n\n' + + +def test_write_block_name_with_space(): + with pytest.raises(ValueError): + cif.Block('a block-name with space') + + +def test_write_block_comment(): + block = cif.Block('a-block-name', comment='some comment\n to describe the block') + res = write_to_str(block) + assert ( + res + == '''# some comment +# to describe the block +data_a-block-name + +''' + ) + + +def test_write_block_single_pair_string(): + block = cif.Block('single', [{'audit.creation_method': 'written_by_scippneutron'}]) + res = write_to_str(block) + assert ( + res + == '''data_single + +_audit.creation_method written_by_scippneutron +''' + ) + + +def test_write_block_single_pair_string_variable(): + block = cif.Block( + 'single', [{'audit.creation_method': sc.scalar('written_by_scippneutron')}] + ) + res = write_to_str(block) + assert ( + res + == '''data_single + +_audit.creation_method written_by_scippneutron +''' + ) + + +def test_write_block_single_pair_number(): + block = cif.Block('number', [{'cell.angle_alpha': 62}]) + res = write_to_str(block) + assert ( + res + == '''data_number + +_cell.angle_alpha 62 +''' + ) + + +@pytest.mark.parametrize('unit', (None, 'deg')) +def test_write_block_single_pair_number_variable(unit): + block = cif.Block('number', [{'cell.angle_alpha': sc.scalar(93, unit=unit)}]) + res = write_to_str(block) + assert ( + res + == '''data_number + +_cell.angle_alpha 93 +''' + ) + + +@pytest.mark.parametrize('unit', (None, 'deg')) +def test_write_block_single_pair_number_error(unit): + block = cif.Block( + 'number', [{'cell.angle_alpha': sc.scalar(93.2, variance=2.1**2, unit=unit)}] + ) + res = write_to_str(block) + assert ( + 
res + == '''data_number + +_cell.angle_alpha 93(2) +''' + ) + + +def test_write_block_single_pair_datetime(): + dt = datetime( + year=2023, month=12, day=1, hour=15, minute=9, second=45, tzinfo=timezone.utc + ) + block = cif.Block( + 'datetime', + [ + { + 'audit.creation_date': dt, + } + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_datetime + +_audit.creation_date 2023-12-01T15:09:45+00:00 +''' + ) + + +def test_write_block_single_pair_datetime_variable(): + block = cif.Block( + 'datetime', + [ + { + 'audit.creation_date': sc.datetime('2023-12-01T15:12:33'), + } + ], + ) + res = write_to_str(block) + # No timezone info in the output! + assert ( + res + == '''data_datetime + +_audit.creation_date 2023-12-01T15:12:33 +''' + ) + + +def test_write_block_single_pair_space(): + block = cif.Block('single', [{'audit.creation_method': 'written by scippneutron'}]) + res = write_to_str(block) + assert ( + res + == '''data_single + +_audit.creation_method 'written by scippneutron' +''' + ) + + +def test_write_block_single_pair_single_quote(): + block = cif.Block( + 'single', [{'audit.creation_method': "written by 'scippneutron'"}] + ) + res = write_to_str(block) + assert ( + res + == '''data_single + +_audit.creation_method "written by 'scippneutron'" +''' + ) + + +def test_write_block_single_pair_double_quote(): + block = cif.Block( + 'single', [{'audit.creation_method': 'written by "scippneutron"'}] + ) + res = write_to_str(block) + assert ( + res + == '''data_single + +_audit.creation_method 'written by "scippneutron"' +''' + ) + + +def test_write_block_single_pair_both_quotes(): + block = cif.Block( + 'single', [{'audit.creation_method': """'written by "scippneutron"'"""}] + ) + res = write_to_str(block) + assert ( + res + == '''data_single + +_audit.creation_method +; 'written by "scippneutron"' +; +''' + ) + + +def test_write_block_single_pair_newline(): + block = cif.Block( + 'single', + [{'audit.creation_method': "written by scippneutron\n version 
2000"}], + ) + res = write_to_str(block) + assert ( + res + == '''data_single + +_audit.creation_method +; written by scippneutron + version 2000 +; +''' + ) + + +def test_write_block_single_pair_utf8(): + block = cif.Block('utf-8', [{'audit.creation_method': 'Unicode: \xb5\xc5'}]) + res = write_to_str(block) + assert ( + res + == r'''data_utf-8 + +_audit.creation_method 'Unicode: \xb5\xc5' +''' + ) + + +def test_write_block_single_pair_single_line_comment(): + block = cif.Block('comment') + block.add({'diffrn_radiation.probe': 'neutron'}, comment='a comment') + res = write_to_str(block) + assert ( + res + == '''data_comment + +# a comment +_diffrn_radiation.probe neutron +''' + ) + + +def test_write_block_single_pair_multi_line_comment(): + block = cif.Block('comment') + block.add( + {'diffrn_radiation.probe': 'neutron'}, + comment='Guessing that\nthis is the\n correct probe', + ) + res = write_to_str(block) + assert ( + res + == '''data_comment + +# Guessing that +# this is the +# correct probe +_diffrn_radiation.probe neutron +''' + ) + + +def test_write_block_single_pair_single_line_comment_utf8(): + block = cif.Block('comment') + block.add({'diffrn_radiation.probe': 'neutron'}, comment='unicode: \xc5') + res = write_to_str(block) + assert ( + res + == r'''data_comment + +# unicode: \xc5 +_diffrn_radiation.probe neutron +''' + ) + + +def test_write_block_multiple_pairs(): + block = cif.Block( + 'multiple', + [ + { + 'audit.creation_method': 'written_by_scippneutron', + 'audit.creation_date': '2023-12-01T13:52:00Z', + } + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_multiple + +_audit.creation_method written_by_scippneutron +_audit.creation_date 2023-12-01T13:52:00Z +''' + ) + + +def test_write_block_multiple_chunks(): + block = cif.Block( + 'multiple', + [ + { + 'audit.creation_method': 'written_by_scippneutron', + 'audit.creation_date': '2023-12-01T13:52:00Z', + } + ], + ) + block.add({'diffrn_radiation.probe': 'neutron'}) + res = 
write_to_str(block) + assert ( + res + == '''data_multiple + +_audit.creation_method written_by_scippneutron +_audit.creation_date 2023-12-01T13:52:00Z + +_diffrn_radiation.probe neutron +''' + ) + + +def test_write_block_multiple_chunks_comment(): + block = cif.Block( + 'multiple', + [ + { + 'audit.creation_method': 'written_by_scippneutron', + 'audit.creation_date': '2023-12-01T13:52:00Z', + } + ], + ) + block.add({'diffrn_radiation.probe': 'neutron'}, comment='Guessed') + res = write_to_str(block) + assert ( + res + == '''data_multiple + +_audit.creation_method written_by_scippneutron +_audit.creation_date 2023-12-01T13:52:00Z + +# Guessed +_diffrn_radiation.probe neutron +''' + ) + + +def test_write_block_single_loop_one_column(): + env = sc.array(dims=['x'], values=['water', 'sulfur']) + block = cif.Block('looped', [cif.Loop({'diffrn.ambient_environment': env})]) + res = write_to_str(block) + assert ( + res + == '''data_looped + +loop_ +_diffrn.ambient_environment +water +sulfur +''' + ) + + +def test_write_block_single_loop_one_column_comment(): + env = sc.array(dims=['x'], values=['water', 'sulfur']) + block = cif.Block( + 'looped', + [ + cif.Loop( + {'diffrn.ambient_environment': env}, + comment='This data is completely made up!', + ) + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +# This data is completely made up! 
+loop_ +_diffrn.ambient_environment +water +sulfur +''' + ) + + +def test_write_block_single_loop_two_columns(): + env = sc.array(dims=['x'], values=['water', 'sulfur']) + id_ = sc.array(dims=['x'], values=['123', 'x6a']) + block = cif.Block( + 'looped', [cif.Loop({'diffrn.ambient_environment': env, 'diffrn.id': id_})] + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +loop_ +_diffrn.ambient_environment +_diffrn.id +water 123 +sulfur x6a +''' + ) + + +def test_write_block_single_loop_multi_line_string(): + env = sc.array(dims=['x'], values=['water\nand some salt', 'sulfur']) + id_ = sc.array(dims=['x'], values=['123', 'x6a']) + block = cif.Block( + 'looped', [cif.Loop({'diffrn.ambient_environment': env, 'diffrn.id': id_})] + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +loop_ +_diffrn.ambient_environment +_diffrn.id +; water +and some salt +; +123 +sulfur +x6a +''' + ) + + +def test_write_block_single_loop_numbers(): + coeff = sc.array(dims=['cal'], values=[3.65, -0.012, 1.2e-5]) + power = sc.array(dims=['cal'], values=[0, 1, 2]) + id_ = sc.array(dims=['cal'], values=['tzero', 'DIFC', 'DIFA']) + block = cif.Block( + 'looped', + [ + cif.Loop( + { + 'pd_calib_d_to_tof.id': id_, + 'pd_calib_d_to_tof.power': power, + 'pd_calib_d_to_tof.coeff': coeff, + } + ) + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +loop_ +_pd_calib_d_to_tof.id +_pd_calib_d_to_tof.power +_pd_calib_d_to_tof.coeff +tzero 0 3.65 +DIFC 1 -0.012 +DIFA 2 1.2e-05 +''' + ) + + +def test_write_block_single_loop_numbers_errors(): + coeff = sc.array( + dims=['cal'], + values=[3.65, -0.012, 1.2e-5], + variances=[0.13**2, 0.001**2, 2e-6**2], + ) + power = sc.array(dims=['cal'], values=[0, 1, 2]) + id_ = sc.array(dims=['cal'], values=['tzero', 'DIFC', 'DIFA']) + block = cif.Block( + 'looped', + [ + cif.Loop( + { + 'pd_calib_d_to_tof.id': id_, + 'pd_calib_d_to_tof.power': power, + 'pd_calib_d_to_tof.coeff': coeff, + } + ) + ], + ) + res = 
write_to_str(block) + assert ( + res + == '''data_looped + +loop_ +_pd_calib_d_to_tof.id +_pd_calib_d_to_tof.power +_pd_calib_d_to_tof.coeff +tzero 0 3.65(13) +DIFC 1 -0.0120(10) +DIFA 2 0.000012(2) +''' + ) + + +def test_write_block_pairs_then_loop(): + env = sc.array(dims=['x'], values=['water', 'sulfur']) + block = cif.Block( + 'looped', + [ + { + 'audit.creation_method': 'written by scippneutron', + 'audit.creation_date': '2023-12-01T13:52:00Z', + }, + cif.Loop({'diffrn.ambient_environment': env}), + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +_audit.creation_method 'written by scippneutron' +_audit.creation_date 2023-12-01T13:52:00Z + +loop_ +_diffrn.ambient_environment +water +sulfur +''' + ) + + +def test_write_block_loop_then_pairs(): + env = sc.array(dims=['x'], values=['water', 'sulfur']) + block = cif.Block( + 'looped', + [ + cif.Loop({'diffrn.ambient_environment': env}), + { + 'audit.creation_method': 'written by scippneutron', + 'audit.creation_date': '2023-12-01T13:52:00Z', + }, + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +loop_ +_diffrn.ambient_environment +water +sulfur + +_audit.creation_method 'written by scippneutron' +_audit.creation_date 2023-12-01T13:52:00Z +''' + ) + + +def test_write_block_pair_then_loop_then_pairs(): + env = sc.array(dims=['x'], values=['water', 'sulfur']) + block = cif.Block( + 'looped', + [ + {'diffrn_radiation.probe': 'neutron'}, + cif.Loop({'diffrn.ambient_environment': env}), + { + 'audit.creation_method': 'written by scippneutron', + 'audit.creation_date': '2023-12-01T13:52:00Z', + }, + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +_diffrn_radiation.probe neutron + +loop_ +_diffrn.ambient_environment +water +sulfur + +_audit.creation_method 'written by scippneutron' +_audit.creation_date 2023-12-01T13:52:00Z +''' + ) + + +def test_write_block_two_loops(): + env = sc.array(dims=['env'], values=['water', 'sulfur']) + author = 
sc.array(dims=['author'], values=['Ridcully, M.', 'Librarian']) + email = sc.array(dims=['author'], values=['m.ridcully@uu.am', 'lib@uu.am']) + block = cif.Block( + 'looped', + [ + cif.Loop({'diffrn.ambient_environment': env}), + cif.Loop({'audit_author.name': author, 'audit_author.email': email}), + ], + ) + res = write_to_str(block) + assert ( + res + == '''data_looped + +loop_ +_diffrn.ambient_environment +water +sulfur + +loop_ +_audit_author.name +_audit_author.email +'Ridcully, M.' m.ridcully@uu.am +Librarian lib@uu.am +''' + ) + + +def test_write_block_core_schema_from_chunk(): + chunk = cif.Chunk( + {'audit.creation_method': 'written by scippneutron'}, schema=cif.CORE_SCHEMA + ) + block = cif.Block('block-with-schema', [chunk]) + res = write_to_str(block) + assert ( + res + == '''data_block-with-schema + +loop_ +_audit_conform.dict_name +_audit_conform.dict_version +_audit_conform.dict_location +coreCIF 3.3.0 https://github.com/COMCIFS/cif_core/blob/fc3d75a298fd7c0c3cde43633f2a8616e826bfd5/cif_core.dic + +_audit.creation_method 'written by scippneutron' +''' # noqa: E501 + ) + + +def test_write_block_core_schema_from_loop(): + author = sc.array(dims=['author'], values=['Ridcully, M.', 'Librarian']) + email = sc.array(dims=['author'], values=['m.ridcully@uu.am', 'lib@uu.am']) + loop = cif.Loop( + {'audit_author.name': author, 'audit_author.email': email}, + schema=cif.CORE_SCHEMA, + ) + block = cif.Block('block-with-schema', [loop]) + res = write_to_str(block) + assert ( + res + == '''data_block-with-schema + +loop_ +_audit_conform.dict_name +_audit_conform.dict_version +_audit_conform.dict_location +coreCIF 3.3.0 https://github.com/COMCIFS/cif_core/blob/fc3d75a298fd7c0c3cde43633f2a8616e826bfd5/cif_core.dic + +loop_ +_audit_author.name +_audit_author.email +'Ridcully, M.' 
m.ridcully@uu.am +Librarian lib@uu.am +''' # noqa: E501 + ) + + +def test_write_block_pd_schema_from_chunk(): + chunk = cif.Chunk( + {'pd_meas.units_of_intensity': '1/(micro ampere)'}, schema=cif.PD_SCHEMA + ) + block = cif.Block('block-with-schema', [chunk]) + res = write_to_str(block) + # The order of schemas is arbitrary, so we cannot easily check the whole string. + assert 'pdCIF' in res + assert 'coreCIF' in res + + +def test_write_block_multi_schema_schema(): + core_chunk = cif.Chunk( + {'audit.creation_method': 'written by scippneutron'}, schema=cif.CORE_SCHEMA + ) + pd_chunk = cif.Chunk( + {'pd_meas.units_of_intensity': '1/(micro ampere)'}, schema=cif.PD_SCHEMA + ) + block = cif.Block('block-with-schema', [core_chunk, pd_chunk]) + res = write_to_str(block) + # The order of schemas is arbitrary, so we cannot easily check the whole string. + assert 'pdCIF' in res + assert 'coreCIF' in res + + +def test_save_cif_one_block_buffer(): + block1 = cif.Block( + 'block-1', [{'audit.creation_method': 'written by scippneutron'}] + ) + buffer = io.StringIO() + cif.save_cif(buffer, block1) + buffer.seek(0) + assert ( + buffer.read() + == r'''#\#CIF_1.1 +data_block-1 + +_audit.creation_method 'written by scippneutron' +''' + ) + + +def test_save_cif_two_blocks_buffer(): + env = sc.array(dims=['env'], values=['water', 'sulfur']) + block1 = cif.Block( + 'block-1', [{'audit.creation_method': 'written by scippneutron'}] + ) + block2 = cif.Block( + 'block-2', + [ + {'diffrn_radiation.probe': 'neutron'}, + cif.Loop({'diffrn.ambient_environment': env}), + ], + ) + buffer = io.StringIO() + cif.save_cif(buffer, [block1, block2]) + buffer.seek(0) + assert ( + buffer.read() + == r'''#\#CIF_1.1 +data_block-1 + +_audit.creation_method 'written by scippneutron' + +data_block-2 + +_diffrn_radiation.probe neutron + +loop_ +_diffrn.ambient_environment +water +sulfur +''' + ) + + +@pytest.mark.parametrize('path_type', (str, Path)) +def test_save_cif_one_block_file(tmpdir, path_type): + 
path = path_type(Path(tmpdir) / "test_save_cif_one_block.cif") + block1 = cif.Block( + 'block-1', [{'audit.creation_method': 'written by scippneutron'}] + ) + + cif.save_cif(path, block1) + with open(path, 'r') as f: + assert ( + f.read() + == r'''#\#CIF_1.1 +data_block-1 + +_audit.creation_method 'written by scippneutron' +''' + ) + + +def test_loop_requires_1d(): + with pytest.raises(sc.DimensionError): + cif.Loop({'fake': sc.zeros(sizes={'x': 4, 'y': 3})}) + + +def test_loop_requires_matching_dims(): + with pytest.raises(sc.DimensionError): + cif.Loop({'a': sc.zeros(sizes={'x': 4}), 'b': sc.zeros(sizes={'x': 3})}) + with pytest.raises(sc.DimensionError): + cif.Loop({'a': sc.zeros(sizes={'x': 4}), 'b': sc.zeros(sizes={'y': 4})}) + + +def test_block_with_reduced_powder_data(): + da = sc.DataArray( + sc.array( + dims=['tof'], + values=[13.6, 26.0, 9.7], + variances=[0.81, 1, 0.36], + ), + coords={'tof': sc.array(dims=['tof'], values=[1.2, 1.4, 2.3], unit='us')}, + ) + + block = cif.Block('reduced', []) + block.add_reduced_powder_data(da) + res = write_to_str(block) + + assert 'pdCIF' in res + assert 'coreCIF' in res + + _, _, tof_loop = res.split('\n\n') + assert ( + tof_loop + == '''loop_ +_pd_meas.time_of_flight +_pd_proc.intensity_net +_pd_proc.intensity_net_su +1.2 13.6 0.9 +1.4 26.0 1.0 +2.3 9.7 0.6 +''' + ) + + +def test_block_with_reduced_powder_data_custom_unit(): + da = sc.DataArray( + sc.array(dims=['tof'], values=[13.6, 26.0, 9.7], unit='counts'), + coords={'tof': sc.array(dims=['tof'], values=[1.2, 1.4, 2.3], unit='us')}, + ) + + block = cif.Block('reduced', []) + block.add_reduced_powder_data(da) + res = write_to_str(block) + + assert 'pdCIF' in res + assert 'coreCIF' in res + + _, _, tof_loop = res.split('\n\n') + assert ( + tof_loop + == '''# Unit of intensity: [counts] +loop_ +_pd_meas.time_of_flight +_pd_proc.intensity_net +1.2 13.6 +1.4 26.0 +2.3 9.7 +''' + ) + + +def test_block_with_reduced_powder_data_bad_dim(): + da = sc.DataArray( + sc.array( 
+ dims=['time'], + values=[13.6, 26.0, 9.7], + ), + coords={'time': sc.array(dims=['time'], values=[1.2, 1.4, 2.3], unit='us')}, + ) + + block = cif.Block('reduced', []) + with pytest.raises(sc.CoordError): + block.add_reduced_powder_data(da) + + +def test_block_with_reduced_powder_data_bad_name(): + da = sc.DataArray( + sc.array( + dims=['tof'], + values=[13.6, 26.0, 9.7], + ), + coords={'tof': sc.array(dims=['tof'], values=[1.2, 1.4, 2.3], unit='us')}, + name='bad', + ) + + block = cif.Block('reduced', []) + with pytest.raises(ValueError): + block.add_reduced_powder_data(da) + + +def test_block_with_reduced_powder_data_bad_coord_unit(): + da = sc.DataArray( + sc.array(dims=['tof'], values=[13.6, 26.0, 9.7]), + coords={'tof': sc.array(dims=['tof'], values=[1.2, 1.4, 2.3], unit='ns')}, + ) + + block = cif.Block('reduced', []) + with pytest.raises(sc.UnitError): + block.add_reduced_powder_data(da) + + +def test_block_powder_calibration(): + da = sc.DataArray( + sc.array(dims=['cal'], values=[1.2, 4.5, 6.7]), + coords={'power': sc.array(dims=['cal'], values=[0, 1, -1])}, + ) + block = cif.Block('cal', []) + block.add_powder_calibration(da) + res = write_to_str(block) + + assert 'pdCIF' in res + assert 'coreCIF' in res + + _, _, cal_loop = res.split('\n\n') + assert ( + cal_loop + == '''loop_ +_pd_calib_d_to_tof.id +_pd_calib_d_to_tof.power +_pd_calib_d_to_tof.coeff +tzero 0 1.2 +DIFC 1 4.5 +DIFB -1 6.7 +''' + )