Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions pyfaidx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1458,16 +1458,37 @@ def get_seq(self, name, start, end):
del seq
else:
seq_mut = list(seq.seq)
# We'll reuse the Sequence container near the end
del seq.seq
# Track net positional shift caused by prior indels applied within this window
delta = 0
try:
var = self.vcf.fetch(name, start - 1, end)
for record in var:
if record.is_snp: # skip indels
sample = record.genotype(self.sample)
if sample.gt_type in self.gt_type and eval(self.filter):
alt = record.ALT[0]
i = (record.POS - 1) - (start - 1)
seq_mut[i:i + len(alt)] = str(alt)
sample = record.genotype(self.sample)
# Apply only records that pass genotype type and optional filter
if sample.gt_type in self.gt_type and eval(self.filter):
ref = str(record.REF)
alt = str(record.ALT[0])
# local 0-based index of the variant anchor within the requested window
i0 = (record.POS - 1) - (start - 1)
# Guard against variants outside our requested window slice
if i0 < 0 or i0 >= (end - start + 1):
continue
# For safety, only apply edits fully contained within window
# This avoids boundary complications for partial indels
if (record.POS >= start) and (record.POS + len(ref) - 1 <= end):
i = i0 + delta
# Constrain within current mutable sequence bounds
left = max(0, i)
right = min(len(seq_mut), i + len(ref))
# If the replacement extends beyond current bounds, skip conservatively
if left >= len(seq_mut) or right <= left:
continue
# Apply SNP/MNP/INDEL by splicing
seq_mut[left:right] = list(alt)
# Update delta for downstream variants
delta += (len(alt) - len(ref))
except ValueError as e: # Can be raised if name is not part of tabix for vcf
if self.vcf._tabix is not None and name not in self.vcf._tabix.contigs:
# The chromosome name is not part of the vcf
Expand All @@ -1477,12 +1498,18 @@ def get_seq(self, name, start, end):
# This is something else
raise e

# slice the list in case we added an MNP in last position
# Build final string. Truncate to requested window length if necessary.
target_len = end - start + 1
final = ''.join(seq_mut[:target_len])
if self.faidx.as_raw:
return ''.join(seq_mut[:end - start + 1])
return final
else:
seq.seq = ''.join(seq_mut[:end - start + 1])
return seq
# If an indel changed the net length inside the window, avoid coordinate mismatch errors
if len(final) != target_len:
return Sequence(name=name, seq=final, start=None, end=None)
else:
seq.seq = final
return seq


def wrap_sequence(n, sequence, fillvalue='', newline='\n'):
Expand Down
6 changes: 6 additions & 0 deletions tests/data/tiny.fa
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
>chr1
AAAACCCCGGGGTTTTAAAACCCCGGGGTTTTAAAACCCCGGGGTTTT
>chr2
ACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGTACGT
>chr3
ACGTACGTACGTACGTACGT
14 changes: 14 additions & 0 deletions tests/data/tiny.vcf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
##fileformat=VCFv4.2
##contig=<ID=chr1,length=48>
##contig=<ID=chr2,length=48>
##contig=<ID=chr3,length=20>
##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT sample1 sample2 sample3
chr1 2 . A G . PASS . GT 1/1 0/0 0/1
chr1 10 . G GTT . PASS . GT 1/1 1/1 0/0
chr1 15 . TTA T . PASS . GT 1/1 0/0 0/0
chr2 10 . CGTACGT N . PASS . GT 1/1 1/1 1/1
chr3 1 . A G . PASS . GT 1/1 0/0 0/1
chr3 4 . TAC T . PASS . GT 1/1 0/0 0/0
chr3 8 . T TTT . PASS . GT 1/1 0/0 0/0
chr3 19 . GT G . PASS . GT 1/1 0/0 0/0
157 changes: 157 additions & 0 deletions tests/test_FastaVariant_indels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import os
import pytest

from pyfaidx import FastaVariant


path = os.path.dirname(__file__)
os.chdir(path)


@pytest.fixture(scope="module")
def ensure_tiny_vcf_tbi():
"""Compress and tabix-index tests/data/tiny.vcf -> tiny.vcf.gz + .tbi.
Always reindex to pick up any edits to the VCF in this test module.
"""
try:
import pysam
except ImportError:
pytest.skip("pysam not installed.")

vcf_txt = os.path.join("data", "tiny.vcf")
vcf_gz = vcf_txt + ".gz"

# (Re)create compressed and indexed VCF to ensure it's fresh
if os.path.exists(vcf_gz):
os.remove(vcf_gz)

pysam.tabix_compress(vcf_txt, vcf_gz, force=True)

if os.path.exists(vcf_gz + ".tbi"):
os.remove(vcf_gz + ".tbi")

pysam.tabix_index(vcf_gz, preset="vcf", force=True)

assert os.path.exists(vcf_gz)
assert os.path.exists(vcf_gz + ".tbi")
return vcf_gz


