utils.py
''' Utils for io, language, connectivity graphs etc '''
import os
import sys
import re
import string
import json
import time
import math
from collections import Counter
import numpy as np
import networkx as nx
# padding, unknown word, end of sentence
base_vocab = ['<PAD>', '<UNK>', '<EOS>']
padding_idx = base_vocab.index('<PAD>')

def load_nav_graphs(scans):
    ''' Load connectivity graph for each scan '''

    def distance(pose1, pose2):
        ''' Euclidean distance between two graph poses '''
        return ((pose1['pose'][3] - pose2['pose'][3]) ** 2
                + (pose1['pose'][7] - pose2['pose'][7]) ** 2
                + (pose1['pose'][11] - pose2['pose'][11]) ** 2) ** 0.5

    graphs = {}
    for scan in scans:
        with open('connectivity/%s_connectivity.json' % scan) as f:
            G = nx.Graph()
            positions = {}
            data = json.load(f)
            for i, item in enumerate(data):
                if item['included']:
                    for j, conn in enumerate(item['unobstructed']):
                        if conn and data[j]['included']:
                            positions[item['image_id']] = np.array([item['pose'][3],
                                    item['pose'][7], item['pose'][11]])
                            assert data[j]['unobstructed'][i], 'Graph should be undirected'
                            G.add_edge(item['image_id'], data[j]['image_id'],
                                    weight=distance(item, data[j]))
            nx.set_node_attributes(G, values=positions, name='position')
            graphs[scan] = G
    return graphs
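
# Usage sketch (not part of the original file): the scan id is hypothetical,
# the connectivity/ JSON files must be present on disk, and node access via
# G.nodes[...] assumes networkx >= 2. Edge weights are the Euclidean distances
# above, so networkx shortest paths give geodesic distances between viewpoints:
#
#   graphs = load_nav_graphs(['some_scan_id'])
#   G = graphs['some_scan_id']
#   dists = dict(nx.all_pairs_dijkstra_path_length(G))  # weighted by 'weight'
#   position = G.nodes['some_viewpoint_id']['position']  # 3D position attribute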

def load_datasets(splits):
    ''' Load the R2R data for the given dataset splits. '''
    data = []
    for split in splits:
        assert split in ['train', 'val_seen', 'val_unseen', 'test']
        with open('tasks/R2R/data/R2R_%s.json' % split) as f:
            data += json.load(f)
    return data
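
# Usage sketch (illustrative): each element of the returned list is one R2R
# item, and its 'instructions' field (a list of strings) is what build_vocab
# below consumes:
#
#   data = load_datasets(['train', 'val_seen'])
#   first_instruction = data[0]['instructions'][0]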

class Tokenizer(object):
    ''' Class to tokenize and encode a sentence. '''

    SENTENCE_SPLIT_REGEX = re.compile(r'(\W+)')  # Split on runs of non-word characters, keeping them as tokens

    def __init__(self, vocab=None, encoding_length=20):
        self.encoding_length = encoding_length
        self.vocab = vocab
        self.word_to_index = {}
        if vocab:
            for i, word in enumerate(vocab):
                self.word_to_index[word] = i

    def split_sentence(self, sentence):
        ''' Break sentence into a list of words and punctuation '''
        toks = []
        for word in [s.strip().lower() for s in self.SENTENCE_SPLIT_REGEX.split(sentence.strip()) if len(s.strip()) > 0]:
            # Break up any tokens containing punctuation only, e.g. '!?', unless it is multiple full stops e.g. '..'
            if all(c in string.punctuation for c in word) and not all(c in '.' for c in word):
                toks += list(word)
            else:
                toks.append(word)
        return toks
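
    # Example (illustrative, not from the original file): punctuation-only
    # tokens are split apart unless they are runs of full stops:
    #   Tokenizer().split_sentence('Turn left. Stop!?')
    #   -> ['turn', 'left', '.', 'stop', '!', '?']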

    def encode_sentence(self, sentence):
        if len(self.word_to_index) == 0:
            sys.exit('Tokenizer has no vocab')
        encoding = []
        for word in self.split_sentence(sentence)[::-1]:  # reverse input sentences
            if word in self.word_to_index:
                encoding.append(self.word_to_index[word])
            else:
                encoding.append(self.word_to_index['<UNK>'])
        encoding.append(self.word_to_index['<EOS>'])
        if len(encoding) < self.encoding_length:
            encoding += [self.word_to_index['<PAD>']] * (self.encoding_length - len(encoding))
        return np.array(encoding[:self.encoding_length])
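
    # Example (illustrative): with vocab ['<PAD>', '<UNK>', '<EOS>', 'go', 'left']
    # and encoding_length=6, the input is reversed, terminated and padded:
    #   Tokenizer(vocab, encoding_length=6).encode_sentence('go left')
    #   -> array([4, 3, 2, 0, 0, 0])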

    def decode_sentence(self, encoding):
        sentence = []
        for ix in encoding:
            if ix == self.word_to_index['<PAD>']:
                break
            else:
                sentence.append(self.vocab[ix])
        return " ".join(sentence[::-1])  # unreverse before output

def build_vocab(splits=['train'], min_count=5, start_vocab=base_vocab):
    ''' Build a vocab, starting with base vocab containing a few useful tokens. '''
    count = Counter()
    t = Tokenizer()
    data = load_datasets(splits)
    for item in data:
        for instr in item['instructions']:
            count.update(t.split_sentence(instr))
    vocab = list(start_vocab)
    for word, num in count.most_common():
        if num >= min_count:
            vocab.append(word)
        else:
            break  # most_common() is sorted, so all remaining words are below min_count
    return vocab

def write_vocab(vocab, path):
    print('Writing vocab of size %d to %s' % (len(vocab), path))
    with open(path, 'w') as f:
        for word in vocab:
            f.write("%s\n" % word)

def read_vocab(path):
    with open(path) as f:
        vocab = [word.strip() for word in f.readlines()]
    return vocab
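
# Typical vocab pipeline (sketch; 'train_vocab.txt' is an illustrative path,
# not one the original file prescribes):
#   vocab = build_vocab(splits=['train'], min_count=5)
#   write_vocab(vocab, 'train_vocab.txt')
#   tok = Tokenizer(vocab=read_vocab('train_vocab.txt'), encoding_length=20)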

def asMinutes(s):
    ''' Format a duration in seconds as whole minutes and seconds. '''
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)


def timeSince(since, percent):
    ''' Elapsed time since `since`, plus estimated time remaining given the
    fraction of work completed so far. '''
    now = time.time()
    s = now - since
    es = s / percent  # estimated total time
    rs = es - s       # estimated remaining time
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))
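
# Example: if `start = time.time()` was recorded 60 seconds ago and training
# is 25% complete, then:
#   timeSince(start, 0.25) -> '1m 0s (- 3m 0s)'   # elapsed, estimated remaining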