-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsame_decoder.py
More file actions
242 lines (214 loc) · 7.48 KB
/
same_decoder.py
File metadata and controls
242 lines (214 loc) · 7.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env python3
"""
tools/same_decoder.py - SDR SAME/EAS alert decoder companion.
Runs rtl_fm piped through multimon-ng to decode NOAA Weather Radio SAME
headers, then writes decoded alerts directly to the MeshHall SQLite database.
This script is independent of the main bot and can be run as a systemd
service alongside it. Both processes share the SQLite database safely
via WAL mode.
Requirements:
pip install watchdog (optional, for log tailing)
apt install rtl-sdr multimon-ng
Usage:
python3 tools/same_decoder.py --db data/meshhall.db --freq 162.550M
NOAA WX frequencies (MHz):
162.400 162.425 162.450 162.475 162.500 162.525 162.550
Find your strongest: https://www.weather.gov/nwr/station_listing
"""
import argparse
import asyncio
import logging
import re
import sqlite3
import subprocess
import sys
import time
from datetime import datetime, timezone
# Configure the root logger once at import time (timestamp, level, logger
# name, message), then create this module's own named logger.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# SAME message format: ZCZC-ORG-EEE-PSSCCC-PSSCCC+TTTT-JJJHHMM-LLLLLLLL-
#   ORG       originator code (e.g. WXR)
#   EEE       event code (e.g. TOR)
#   PSSCCC    one or more location codes separated by "-"
#   TTTT      valid duration as HHMM
#   JJJHHMM   issue time: day of year + HHMM (UTC)
#   LLLLLLLL  sending station ID; it may contain "/" (e.g. "KEAX/NWS"), so
#             the sender group must accept "/" in addition to word characters
#             or the ID is silently truncated at the slash.
# We parse the key fields we care about. The pattern is used with .search(),
# so any decoder prefix in front of "ZCZC" (such as "EAS: ") is ignored.
SAME_RE = re.compile(
    r"ZCZC-(?P<org>\w+)-(?P<event>\w+)-(?P<locations>[\d+\-]+)"
    r"\+(?P<duration>\d{4})-(?P<issued>\d{7})-(?P<sender>[\w/]+)-?"
)
# SAME event code -> human readable name.
# Codes that are not listed here are still accepted by the parser; it falls
# back to showing the raw three-letter code.
EVENT_CODES = {
    "TOR": "Tornado Warning",
    # The official SAME code for a Tornado Watch is "TOA". "TOW" is not an
    # official code; it is kept only so existing lookups keep working.
    "TOA": "Tornado Watch",
    "TOW": "Tornado Watch",
    "SVR": "Severe Thunderstorm Warning",
    "SVA": "Severe Thunderstorm Watch",
    "SVS": "Severe Weather Statement",
    "FFW": "Flash Flood Warning",
    "FFA": "Flash Flood Watch",
    "FLW": "Flood Warning",
    "FLA": "Flood Watch",
    "WSW": "Winter Storm Warning",
    "WSA": "Winter Storm Watch",
    "BZW": "Blizzard Warning",
    "HWW": "High Wind Warning",
    "HWA": "High Wind Watch",
    "HUW": "Hurricane Warning",
    "HUA": "Hurricane Watch",
    "TRW": "Tropical Storm Warning",
    "TRA": "Tropical Storm Watch",
    "TSW": "Tsunami Warning",
    "TSA": "Tsunami Watch",
    "EWW": "Extreme Wind Warning",
    "EAT": "Emergency Action Termination",
    "EAN": "Emergency Action Notification",
    "NIC": "National Information Center",
    "NPT": "National Periodic Test",
    "RMT": "Required Monthly Test",
    "RWT": "Required Weekly Test",
    "DMO": "Practice/Demo Warning",
    "ADR": "Administrative Message",
    "AVW": "Avalanche Warning",
    "AVA": "Avalanche Watch",
    "CAE": "Child Abduction Emergency",
    "CDW": "Civil Danger Warning",
    "CEM": "Civil Emergency Message",
    "EQW": "Earthquake Warning",
    "EVI": "Evacuation Immediate",
    "HMW": "Hazardous Materials Warning",
    "SPW": "Shelter in Place Warning",
    "LAE": "Local Area Emergency",
    "LEW": "Law Enforcement Warning",
    "FRW": "Fire Warning",
    "SMW": "Special Marine Warning",
    "SPS": "Special Weather Statement",
    "HLS": "Hurricane Local Statement",
    "CFW": "Coastal Flood Warning",
    "CFA": "Coastal Flood Watch",
}

