-
Notifications
You must be signed in to change notification settings - Fork 8
/
AlignedReadPair.py
executable file
·171 lines (136 loc) · 7.06 KB
/
AlignedReadPair.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import sys
class AlignedReadPair:
def __init__(self, read1, read2):
"""takes two pysam AlignedRead objects"""
if read1.qname != read2.qname:
print "error making aligned read pair: read names not the same"
print read1
print read2
sys.exit(2)
#self.read1 = read1
#self.read2 = read2
self.read1_str = str(read1)
self.read2_str = str(read2)
self.read1_is_TE = False
self.read2_is_TE = False
self.TE_annot_attr_list = []
self.TE_map_gff_list = []
self.interval_chr = ""
self.interval_start = 0
self.interval_end = 0
self.interval_direction = None
self.anchor_softclipped_pos = None
def calculate_inside_interval(self, size, read1, read2):
"""calculate the interval of putative insertion sites corresponding to this read pair. \
This interval is of length size and in the direction to which points the read that maps uniquely to a non-TE location
a reasonable setting of size is the predicted INSIDE insert size (+ some sdev)"""
## INSIDE INTERVAL
if not self.read1_is_TE:
#this means that the uniquely mapping read to a non-TE location is read1
if self.read1.is_reverse:
self.interval_start = int(read1.pos) - size
self.interval_end = int(read1.pos)
self.interval_direction = "rev"
else:
self.interval_start = int(read1.aend)
self.interval_end = int(read1.aend) + size
self.interval_direction = "fwd"
elif not self.read2_is_TE:
#this means that the uniquely mapping read to a non-TE location is read2
if self.read2.is_reverse:
self.interval_start = int(read2.pos) - size
self.interval_end = read2.pos
self.interval_direction = "rev"
else:
self.interval_start = int(read2.aend)
self.interval_end = int(read2.aend) + size
self.interval_direction = "fwd"
else:
print "error in read pair object - neither read is a TE"
def calculate_outside_interval(self, size, read1, read2):
"""calculate the interval of putative insertion sites corresponding to this read pair. \
This interval is of length size and in the direction to which points the read that maps uniquely to a non-TE location, INCLUDING THE MAPPING LOCATION OF THE READS. This accounts for split reads, or truncated reads that would be shorter
a reasonable setting of size is the predicted OUTSIDE insert(fragment) size (+ some sdev)"""
## OUTSIDE INTERVAL
if not self.read1_is_TE:
#this means that the uniquely mapping read to a non-TE location is read1
if read1.is_reverse:
self.interval_start = int(read1.aend) - size
self.interval_end = int(read1.aend)
self.interval_direction = "rev"
else:
self.interval_start = int(read1.pos)
self.interval_end = int(read1.pos) + size
self.interval_direction = "fwd"
elif not self.read2_is_TE:
#this means that the uniquely mapping read to a non-TE location is read2
if read2.is_reverse:
self.interval_start = int(read2.aend) - size
self.interval_end = read2.aend
self.interval_direction = "rev"
else:
self.interval_start = int(read2.pos)
self.interval_end = int(read2.pos) + size
self.interval_direction = "fwd"
else:
print "error in read pair object - neither read is a TE"
sys.exit(2)
def str(self):
# #string = "read1: %s \n\
# read2: %s \n\
# read1_is_TE %s \n\
# read1_is_rev %s \n\
# read2_is_TE %s \n\
# read2_is_rev %s \n\
# TE_annot_list %s \n\
# interval_chr %s\n\
# interval_start %d \n\
# interval_end %d \n\
# interval_direction %s\n" % (self.read1, self.read2, self.read1_is_TE, self.read1.is_reverse, self.read2_is_TE, self.read2.is_reverse, "; ".join([attr['Name'] for attr in self.TE_annot_attr_list]), self.interval_chr, self.interval_start, self.interval_end, self.interval_direction)
return string
def str_int(self):
return "[%s, %d, %d]" % (self.interval_chr, self.interval_start, self.interval_end)
def str_TE_annot_list(self):
try:
return "; ".join([attr['Name'] for attr in self.TE_annot_attr_list])
except KeyError:
return "no_name"
def TE_annot_tag_list(self, tag):
name_list = []
for attributes in self.TE_annot_attr_list:
try:
name = attributes[tag]
if name not in name_list:
name_list.append(name)
except KeyError:
continue
if len(name_list) == 0:
name_list = ["undefined"]
return name_list
def calc_anchor_is_softclipped(self, read1, read2):
#if the read is softclipped, return position at which it is clipped. If not, return None
if self.read1_is_TE: #then read2 is anchor
if read2.is_reverse and read2.cigar[0][0] == 4: #if the read is rev and softclipped to the left, ie into the interval
self.anchor_softclipped_pos = read2.pos
elif read2.cigar[-1][0] == 4: # if the read is fwd an softclipped at the right, ie into the interval
self.anchor_softclipped_pos = read2.aend
if self.read2_is_TE: # then read2 is anchor
if read1.is_reverse and read1.cigar[0][0] == 4:
self.anchor_softclipped_pos = read1.pos
elif read1.cigar[-1][0] == 4:
self.anchor_softclipped_pos = read1.aend
return None
def anchor_is_softclipped(self):
return self.anchor_softclipped_pos
def to_table(self, cluster_ID, library_name):
if self.interval_direction == "fwd":
mate_direction = "rev"
else:
mate_direction = "fwd"
if self.read1_is_TE:
read1_line = "%s\t%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s" % ("R", cluster_ID, library_name, mate_direction, ".", ".", ".", "mate", self.read1_str)
read2_line = "%s\t%d\t%s\t%s\t%d\t%d\t%s\t%s\t%s" % ("R", cluster_ID, library_name, self.interval_direction, self.interval_start, self.interval_end, self.interval_chr, "anchor", self.read2_str)
elif self.read2_is_TE:
read2_line = "%s\t%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s" % ("R", cluster_ID, library_name, mate_direction, ".", ".", ".", "mate", self.read2_str)
read1_line = "%s\t%d\t%s\t%s\t%d\t%d\t%s\t%s\t%s" % ("R", cluster_ID, library_name, self.interval_direction, self.interval_start, self.interval_end, self.interval_chr, "anchor", self.read1_str)
return read1_line + "\n" + read2_line