-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcharex.py
426 lines (346 loc) · 13.9 KB
/
charex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
"""
Characteristics Extractor
BSD 2-Clause License
Copyright (c) 2017, Atsushi Yokoyama, Firmlogics (yokoyama@flogics.com)
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import os
import sys
config = None
# Set Python search path to the parent directory
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
import numpy as np
dtype = np.int16 # type of each value of I or Q
n_channels = 2 # sigdata must be L/R (I/Q) structure
len_input_sec = 10 # length of sigdata must be 10 seconds signal
len_noise_smooth = 10 # number of samples to find neighbor range
wid_freq_detect = 500 # width of frequency (in Hz) to detect beacon
# (as same as detect_freq_width of Monitor-1)
sec_sg_from = 0 # stat time where beacon can be heard
# (definition was changed from Monitor-1)
sec_sg_until = 7 # end time where beacon can be heard
# (definition was changed from Monitor-1)
n_filter_order = 64
lpf_cutoff = 0.5 * 0.95
offset_sn_split = 1 # number of FFT bin to split S/N
from lib.common import eprint
class character1:
"""
Monitor-1 Character Parameters and Database Access Methods
"""
def __init__(self, max_sn, best_pos_hz, total_ct, bg_pos_hz, bg_sn):
self.max_sn = max_sn
self.best_pos_hz = best_pos_hz
self.total_ct = total_ct
self.bg_pos_hz = bg_pos_hz
self.bg_sn = bg_sn
def updatedb(self, dbconn, datetime_sec):
"""
Update SQLite database dbconn by the parameters for datetime_sec
"""
c = dbconn.cursor()
c.execute('''UPDATE received SET
char1_max_sn = ?,
char1_best_pos_hz = ?,
char1_total_ct = ?,
char1_bg_pos_hz = ?,
char1_bg_sn = ?
WHERE datetime = ?''',
(
self.max_sn,
self.best_pos_hz,
self.total_ct,
self.bg_pos_hz,
self.bg_sn,
datetime_sec
))
dbconn.commit()
def read_sigdata(datetime_sec):
"""
Read signal data (as a raw byte stream) which corresponds to the specified
datetime_src
"""
from lib.fileio import getpath_signalfile
import time
import wave
# Open corresponding wave file
filename = getpath_signalfile(
time.strftime('%Y%m%d/%H%M%S.wav', time.gmtime(datetime_sec)))
wavfile = wave.open(filename, 'rb')
# Check properties of the signal
if wavfile.getnchannels() != 2:
raise Exception('Input wav file has illegal numbers of channel')
if wavfile.getsampwidth() != 2:
raise Exception('Input wav file has illegal sample width')
samplerate = wavfile.getframerate()
sigdata = wavfile.readframes(wavfile.getnframes())
wavfile.close()
return sigdata, samplerate
def get_maxvalues_inrange(sig, len_apply):
"""
Flatten signals spectrum by maximum values in neighbor range (or bin).
This special algorithm came from Monitor-1 proc.m.
"""
len_sig = len(sig)
maxvalues = np.array(sig)
for n in range(1, len_apply):
maxvalues = np.maximum(
maxvalues,
np.append(np.zeros(n), sig[0 : len_sig - n]))
return maxvalues
def get_bg_len(offset_ms, samplerate):
"""
Return length of samples which belongs to no-beacon signal part
"""
return int((- offset_ms) / 1000.0 * samplerate)
def bg_est(sig, samplerate, offset_ms):
"""
Background Estimation
"""
from scipy.fftpack import fft
if offset_ms > -100: # XXX offset_ms may be at least -100 [ms] ...
raise Exception('Too short offset')
# Extract very first part which shouldn't contain beacon signal
bg_len = get_bg_len(offset_ms, samplerate)
pre_sig = sig[0 : bg_len]
bg = np.absolute(fft(pre_sig))
bg_smooth = get_maxvalues_inrange(bg, len_noise_smooth)
return bg, bg_smooth
def get_sig_bins(pos):
"""
Return signal bin location parameters by central pos (position)
"""
from_sig_bin = pos - offset_sn_split
if from_sig_bin < 0:
from_sig_bin = 0
to_sig_bin = pos + offset_sn_split
if to_sig_bin >= wid_freq_detect:
to_sig_bin = wid_freq_detect - 1
len_sig_bin = to_sig_bin - from_sig_bin + 1
return from_sig_bin, to_sig_bin, len_sig_bin
def sg_est(sig, bg, start, samplerate, offset_ms, canceling=False):
"""
Signal-part Estimation and Background Canceling
"""
from scipy.fftpack import fft
# Determine which samples should be processed
# print start, start + samplerate
sg = np.absolute(fft(sig[start : start + samplerate]))
sg_smooth = get_maxvalues_inrange(sg, len_noise_smooth)
# Cancelling background if requested
if canceling:
true_sig = (sg_smooth / len(sg) * len(bg)) / bg
else:
true_sig = sg_smooth
# Find the sig peak freq
# It is done in the smoothed signal (background noise cancelled).
# Now the real signal consists of +/- (sample_rate / 4) [Hz].
# The true_sig() consists of +/- (detect_freq_width / 2) [Hz].
bfo_offset_hz = config.getint('Migration', 'bfo_offset_hz')
band_start = samplerate / 4 + bfo_offset_hz - wid_freq_detect / 2
band_end = samplerate / 4 + bfo_offset_hz + wid_freq_detect / 2
band = true_sig[band_start : band_end]
# Find approximate largest value and position in band
a_pos = np.argmax(band)
a_lvl = band[a_pos]
# Next, obtain band_sg. band_sg is not smoothed signal different from
# sg_smooth
band_sg = sg[band_start : band_end]
# To accurately find the peak, using the not-smoothed signal but need
# to eliminate not-likely spectrum.
end_low_eliminate = a_pos - len_noise_smooth + 1
if end_low_eliminate < 0:
end_low_eliminate = 0
begin_high_eliminate = a_pos + len_noise_smooth
if begin_high_eliminate > len(band_sg):
begin_high_eliminate = len(band_sg)
len_high_eliminate = len(band_sg) - begin_high_eliminate
band_sg[0 : end_low_eliminate] = np.zeros(end_low_eliminate)
band_sg[begin_high_eliminate : ] = np.zeros(len_high_eliminate)
# print a_pos, band_sg
# Find the exact largest value and position in band
pos = np.argmax(band_sg)
lvl = band_sg[pos]
# print a_pos, a_lvl
# print pos, lvl
from_sig_bin, to_sig_bin, len_sig_bin = get_sig_bins(pos)
# Calculating S/N. First get 'S'
sg_pow = np.sum(np.power(band_sg[from_sig_bin : to_sig_bin + 1], 2))
# Then get 'N'
band_sg[from_sig_bin : to_sig_bin + 1] = np.zeros(len_sig_bin)
# print band_sg
sn_db = np.log10(sg_pow / np.sum(np.power(band_sg, 2))) * 10
# print sn_db
return lvl, pos, sn_db
def bin_to_freq(pos):
"""
Convert bin pos number to true frequency offset (in Hz)
"""
return pos - wid_freq_detect / 2
def charex(sigdata, samplerate, offset_ms, bfo_offset_hz, debug=False):
"""
Actually calculate characteristics of the signal record and return the
result.
"""
from scipy import signal
# np.set_printoptions(edgeitems=100)
if debug:
eprint(samplerate, offset_ms, bfo_offset_hz)
n_samples = samplerate * len_input_sec
# Generating an LPF
# Not sure if we can apply Nuttall window and also Hamming window which
# firwin() automatically applies. But as same as Beacon Monitor-1 code.
if 'lpf' not in dir(charex):
charex.lpf = signal.nuttall(n_filter_order + 1) \
* signal.firwin(n_filter_order + 1, lpf_cutoff)
# Generating an complex tone (f = samplerate / 4)
# XXX There are errors in latter elements but ignorable...
if 'tone_f_4' not in dir(charex):
charex.tone_f_4 = \
np.exp(1j * np.deg2rad(np.arange(0, 90 * n_samples, 90)))
if len(sigdata) != n_samples * n_channels * np.dtype(dtype).itemsize:
eprint('Length of sigdata (%d) is illegal' % (len(sigdata)))
return character1(-float('Inf'), 0, 0, 0, 0.0)
# Convert the sigdata (raw stream) to input complex vector
# It is okay that each I/Q value is 16-bit signed integer and as same as
# the original Beacon Monitor-1 libexec/proc.m (MATLAB implementation).
iq_matrix = np.frombuffer(sigdata, dtype=dtype).reshape((n_samples, 2))
input_vec = iq_matrix[..., 0] + 1j * iq_matrix[..., 1]
# input_vec is like this.
# [ 88.-30.j 87.-29.j 88.-27.j ..., -2. +4.j -2. +0.j -2. -1.j]
# print input_vec, len(input_vec)
# Apply LPF to narrow band width to half, and remove preceding samples
# as same as Monitor-1
sig = signal.lfilter(charex.lpf, 1,
np.append(input_vec, np.zeros(n_filter_order / 2)))
sig = sig[n_filter_order / 2 : ]
# Applying tone (f = samplerate / 4) to shift signal upward on freq. domain
sig *= charex.tone_f_4
# Drop imaginary parts as same as Monitor-1
sig = np.real(sig)
# print sig, len(sig)
# Background noise estimation
bg, bg_smooth = bg_est(sig, samplerate, offset_ms)
# Now, start analysis in signal parts of time domain
max_sn = -np.inf
best_pos = 0
ct_pos = np.zeros(wid_freq_detect, dtype=np.int16)
for n in range(sec_sg_from, sec_sg_until):
start = n * samplerate - offset_ms / 1000 * samplerate
lvl, pos, sn = \
sg_est(sig, bg_smooth, start, samplerate, offset_ms, canceling=True)
ct_pos[pos] += 1
if sn > max_sn:
max_sn = sn
best_pos = pos
# Calculating values in the no-signal part (beginning part)
bg_len = get_bg_len(offset_ms, samplerate)
bg_lvl, bg_pos, bg_sn = \
sg_est(sig, bg_smooth, 0, bg_len, offset_ms, canceling=False)
# print 'bg_pos', bg_pos
# print 'bg_len', bg_len
lower_pos, upper_pos, dummy = get_sig_bins(best_pos)
# print lower_pos, upper_pos, ct_pos
# print ct_pos[lower_pos : upper_pos + 1]
total_ct = sum(ct_pos[lower_pos : upper_pos + 1])
if debug:
print 'SN: %4.1f Bias: %4d Ct: %d IF: %4d %4.1f Z: -1 -1.0' % \
(max_sn, bin_to_freq(best_pos), total_ct, bin_to_freq(bg_pos),
bg_sn)
return character1(max_sn, bin_to_freq(best_pos), total_ct,
bin_to_freq(bg_pos), bg_sn)
def charex_all(onepass=False, force=False, dryrun=False, debug=False):
"""
Retrieve any record in the database, which doesn't have calculated
characteristics by this charex.py yet, and pass them to charex()
"""
global config
from lib.fileio import connect_database
import time
from lib.config import BeaconConfigParser
config = BeaconConfigParser()
conn = connect_database()
while True:
c = conn.cursor()
# If specified 'force', even the record has characteristics parameters,
# fetch any records for update.
if force:
cond = ''
else:
cond = 'WHERE char1_max_sn IS NULL'
c.execute('''SELECT datetime, offset_ms, bfo_offset_hz
FROM received
%s
ORDER BY datetime''' % (cond))
for row in c.fetchall():
try:
sigdata, samplerate = read_sigdata(datetime_sec=row[0])
paramset = charex(
sigdata, samplerate, row[1], row[2], debug=debug)
if not dryrun:
paramset.updatedb(conn, row[0])
except IOError as err:
if err[1] == 'No such file or directory':
if debug:
pass
# eprint('Signal file not found. Skipped')
else:
raise
if onepass:
break
else:
# For continuous passes, 'force fetch' is NOT required
force = False
# To let rest database, wait for a short time period
time.sleep(0.5)
conn.close()
def task():
"""
Entry point for Task Keeper
"""
charex_all(onepass=False, force=False, dryrun=False, debug=False)
def main():
import argparse
import re
import sys
# Parse arguments
parser = argparse.ArgumentParser(
description='Beacon Monitor Characteristics Extractor')
parser.add_argument('-d', '--debug',
action='store_true',
default=False,
help='enable debug')
parser.add_argument('--force',
action='store_true',
default=False,
help='update database even they already have characteristics')
parser.add_argument('--dryrun',
action='store_true',
default=False,
help='does not touch database')
parser.add_argument('-q', '--quit',
action='store_true',
default=False,
help='quit after one-pass')
args = parser.parse_args()
charex_all(onepass=args.quit, force=args.force, dryrun=args.dryrun,
debug=args.debug)
if __name__ == "__main__":
main()