# Suppress test/drill events by default (override with --include-tests)
TEST_EVENTS = {"RMT", "RWT", "NPT"}
def parse_same(raw: str):
    """Parse a raw SAME header line and return an alert dict, or None.

    Args:
        raw: One line of decoder output that contains a ``ZCZC-...`` header.
            Text before the header is ignored.

    Returns:
        A dict with the keys ``event_id``, ``event_code``, ``event_type``,
        ``org``, ``locations``, ``sender``, ``issue_ts``, ``expires_ts``,
        ``headline``, ``raw`` and ``is_test``; or ``None`` when the line does
        not match ``SAME_RE`` or carries an impossible issue time (day of
        year outside 1-366, hour > 23 or minute > 59), which is what a
        garbled decode looks like. Returning ``None`` instead of raising
        keeps one corrupt line from taking down the caller's read loop.
    """
    m = SAME_RE.search(raw)
    if not m:
        return None
    d = m.groupdict()

    event_code = d["event"]
    # Unknown codes are shown as the raw three-letter code.
    event_name = EVENT_CODES.get(event_code, event_code)

    # Parse duration: TTTT = HHMM
    dur_str = d["duration"]
    duration_mins = int(dur_str[:2]) * 60 + int(dur_str[2:])

    # Parse issue time: JJJHHMM (day of year + HHMM UTC)
    issued_str = d["issued"]
    jday = int(issued_str[:3])
    hour = int(issued_str[3:5])
    minute = int(issued_str[5:7])
    if not (1 <= jday <= 366 and hour <= 23 and minute <= 59):
        return None

    # The header carries no year. Alerts are decoded close to when they are
    # issued, so pick the year (previous, current or next) that puts the
    # issue time nearest to "now". This keeps a Dec 31 alert decoded just
    # after midnight on Jan 1 from being stamped almost a year in the future.
    # Day 366 in a non-leap year simply lands on Jan 1 of the following year.
    now = datetime.now(timezone.utc)
    now_ts = now.timestamp()
    day_offset = (jday - 1) * 86400
    candidates = [
        int(datetime(y, 1, 1, hour, minute, tzinfo=timezone.utc).timestamp())
        + day_offset
        for y in (now.year - 1, now.year, now.year + 1)
    ]
    issue_ts = min(candidates, key=lambda ts: abs(ts - now_ts))
    expires_ts = issue_ts + duration_mins * 60

    return {
        "event_id": f"SAME-{d['sender']}-{issued_str}",
        "event_code": event_code,
        "event_type": event_name,
        "org": d["org"],
        "locations": d["locations"],
        "sender": d["sender"],
        "issue_ts": issue_ts,
        "expires_ts": expires_ts,
        "headline": f"{event_name} issued by {d['org']}",
        "raw": raw.strip(),
        "is_test": event_code in TEST_EVENTS,
    }
def store_alert(db_path: str, alert: dict, include_tests: bool = False):
    """Write one decoded alert to the ``wx_alerts`` table in SQLite.

    Args:
        db_path: Path of the SQLite database file.
        alert: Alert dict as produced by ``parse_same``. The keys read here
            are ``is_test``, ``event_code``, ``event_id``, ``event_type``,
            ``headline``, ``raw``, ``expires_ts``, ``locations`` and
            ``sender``.
        include_tests: When False (the default), alerts flagged ``is_test``
            are logged and skipped instead of stored.

    Returns:
        None. Database problems (including a database that cannot be opened)
        are logged as errors rather than raised, so a storage failure does
        not stop the caller. The connection is always closed.
    """
    if alert["is_test"] and not include_tests:
        logger.info(f"Skipping test alert: {alert['event_code']}")
        return

    conn = None
    try:
        # connect() and the PRAGMA live inside the try block: both can fail
        # (missing directory, locked database) and must be handled like any
        # other write error, without leaking the connection.
        conn = sqlite3.connect(db_path)
        conn.execute("PRAGMA journal_mode=WAL")
        cur = conn.execute(
            """INSERT OR IGNORE INTO wx_alerts
            (ts, source, event_id, event_type, headline, description, expires, area)
            VALUES (?,?,?,?,?,?,?,?)""",
            (
                int(time.time()),
                "same",
                alert["event_id"],
                alert["event_type"],
                alert["headline"],
                alert["raw"],
                alert["expires_ts"],
                alert["locations"],
            ),
        )
        conn.commit()
        # INSERT OR IGNORE reports zero affected rows when the row was
        # dropped by a constraint, so only claim "stored" for a real insert.
        if cur.rowcount:
            logger.info(f"Stored SAME alert: {alert['event_type']} from {alert['sender']}")
        else:
            logger.info(f"Duplicate SAME alert ignored: {alert['event_id']}")
    except Exception as e:
        logger.error(f"DB write error: {e}")
    finally:
        if conn is not None:
            conn.close()