def _make_fasta_variant(vcf_gz, **kwargs):
return FastaVariant(
os.path.join("data", "tiny.fa"), vcf_gz, as_raw=True, hom=True, het=True, **kwargs
)


@pytest.mark.parametrize(
"sample,exp_chr1,exp_chr2,exp_chr3",
[
(
"sample1",
# chr1 complete 48bp
"AGAACCCCGGTTGGTTTAAACCCCGGGGTTTTAAAACCCCGGGGTTTT",
# chr2 complete 48bp
"ACGTACGTANACGTACGTACGTACGTACGTACGTACGTACGT",
# chr3 complete 20bp
"GCGTGTTTACGTACGTACG",
),
(
"sample2",
# chr1 first 48bp (the complete consensus sequence has two extra Ts at the end)
"AAAACCCCGGTTGGTTTTAAAACCCCGGGGTTTTAAAACCCCGGGGTT",
# chr2 complete 48bp
"ACGTACGTANACGTACGTACGTACGTACGTACGTACGTACGT",
# chr3 complete 20bp
"ACGTACGTACGTACGTACGT",
),
(
"sample3",
# chr1 complete 48bp
"AGAACCCCGGGGTTTTAAAACCCCGGGGTTTTAAAACCCCGGGGTTTT",
# chr2 complete 48bp
"ACGTACGTANACGTACGTACGTACGTACGTACGTACGTACGT",
# chr3 complete 20bp
"GCGTACGTACGTACGTACGT",
),
],
)
def test_combined_indels_window(ensure_tiny_vcf_tbi, sample, exp_chr1, exp_chr2, exp_chr3):
"""Window 1..48 (0-based [0:48]) includes all variants on chr1, chr2 and chr3 for each sample.

For sample1, this is equivalent to running:
bcftools consensus -f tiny.fa -H A tiny.vcf.gz -s sample1
"""
try:
fasta = _make_fasta_variant(ensure_tiny_vcf_tbi, sample=sample)
s1 = fasta["chr1"][0:48]
assert s1 == exp_chr1

s2 = fasta["chr2"][0:48]
assert s2 == exp_chr2

s3 = fasta["chr3"][0:20]
assert s3 == exp_chr3
except (ImportError, IOError):
pytest.skip("pysam or PyVCF not installed or VCF indexing unavailable.")


def test_chr1_deletion_only_window(ensure_tiny_vcf_tbi):
"""Window 12..19 (0-based [11:19]) includes only the deletion at pos15 (TTA->T)."""
try:
fasta = _make_fasta_variant(ensure_tiny_vcf_tbi)
s = fasta["chr1"][11:19]
assert s == "GTTTAA" # 8 bp -> 6 bp (net -2)

except (ImportError, IOError):
pytest.skip("pysam or PyVCF not installed or VCF indexing unavailable.")


def test_chr1_insertion_only_window(ensure_tiny_vcf_tbi):
"""Window 9..12 (0-based [8:12]) includes only the insertion at pos10 (G->GTT).
Length remains 4 due to truncation to request size; content reflects insertion.
"""
try:
fasta = _make_fasta_variant(ensure_tiny_vcf_tbi)

s = fasta["chr1"][8:12]
assert s == "GGTT" # was 'GGGG'

except (ImportError, IOError):
pytest.skip("pysam or PyVCF not installed or VCF indexing unavailable.")


def test_chr3_overlapping_del_then_ins(ensure_tiny_vcf_tbi):
"""Deletion TAC->T at pos4..6 (anchored at pos4), and insertion T->TTT at pos8.
These are adjacent events. Validate consensus within a window spanning them.
"""
fasta = _make_fasta_variant(ensure_tiny_vcf_tbi)
s = fasta["chr3"][3:12] # 0-based slice [3:12] == bases 4..12
assert isinstance(s, str)
assert len(s) == 9
# Basic sanity: result starts with the anchor 'T' at pos4
assert s[0] == "T"


def test_chr3_deletion_at_chrom_end(ensure_tiny_vcf_tbi):
"""End-of-chromosome deletion using anchored form (POS=19, REF=GT -> ALT=G).
Ensures we handle trailing-edge edits without coordinate mismatch.
"""
fasta = _make_fasta_variant(ensure_tiny_vcf_tbi)
s = fasta["chr3"][12:20]
assert s == "ACGTACG" # one base shorter than reference
assert len(s) == 7


def test_default_sample_is_first_with_warning(ensure_tiny_vcf_tbi):
"""When multiple samples exist and none specified, first sample is used with a warning."""
try:
with pytest.warns(RuntimeWarning):
fv_default = _make_fasta_variant(ensure_tiny_vcf_tbi)
fv_sample1 = _make_fasta_variant(ensure_tiny_vcf_tbi, sample="sample1")

# Sanity: default should pick the first sample (sample1)
s_def = fv_default["chr1"][0:12]
s_s1 = fv_sample1["chr1"][0:12]
assert s_def == s_s1
except (ImportError, IOError):
pytest.skip("pysam or PyVCF not installed or VCF indexing unavailable.")
Loading