Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplementation of Recollect #148

Merged
merged 14 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions molli/chem/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def read_geom(k, g, ens):


def ensemble_from_molli_old_xml(
f: StringIO | BytesIO, mol_lib=False
f: StringIO | BytesIO, molecule
) -> ConformerEnsemble | Molecule:
"""Parses an old version of the collection.
This function is primarily intended for backwards compatibility
Expand All @@ -62,13 +62,13 @@ def ensemble_from_molli_old_xml(
----------
f : StringIO | BytesIO
xml file stream
mol_lib : bool, optional
Returns `ConformerEnsemble` if True, by default False
molecule : bool, optional
Indicates if it should be parsed as a Molecule or Conformer Ensemble

Returns
-------
ConformerEnsemble | Molecule
Returns Conformer Ensemble or Molecule
Returns ConformerEnsemble or Molecule

Notes
-----
Expand All @@ -85,38 +85,34 @@ def ensemble_from_molli_old_xml(
xgeom = mol.findall("./geometry/g")
xconfs = mol.findall("./conformers/g")

atoms = []
bonds = []
conformers = []

n_atoms = len(xatoms)

if len(xconfs) == 0:
n_conformers = len(xgeom)
else:
n_conformers = len(xconfs)

if mol_lib:
ens = Molecule(n_atoms=n_atoms, name=name)
if molecule:
res = Molecule(n_atoms=n_atoms, name=name)
else:
ens = ConformerEnsemble(n_conformers=n_conformers, n_atoms=n_atoms, name=name)
res = ConformerEnsemble(n_conformers=n_conformers, n_atoms=n_atoms, name=name)

for i, a in enumerate(xatoms):
aid, s, l, at = a.attrib["id"], a.attrib["s"], a.attrib["l"], a.attrib["t"]
ens.atoms[i].element = Element[s]
ens.atoms[i].label = l
ens.atoms[i].set_mol2_type(at)
res.atoms[i].element = Element[s]
res.atoms[i].label = l
res.atoms[i].set_mol2_type(at)

for j, b in enumerate(xbonds):
ia1, ia2 = map(int, b.attrib["c"].split())
ens.append_bond(_b := Bond(ens.atoms[ia1 - 1], ens.atoms[ia2 - 1]))
res.append_bond(_b := Bond(res.atoms[ia1 - 1], res.atoms[ia2 - 1]))
_b.set_mol2_type(b.attrib["t"])

if len(xconfs) == 0:
for k, g in enumerate(xgeom):
ens = read_geom(k, g, ens)
res = read_geom(k, g, res)
else:
for k, g in enumerate(xconfs):
ens = read_geom(k, g, ens)
res = read_geom(k, g, res)

return ens
return res
164 changes: 140 additions & 24 deletions molli/scripts/recollect.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@
from ..storage import Collection
from molli.external import openbabel as mob
from functools import partial
import warnings
import sys

class LegacyMolliWarning(UserWarning):
"Issued when Legacy Collections may behave unexpectedly during Molecule or ConformerEnsemble recollection"
pass

arg_parser = ArgumentParser(
"molli recollect",
description="Read old style molli collection and convert it to the new file format.",
Expand Down Expand Up @@ -68,6 +73,16 @@
help="This option is required if reading from a <zip> or directory to indicate the File Type being searched for (<mol2>, <xyz>, etc.)",
)

arg_parser.add_argument(
"-iconv",
"--input_conv",
choices=["molecule", "ensemble"],
action="store",
type=str,
default="molecule",
help="This option is required if reading from a <zip> or directory to indicate if the files being read should be read in as a Molecule or ConformerEnsemble"
)

arg_parser.add_argument(
"-o",
"--output",
Expand All @@ -91,19 +106,22 @@
"-oext",
"--output_ext",
action="store",
default="mol2",
default=None,
type=str,
help="This option is required if reading from a <zip> or directory to indicate the File Type being searched for (<mol2>, <xyz>, etc.)",
)

arg_parser.add_argument(
"-l",
"--library",
choices=["molli", "obabel"],
choices=["molli", "obabel", "openbabel"],
action="store",
type=str.lower,
default="molli",
help="This indicates the type of library, will default to molli for xyz and mol2, all other files will default to openbabel",
help="""This indicates the type of library to utilize, defaults to molli, but openbabel can be specified if non xyz/mol2 formats are used.
In the event a file format without connectivity is utilized, such as xyz, the molli parser will not create/perceive connectivity, while the
openbabel parser will connect/perceive bond orders.
""",
)

arg_parser.add_argument(
Expand Down Expand Up @@ -140,7 +158,13 @@
help="This option enables overwriting the destination collection.",
)

def re_ml_loads(_bytes_str:bytes, fmt:str, parser:str, otype:str):
_decoded_str = _bytes_str.decode()
return ml.loads(_decoded_str, fmt=fmt, parser=parser, otype=otype)

def re_ml_dumps(_mol:ml.Molecule | ml.ConformerEnsemble, fmt:str, writer:str):
return ml.dumps(_mol, fmt=fmt, writer=writer).encode()

def recollect(
source: Collection,
destination: Collection,
Expand Down Expand Up @@ -168,18 +192,46 @@ def recollect(
else:
destination[k] = res

def recollect_legacy(
source: ZipFile,
destination,
charge,
mult,
molecule,
dest_type: type = None,
progress: bool = False,
skip: bool = True,):

if dest_type is None:
dest_type = lambda x: x

fn = destination._path.name
with destination.writing():
for xml in (
pbar := tqdm(source.namelist(), disable=not progress, desc=f'Writing into {fn}')
):
if xml != '__molli__':
try:
src = ml.chem.ensemble_from_molli_old_xml(source.open(xml), molecule=molecule)
src.charge = charge
src.mult = mult
res = dest_type(src)
except Exception as xc:
if skip:
pbar.write(f"Error in {xml}: {xc}")
else:
raise
else:
destination[src.name] = res

def molli_main(args, **kwargs):
parsed = arg_parser.parse_args(args)

inp = Path(parsed.input)
it = parsed.input_type

if not it:
it = inp.suffix
input_type = parsed.input_type

out = Path(parsed.output)
ot = parsed.output_type
output_type = parsed.output_type

# if parsed.output is Ellipsis:
# out = inp.with_suffix(".mli")
Expand All @@ -200,48 +252,112 @@ def molli_main(args, **kwargs):
# with ml.aux.ForeColor("yellow"):
# print(f"Enabled skipping malformed files.")

if parsed.input_type is None:
if not input_type:
# deduce the input type here
if parsed.input.is_dir():
input_type = "dir"
else:
input_type = parsed.input.suffix[1:]
print(f"Recognized input type as {input_type!r}")

if parsed.output_type is None:
if not output_type:
# deduce the input type here
if parsed.input.is_dir():
if parsed.output.is_dir():
output_type = "dir"
else:
output_type = parsed.output.suffix[1:]
print(f"Recognized output type as {output_type!r}")

converter = None

match input_type, parsed.input_ext:
case "mlib", _:
output_conv = None
legacy = False

match input_type:
case "mlib":
source = ml.MoleculeLibrary(parsed.input, readonly=True)

case "clib", _:
case "clib":
source = ml.ConformerLibrary(parsed.input, readonly=True)

match output_type, parsed.output_ext:
case "mlib", _:
case "zip":
if not is_zipfile(inp):
raise ValueError(f'{inp} is not a valid zipfile!')
else:
with ZipFile(inp, mode='r') as zf:
if '__molli__' in zf.NameToInfo:
legacy = True
zf.close()
else:
zf.close()
suffixes = {Path(x).suffix for x in zf.namelist()}
assert len(suffixes) == 1, f'There are not uniform file types in this ZipFile: {suffixes}'

source = ml.storage.Collection[dict](
parsed.input,
ml.storage.ZipCollectionBackend,
ext=f'.{parsed.input_ext}',
value_decoder=partial(re_ml_loads, fmt=parsed.input_ext, parser=parsed.library, otype=parsed.input_conv),
readonly=True,
overwrite=False
)
case "dir":
source = ml.storage.Collection[dict](
inp,
ml.storage.DirCollectionBackend,
ext=f".{parsed.input_ext}",
value_decoder=partial(re_ml_loads, fmt=parsed.input_ext, parser=parsed.library, otype=parsed.input_conv),
readonly=True,
overwrite=False
)

match output_type:
case "mlib":
destination = ml.MoleculeLibrary(
parsed.output,
readonly=False,
overwrite=parsed.overwrite,
)
if input_type == "clib":
converter = lambda x: ml.Molecule(x[0])
if (input_type == "clib") | (
(input_type == 'zip') & (parsed.input_conv == 'ensemble')) | (
(input_type == 'dir') & (parsed.input_conv == 'ensemble')):

output_conv = lambda x: ml.Molecule(x[0])

case "clib", _:
case "clib":
destination = ml.ConformerLibrary(
parsed.output,
readonly=False,
overwrite=parsed.overwrite,
)
if input_type == "mlib":
converter = ml.ConformerEnsemble
if (input_type == "mlib") | (
(input_type == 'zip') & (parsed.input_conv == 'molecule')) | (
(input_type == 'dir') & (parsed.input_conv == 'molecule')):
output_conv = ml.ConformerEnsemble

case "dir":
destination = ml.storage.Collection[dict](
parsed.output,
ml.storage.DirCollectionBackend,
ext=f".{parsed.output_ext}",
value_encoder=partial(re_ml_dumps, fmt=parsed.output_ext, writer=parsed.library),
readonly=False,
overwrite=True
)

if not legacy:
recollect(source, destination, dest_type=output_conv, progress=True, skip=parsed.skip)
else:


if (parsed.input_conv == 'molecule') and (output_conv == ml.ConformerEnsemble):

warnings.warn("In the legacy implementation, the input converter specifies molecule, but the output converter writes to a ConformerEnsemble, The conformer ensemble will be written with the single base geometry from the xml file (NOT THE FIRST CONFORMER)\n", LegacyMolliWarning)
elif (parsed.input_conv == 'ensemble') and (output_conv == ml.Molecule):
warnings.warn("In the legacy implementation, the input converter specifies ensemble, but the output converter writes to a molecule, The Molecule will be written with the first conformer geometry from the xml file (NOT THE BASE GEOMETRY)\n""", LegacyMolliWarning)

if parsed.input_conv == 'molecule':
molecule = True
else:
molecule = False

recollect(source, destination, dest_type=converter, progress=True, skip=parsed.skip)
with ZipFile(inp, mode='r') as source:
recollect_legacy(source, destination, charge, mult, molecule=molecule, dest_type=output_conv, progress=True, skip=parsed.skip)
2 changes: 1 addition & 1 deletion molli/storage/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def update_keys(self):
}

def _write(self, key: str, value: bytes):
self._zipfile.writestr(f"{self.get_path(key)}{self.ext}", value)
self._zipfile.write(f"{self.get_path(key)}{self.ext}", value)

def _read(self, key: str) -> bytes:
with self._zipfile.open(key) as f:
Expand Down
2 changes: 1 addition & 1 deletion molli/storage/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(
) -> None:
self._path = Path(path)

if not self._path.is_file() and readonly:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may need to be addressed in the future. I can definitely see how extension-less files and directories can conflict here, but let's hope for the best.
Non-issue for now.

if not self._path.exists() and readonly:
raise FileNotFoundError(f"{path!r} is not a valid molli collection.")

self._backend = backend(
Expand Down