def _terminate_process(proc):
    """Stop a child process if it is still running, then reap it."""
    if proc is None or proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        # The child ignored the terminate request; force it down so the
        # SDR device is released.
        proc.kill()
        proc.wait()


def run_decoder(freq: str, device_index: int, db_path: str,
                include_tests: bool = False, squelch: int = 0):
    """
    Launch rtl_fm | multimon-ng pipeline and process output lines.
    This is a blocking call - run in a thread or subprocess.

    Args:
        freq: Frequency string handed to ``rtl_fm -f`` (e.g. "162.550M").
        device_index: RTL-SDR device index handed to ``rtl_fm -d``.
        db_path: SQLite database path forwarded to ``store_alert``.
        include_tests: Forwarded to ``store_alert``.
        squelch: Squelch level handed to ``rtl_fm -l``.

    Returns:
        None, once the pipeline's output ends or on Ctrl-C. Both child
        processes are terminated and reaped on every exit path.

    Raises:
        SystemExit: With status 1 when ``rtl_fm`` or ``multimon-ng`` is not
            installed.
    """
    rtl_cmd = [
        "rtl_fm",
        "-f", freq,
        "-M", "fm",
        "-s", "22050",
        "-E", "dc",
        "-p", "0",
        "-d", str(device_index),
        "-l", str(squelch),
        "-",
    ]
    mon_cmd = [
        "multimon-ng",
        "-t", "raw",
        "-a", "EAS",
        "-",
    ]
    logger.info(f"Starting SDR decoder on {freq}...")
    logger.info(f"rtl_fm: {' '.join(rtl_cmd)}")

    # Bound before the try block so the cleanup in ``finally`` is safe even
    # when a Popen call fails or is interrupted.
    rtl = None
    mon = None
    try:
        rtl = subprocess.Popen(rtl_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        mon = subprocess.Popen(
            mon_cmd,
            stdin=rtl.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            # Noise can decode to bytes that are not valid text; replace
            # them instead of letting a decode error end the read loop.
            errors="replace",
        )
        # Drop our copy of the pipe so multimon-ng is the only reader of
        # rtl_fm's output.
        rtl.stdout.close()
        logger.info("Decoder running. Listening for SAME alerts...")
        for line in mon.stdout:
            line = line.strip()
            if not line:
                continue
            logger.debug(f"Raw: {line}")
            if "ZCZC" not in line:
                continue
            try:
                alert = parse_same(line)
                if alert:
                    logger.info(f"Decoded: {alert['event_type']} ({alert['event_code']})")
                    store_alert(db_path, alert, include_tests=include_tests)
                else:
                    logger.warning(f"Failed to parse SAME: {line}")
            except Exception:
                # One bad line must not stop a long-running decoder.
                logger.exception(f"Error while handling SAME line: {line}")
        logger.warning("Decoder pipeline ended (rtl_fm or multimon-ng exited).")
    except FileNotFoundError as e:
        logger.error(f"Missing tool: {e}. Install rtl-sdr and multimon-ng.")
        sys.exit(1)
    except KeyboardInterrupt:
        logger.info("Decoder stopped.")
    finally:
        _terminate_process(mon)
        _terminate_process(rtl)
def main():
    """Command-line entry point: read the options and start the decoder."""
    cli = argparse.ArgumentParser(description="MeshHall SAME/EAS SDR Decoder")

    # (flag, add_argument keyword arguments), registered in display order.
    option_specs = (
        ("--db", {"default": "data/meshhall.db",
                  "help": "Path to MeshHall SQLite DB"}),
        ("--freq", {"default": "162.550M",
                    "help": "NOAA WX frequency (e.g. 162.550M)"}),
        ("--device", {"type": int, "default": 0,
                      "help": "RTL-SDR device index"}),
        ("--squelch", {"type": int, "default": 0,
                       "help": "RTL-FM squelch level"}),
        ("--include-tests", {"action": "store_true",
                             "help": "Store weekly/monthly test alerts"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    opts = cli.parse_args()
    decoder_kwargs = {
        "freq": opts.freq,
        "device_index": opts.device,
        "db_path": opts.db,
        "include_tests": opts.include_tests,
        "squelch": opts.squelch,
    }
    run_decoder(**decoder_kwargs)


if __name__ == "__main__":
    main()