Skip to content

Commit

Permalink
Merge pull request #148 from SEDenmarkLab/recollect-fix
Browse files Browse the repository at this point in the history
Reimplementation of Recollect
  • Loading branch information
esalx authored Oct 15, 2024
2 parents 21f2bf9 + 418d663 commit 037849d
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 45 deletions.
34 changes: 15 additions & 19 deletions molli/chem/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def read_geom(k, g, ens):


def ensemble_from_molli_old_xml(
f: StringIO | BytesIO, mol_lib=False
f: StringIO | BytesIO, molecule
) -> ConformerEnsemble | Molecule:
"""Parses an old version of the collection.
This function is primarily intended for backwards compatibility
Expand All @@ -62,13 +62,13 @@ def ensemble_from_molli_old_xml(
----------
f : StringIO | BytesIO
xml file stream
mol_lib : bool, optional
Returns `ConformerEnsemble` if True, by default False
molecule : bool, optional
Indicates if it should be parsed as a Molecule or Conformer Ensemble
Returns
-------
ConformerEnsemble | Molecule
Returns Conformer Ensemble or Molecule
Returns ConformerEnsemble or Molecule
Notes
-----
Expand All @@ -85,38 +85,34 @@ def ensemble_from_molli_old_xml(
xgeom = mol.findall("./geometry/g")
xconfs = mol.findall("./conformers/g")

atoms = []
bonds = []
conformers = []

n_atoms = len(xatoms)

if len(xconfs) == 0:
n_conformers = len(xgeom)
else:
n_conformers = len(xconfs)

if mol_lib:
ens = Molecule(n_atoms=n_atoms, name=name)
if molecule:
res = Molecule(n_atoms=n_atoms, name=name)
else:
ens = ConformerEnsemble(n_conformers=n_conformers, n_atoms=n_atoms, name=name)
res = ConformerEnsemble(n_conformers=n_conformers, n_atoms=n_atoms, name=name)

for i, a in enumerate(xatoms):
aid, s, l, at = a.attrib["id"], a.attrib["s"], a.attrib["l"], a.attrib["t"]
ens.atoms[i].element = Element[s]
ens.atoms[i].label = l
ens.atoms[i].set_mol2_type(at)
res.atoms[i].element = Element[s]
res.atoms[i].label = l
res.atoms[i].set_mol2_type(at)

for j, b in enumerate(xbonds):
ia1, ia2 = map(int, b.attrib["c"].split())
ens.append_bond(_b := Bond(ens.atoms[ia1 - 1], ens.atoms[ia2 - 1]))
res.append_bond(_b := Bond(res.atoms[ia1 - 1], res.atoms[ia2 - 1]))
_b.set_mol2_type(b.attrib["t"])

if len(xconfs) == 0:
for k, g in enumerate(xgeom):
ens = read_geom(k, g, ens)
res = read_geom(k, g, res)
else:
for k, g in enumerate(xconfs):
ens = read_geom(k, g, ens)
res = read_geom(k, g, res)

return ens
return res
164 changes: 140 additions & 24 deletions molli/scripts/recollect.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@
from ..storage import Collection
from molli.external import openbabel as mob
from functools import partial
import warnings
import sys

class LegacyMolliWarning(UserWarning):
"Issued when Legacy Collections may behave unexpectedly during Molecule or ConformerEnsemble recollection"
pass

arg_parser = ArgumentParser(
"molli recollect",
description="Read old style molli collection and convert it to the new file format.",
Expand Down Expand Up @@ -68,6 +73,16 @@
help="This option is required if reading from a <zip> or directory to indicate the File Type being searched for (<mol2>, <xyz>, etc.)",
)

arg_parser.add_argument(
"-iconv",
"--input_conv",
choices=["molecule", "ensemble"],
action="store",
type=str,
default="molecule",
help="This option is required if reading from a <zip> or directory to indicate if the files being read should be read in as a Molecule or ConformerEnsemble"
)

arg_parser.add_argument(
"-o",
"--output",
Expand All @@ -91,19 +106,22 @@
"-oext",
"--output_ext",
action="store",
default="mol2",
default=None,
type=str,
help="This option is required if reading from a <zip> or directory to indicate the File Type being searched for (<mol2>, <xyz>, etc.)",
)

arg_parser.add_argument(
"-l",
"--library",
choices=["molli", "obabel"],
choices=["molli", "obabel", "openbabel"],
action="store",
type=str.lower,
default="molli",
help="This indicates the type of library, will default to molli for xyz and mol2, all other files will default to openbabel",
help="""This indicates the type of library to utilize, defaults to molli, but openbabel can be specified if non xyz/mol2 formats are used.
In the event a file format without connectivity is utilized, such as xyz, the molli parser will not create/perceive connectivity, while the
openbabel parser will connect/perceive bond orders.
""",
)

arg_parser.add_argument(
Expand Down Expand Up @@ -140,7 +158,13 @@
help="This option enables overwriting the destination collection.",
)

def re_ml_loads(_bytes_str:bytes, fmt:str, parser:str, otype:str):
_decoded_str = _bytes_str.decode()
return ml.loads(_decoded_str, fmt=fmt, parser=parser, otype=otype)

def re_ml_dumps(_mol:ml.Molecule | ml.ConformerEnsemble, fmt:str, writer:str):
return ml.dumps(_mol, fmt=fmt, writer=writer).encode()

def recollect(
source: Collection,
destination: Collection,
Expand Down Expand Up @@ -168,18 +192,46 @@ def recollect(
else:
destination[k] = res

def recollect_legacy(
source: ZipFile,
destination,
charge,
mult,
molecule,
dest_type: type = None,
progress: bool = False,
skip: bool = True,):

if dest_type is None:
dest_type = lambda x: x

fn = destination._path.name
with destination.writing():
for xml in (
pbar := tqdm(source.namelist(), disable=not progress, desc=f'Writing into {fn}')
):
if xml != '__molli__':
try:
src = ml.chem.ensemble_from_molli_old_xml(source.open(xml), molecule=molecule)
src.charge = charge
src.mult = mult
res = dest_type(src)
except Exception as xc:
if skip:
pbar.write(f"Error in {xml}: {xc}")
else:
raise
else:
destination[src.name] = res

def molli_main(args, **kwargs):
parsed = arg_parser.parse_args(args)

inp = Path(parsed.input)
it = parsed.input_type

if not it:
it = inp.suffix
input_type = parsed.input_type

out = Path(parsed.output)
ot = parsed.output_type
output_type = parsed.output_type

# if parsed.output is Ellipsis:
# out = inp.with_suffix(".mli")
Expand All @@ -200,48 +252,112 @@ def molli_main(args, **kwargs):
# with ml.aux.ForeColor("yellow"):
# print(f"Enabled skipping malformed files.")

if parsed.input_type is None:
if not input_type:
# deduce the input type here
if parsed.input.is_dir():
input_type = "dir"
else:
input_type = parsed.input.suffix[1:]
print(f"Recognized input type as {input_type!r}")

if parsed.output_type is None:
if not output_type:
# deduce the input type here
if parsed.input.is_dir():
if parsed.output.is_dir():
output_type = "dir"
else:
output_type = parsed.output.suffix[1:]
print(f"Recognized output type as {output_type!r}")

converter = None

match input_type, parsed.input_ext:
case "mlib", _:
output_conv = None
legacy = False

match input_type:
case "mlib":
source = ml.MoleculeLibrary(parsed.input, readonly=True)

case "clib", _:
case "clib":
source = ml.ConformerLibrary(parsed.input, readonly=True)

match output_type, parsed.output_ext:
case "mlib", _:
case "zip":
if not is_zipfile(inp):
raise ValueError(f'{inp} is not a valid zipfile!')
else:
with ZipFile(inp, mode='r') as zf:
if '__molli__' in zf.NameToInfo:
legacy = True
zf.close()
else:
zf.close()
suffixes = {Path(x).suffix for x in zf.namelist()}
assert len(suffixes) == 1, f'There are not uniform file types in this ZipFile: {suffixes}'

source = ml.storage.Collection[dict](
parsed.input,
ml.storage.ZipCollectionBackend,
ext=f'.{parsed.input_ext}',
value_decoder=partial(re_ml_loads, fmt=parsed.input_ext, parser=parsed.library, otype=parsed.input_conv),
readonly=True,
overwrite=False
)
case "dir":
source = ml.storage.Collection[dict](
inp,
ml.storage.DirCollectionBackend,
ext=f".{parsed.input_ext}",
value_decoder=partial(re_ml_loads, fmt=parsed.input_ext, parser=parsed.library, otype=parsed.input_conv),
readonly=True,
overwrite=False
)

match output_type:
case "mlib":
destination = ml.MoleculeLibrary(
parsed.output,
readonly=False,
overwrite=parsed.overwrite,
)
if input_type == "clib":
converter = lambda x: ml.Molecule(x[0])
if (input_type == "clib") | (
(input_type == 'zip') & (parsed.input_conv == 'ensemble')) | (
(input_type == 'dir') & (parsed.input_conv == 'ensemble')):

output_conv = lambda x: ml.Molecule(x[0])

case "clib", _:
case "clib":
destination = ml.ConformerLibrary(
parsed.output,
readonly=False,
overwrite=parsed.overwrite,
)
if input_type == "mlib":
converter = ml.ConformerEnsemble
if (input_type == "mlib") | (
(input_type == 'zip') & (parsed.input_conv == 'molecule')) | (
(input_type == 'dir') & (parsed.input_conv == 'molecule')):
output_conv = ml.ConformerEnsemble

case "dir":
destination = ml.storage.Collection[dict](
parsed.output,
ml.storage.DirCollectionBackend,
ext=f".{parsed.output_ext}",
value_encoder=partial(re_ml_dumps, fmt=parsed.output_ext, writer=parsed.library),
readonly=False,
overwrite=True
)

if not legacy:
recollect(source, destination, dest_type=output_conv, progress=True, skip=parsed.skip)
else:


if (parsed.input_conv == 'molecule') and (output_conv == ml.ConformerEnsemble):

warnings.warn("In the legacy implementation, the input converter specifies molecule, but the output converter writes to a ConformerEnsemble, The conformer ensemble will be written with the single base geometry from the xml file (NOT THE FIRST CONFORMER)\n", LegacyMolliWarning)
elif (parsed.input_conv == 'ensemble') and (output_conv == ml.Molecule):
warnings.warn("In the legacy implementation, the input converter specifies ensemble, but the output converter writes to a molecule, The Molecule will be written with the first conformer geometry from the xml file (NOT THE BASE GEOMETRY)\n""", LegacyMolliWarning)

if parsed.input_conv == 'molecule':
molecule = True
else:
molecule = False

recollect(source, destination, dest_type=converter, progress=True, skip=parsed.skip)
with ZipFile(inp, mode='r') as source:
recollect_legacy(source, destination, charge, mult, molecule=molecule, dest_type=output_conv, progress=True, skip=parsed.skip)
2 changes: 1 addition & 1 deletion molli/storage/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def update_keys(self):
}

def _write(self, key: str, value: bytes):
self._zipfile.writestr(f"{self.get_path(key)}{self.ext}", value)
self._zipfile.write(f"{self.get_path(key)}{self.ext}", value)

def _read(self, key: str) -> bytes:
with self._zipfile.open(key) as f:
Expand Down
2 changes: 1 addition & 1 deletion molli/storage/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(
) -> None:
self._path = Path(path)

if not self._path.is_file() and readonly:
if not self._path.exists() and readonly:
raise FileNotFoundError(f"{path!r} is not a valid molli collection.")

self._backend = backend(
Expand Down

0 comments on commit 037849d

Please sign in to comment.