Skip to content

Commit 864fb1e

Browse files
committed
Create convert_fast5_to_blow5.py
Add fast5 convert script (for signal only fast5)
1 parent 3bb45b5 commit 864fb1e

File tree

1 file changed

+287
-0
lines changed

1 file changed

+287
-0
lines changed

convert_fast5_to_blow5.py

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
#!/usr/bin/env python
2+
"""Convert Oxford Nanopore FAST5 files to BLOW5 using pyslow5.
3+
4+
This script is intended for FAST5 files that only contain raw signal data
5+
(current signal) and minimal metadata. Missing header attributes are
6+
populated with a user configurable default string ("CycloneSeq" by default)
7+
so the resulting BLOW5 can be processed by slow5tools and downstream tools.
8+
9+
Example usage:
10+
11+
python convert_fast5_to_blow5.py \
12+
--input fast5_WGA_p \
13+
--output blow5_out
14+
15+
Requirements: pyslow5, h5py
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import argparse
21+
import logging
22+
import sys
23+
from pathlib import Path
24+
from typing import Dict, Iterable, Optional, Tuple
25+
26+
import h5py
27+
import numpy as np
28+
import pyslow5
29+
30+
31+
# Module-level logger; handlers/level are configured in main() via logging.basicConfig.
LOGGER = logging.getLogger(__name__)
32+
33+
34+
def parse_args(argv: Optional[Iterable[str]] = None) -> argparse.Namespace:
    """Build the command-line parser and parse *argv*.

    When *argv* is None, argparse falls back to sys.argv[1:].
    """
    parser = argparse.ArgumentParser(description=__doc__)

    add = parser.add_argument
    add(
        "--input",
        required=True,
        type=Path,
        help="Path to a FAST5 file or directory containing FAST5 files.",
    )
    add(
        "--output",
        required=True,
        type=Path,
        help="Output directory for generated .blow5 files.",
    )
    add(
        "--default-meta",
        default="CycloneSeq",
        help="Fallback string used when a header attribute is missing.",
    )
    add(
        "--record-compression",
        default="zlib",
        choices=["none", "zlib", "zstd"],
        help="Record compression codec for the BLOW5 container.",
    )
    add(
        "--signal-compression",
        default="svb_zd",
        choices=["none", "svb_zd", "svb-zd", "ex_zd", "ex-zd"],
        help="Signal compression codec for the raw signal array.",
    )
    add(
        "--verbose",
        action="store_true",
        help="Enable verbose logging for debugging conversions.",
    )
    return parser.parse_args(argv)
71+
72+
73+
def discover_fast5_files(path: Path) -> Iterable[Path]:
    """Yield FAST5 files for *path* (a single file or a directory tree).

    A single file is validated by its ``.fast5`` suffix; a directory is
    searched recursively and matching files are yielded in sorted order.

    Raises:
        ValueError: *path* is a file without the ``.fast5`` extension.
        FileNotFoundError: *path* is neither an existing file nor a directory.
    """
    if path.is_file():
        if path.suffix != ".fast5":
            raise ValueError(f"Input file must have .fast5 extension: {path}")
        yield path
    elif path.is_dir():
        # rglob may match directories named *.fast5; keep plain files only.
        yield from (p for p in sorted(path.rglob("*.fast5")) if p.is_file())
    else:
        raise FileNotFoundError(f"Input path does not exist: {path}")
86+
87+
88+
def decode_attr(value) -> Optional[str]:
    """Normalize an HDF5 attribute value to a string (None passes through).

    Byte strings are UTF-8 decoded with undecodable bytes dropped; numpy
    scalars are unwrapped to their Python equivalent before str().
    """
    if value is None:
        return None
    if isinstance(value, (bytes, np.bytes_)):
        return value.decode("utf-8", errors="ignore")
    if isinstance(value, np.generic):
        value = value.item()
    return str(value)
96+
97+
98+
def gather_header_metadata(read_group) -> Dict[str, str]:
    """Collect SLOW5 header candidates from one FAST5 read group.

    Attribute values from the ``tracking_id``, ``context_tags`` and
    ``channel_id`` subgroups are flattened into a single string dict
    (missing/empty decodes become ""). A handful of well-known channel
    values additionally seed canonical header keys when not already set.
    """
    metadata: Dict[str, str] = {}

    # Flatten all attributes of the three metadata subgroups, in order.
    for section in ("tracking_id", "context_tags", "channel_id"):
        subgroup = read_group.get(section)
        if subgroup is None:
            continue
        for key, value in subgroup.attrs.items():
            metadata[key] = decode_attr(value) or ""

    channel = read_group.get("channel_id")
    if channel is not None:
        attrs = channel.attrs
        if "sampling_rate" in attrs:
            rate = decode_attr(attrs.get("sampling_rate"))
            # Seed both spellings used by downstream header builders.
            metadata.setdefault("sample_frequency", rate)
            metadata.setdefault("sampling_rate", rate)
        for name in ("digitisation", "offset", "range"):
            if name in attrs:
                metadata.setdefault(name, decode_attr(attrs.get(name)))

    return metadata
127+
128+
129+
def build_header(writer: pyslow5.Open, metadata: Dict[str, str], default_value: str) -> Dict[str, str]:
    """Populate a SLOW5 header dict, filling gaps with *default_value*.

    Every key of the writer's empty header template is assigned: known
    timestamp keys fall back to the Unix epoch, numeric acquisition keys
    fall back to the sampling rate (or "0"), and anything else falls back
    to *default_value*. Version/read-group fields are then forced, and a
    few mandatory identifiers are seeded if the template lacked them.
    """
    epoch = "1970-01-01T00:00:00Z"
    time_keys = {"exp_start_time", "protocol_start_time"}
    numeric_keys = {"sample_frequency", "digitisation", "offset", "range", "sampling_rate"}

    header = writer.get_empty_header()
    for key in header:
        value = metadata.get(key)
        if not value:  # missing key or empty string
            if key in time_keys:
                value = epoch
            elif key in numeric_keys:
                value = metadata.get(key, metadata.get("sampling_rate", "0"))
            else:
                value = default_value
        header[key] = str(value)

    header["slow5_version"] = "1.0.0"
    header["num_read_groups"] = "1"
    for key, fallback in (
        ("run_id", default_value),
        ("exp_start_time", epoch),
        ("flow_cell_id", default_value),
        ("sample_id", default_value),
    ):
        header.setdefault(key, fallback)
    return header
148+
149+
150+
def read_signal_group(group) -> Dict[str, np.ndarray]:
    """Extract the raw current signal from one FAST5 read group.

    Returns a dict with the signal (coerced to int16, the SLOW5 signal
    dtype) and its length. Raises KeyError when Raw/Signal is absent.
    """
    raw = group.get("Raw")
    if raw is None or "Signal" not in raw:
        raise KeyError("Raw/Signal dataset is missing in FAST5 read group")
    # copy=False keeps the array as-is when it is already int16.
    signal = raw["Signal"][:].astype(np.int16, copy=False)
    return {"signal": signal, "len_raw_signal": int(signal.shape[0])}
161+
162+
163+
def _safe_int(value, default: int = 0) -> int:
164+
try:
165+
return int(value)
166+
except Exception:
167+
return default
168+
169+
170+
def build_record(
    writer: pyslow5.Open,
    read_id: str,
    group,
    default_group: int = 0,
) -> Tuple[Dict[str, object], Dict[str, object]]:
    """Assemble one SLOW5 record plus auxiliary fields from a FAST5 group.

    The primary record carries the calibration values read from the
    channel_id attributes (0.0 when absent) and the raw int16 signal; the
    aux dict carries optional per-read fields (channel_number,
    median_before, read_number, start_mux, start_time) only when present
    in the source attributes.

    NOTE(review): *read_id* is stored verbatim as supplied by the caller;
    in multi-read FAST5 files the HDF5 group name is often prefixed with
    "read_" rather than being the bare read id — confirm against the data.
    """
    record = writer.get_empty_record()
    channel_attrs = group["channel_id"].attrs if "channel_id" in group else {}
    raw_info = read_signal_group(group)

    record["read_id"] = read_id
    record["read_group"] = default_group
    for field in ("digitisation", "offset", "range", "sampling_rate"):
        record[field] = float(channel_attrs.get(field, 0.0))
    record["len_raw_signal"] = int(raw_info["len_raw_signal"])
    # pyslow5 expects a contiguous int16 buffer for the signal.
    record["signal"] = np.ascontiguousarray(raw_info["signal"], dtype=np.int16)

    aux: Dict[str, object] = {}

    if "channel_number" in channel_attrs:
        channel_number = channel_attrs.get("channel_number")
        if isinstance(channel_number, (bytes, np.bytes_)):
            aux["channel_number"] = channel_number.decode("utf-8", errors="ignore")
        else:
            aux["channel_number"] = str(channel_number)

    raw_attrs = group["Raw"].attrs if "Raw" in group else {}
    if "median_before" in raw_attrs:
        aux["median_before"] = float(raw_attrs.get("median_before"))
    if "read_number" in raw_attrs:
        aux["read_number"] = _safe_int(raw_attrs.get("read_number"))
    if "start_mux" in raw_attrs:
        # start_mux is stored as a single byte in SLOW5; clamp to 0..255.
        aux["start_mux"] = max(0, min(255, _safe_int(raw_attrs.get("start_mux"))))
    if "start_time" in raw_attrs:
        aux["start_time"] = max(0, _safe_int(raw_attrs.get("start_time")))

    return record, aux
210+
211+
212+
def convert_fast5_to_blow5(
    fast5_path: Path,
    output_blow5: Path,
    default_meta: str,
    record_compression: str,
    signal_compression: str,
) -> None:
    """Convert a single FAST5 file into one BLOW5 file.

    Each top-level HDF5 group in the FAST5 file is treated as one read.
    The SLOW5 header is built from the first read's metadata subgroups,
    with *default_meta* filling any missing attribute.

    Args:
        fast5_path: Source .fast5 file.
        output_blow5: Destination .blow5 path (parent dirs are created).
        default_meta: Fallback string for missing header attributes.
        record_compression: Record codec passed to pyslow5.Open.
        signal_compression: Signal codec passed to pyslow5.Open.
    """
    LOGGER.info("Converting %s -> %s", fast5_path, output_blow5)
    output_blow5.parent.mkdir(parents=True, exist_ok=True)

    writer: Optional[pyslow5.Open] = None
    try:
        with h5py.File(fast5_path, "r") as fast5_file:
            read_ids = list(fast5_file.keys())
            if not read_ids:
                LOGGER.warning("No read groups found in %s; skipping", fast5_path)
                return

            metadata = gather_header_metadata(fast5_file[read_ids[0]])

            writer = pyslow5.Open(str(output_blow5), "w", record_compression, signal_compression)
            writer.write_header(build_header(writer, metadata, default_meta))

            for read_id in read_ids:
                record, aux = build_record(writer, read_id, fast5_file[read_id])
                if aux:
                    writer.write_record(record, aux)
                else:
                    writer.write_record(record)
    finally:
        # Bug fix: previously the writer was only closed on the success
        # path, so an exception while writing records leaked the BLOW5
        # handle and could lose buffered data. Always close it.
        if writer is not None:
            writer.close()
247+
248+
249+
def main(argv: Optional[Iterable[str]] = None) -> int:
    """CLI entry point: convert every discovered FAST5 file to BLOW5.

    Returns 0 on success; 1 when discovery fails, no FAST5 files are
    found, or a conversion raises (processing stops at the first error).
    """
    args = parse_args(argv)

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

    try:
        fast5_files = list(discover_fast5_files(args.input))
    except Exception as exc:  # pragma: no cover - surfaced to user
        LOGGER.error("Failed to discover FAST5 files: %s", exc)
        return 1

    if not fast5_files:
        LOGGER.error("No FAST5 files found under %s", args.input)
        return 1

    for fast5_path in fast5_files:
        # One output file per input, named after the input's stem.
        target = args.output / f"{fast5_path.stem}.blow5"
        try:
            convert_fast5_to_blow5(
                fast5_path,
                target,
                args.default_meta,
                args.record_compression,
                args.signal_compression,
            )
        except Exception as exc:
            LOGGER.error("Failed to convert %s: %s", fast5_path, exc)
            return 1

    LOGGER.info("Conversion completed for %d FAST5 file(s)", len(fast5_files))
    return 0
283+
284+
285+
if __name__ == "__main__": # pragma: no cover - CLI entry point
    # Propagate main()'s integer status code to the shell.
    sys.exit(main())
287+

0 commit comments

Comments
 (0)