
Commit 83b6988

Merge pull request #903 from cmu-delphi/v4-schema-revisions-release-prep-prep

V4 schema revisions candidate

2 parents e0364a7 + 7e28e0f

Some content is hidden: for large commits, only a subset of the changed files is shown below.

47 files changed: +1163 −1504 lines

.gitignore

Lines changed: 3 additions & 0 deletions

@@ -1,4 +1,7 @@
 __pycache__/
+*.pyc
+*~
+\#*#
 .DS_Store
 /.vscode
 /delphi-epidata

dev/docker/database/epidata/Dockerfile

Lines changed: 3 additions & 0 deletions

@@ -8,6 +8,9 @@ ENV MYSQL_DATABASE epidata
 ENV MYSQL_USER user
 ENV MYSQL_PASSWORD pass
 
+# provide DDL which will configure dev environment at container startup
+COPY repos/delphi/delphi-epidata/dev/docker/database/epidata/_init.sql /docker-entrypoint-initdb.d/
+
 # provide DDL which will create empty tables at container startup
 COPY repos/delphi/delphi-epidata/src/ddl/*.sql /docker-entrypoint-initdb.d/
 

dev/docker/database/epidata/_init.sql

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+CREATE DATABASE covid;
+GRANT ALL ON covid.* TO 'user';
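Aside: the integration tests in this commit now point at this new covid database instead of epidata. A minimal sketch of the connection they open, with parameters taken from the test diffs below (mysql.connector is the client these tests use):

import mysql.connector

# connect to the dev database created by _init.sql at container startup;
# the 'user' account was granted ALL on covid.* above
cnx = mysql.connector.connect(
    user='user',
    password='pass',
    host='delphi_database_epidata',
    database='covid')  # previously 'epidata'
cur = cnx.cursor()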
Lines changed: 2 additions & 1 deletion

@@ -1,3 +1,4 @@
 geo_id,value,stderr,sample_size,issue,time_value,geo_type,signal,source
 d_nonlatest,0,0,0,1,0,geo,sig,src
-d_latest, 0,0,0,3,0,geo,sig,src
+d_latest, 0,0,0,3,0,geo,sig,src
+d_justone, 0,0,0,1,0,geo,sig,src

integrations/acquisition/covidcast/test_covidcast_meta_caching.py

Lines changed: 21 additions & 23 deletions

@@ -36,13 +36,17 @@ def setUp(self):
         user='user',
         password='pass',
         host='delphi_database_epidata',
-        database='epidata')
+        database='covid')
     cur = cnx.cursor()
 
-    # clear the `covidcast` table
-    cur.execute('truncate table covidcast')
+    # clear all tables
+    cur.execute("truncate table signal_load")
+    cur.execute("truncate table signal_history")
+    cur.execute("truncate table signal_latest")
+    cur.execute("truncate table geo_dim")
+    cur.execute("truncate table signal_dim")
     # reset the `covidcast_meta_cache` table (it should always have one row)
-    cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = ""')
+    cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
     cnx.commit()
     cur.close()
 
@@ -67,30 +71,24 @@ def test_caching(self):
 
     # insert dummy data
     self.cur.execute(f'''
-      INSERT INTO
-        `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-          `time_value`, `geo_value`, `value_updated_timestamp`,
-          `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-          `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-          `missing_stderr`,`missing_sample_size`)
-      VALUES
-        (0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
-          123, 1, 2, 3, 456, 1, 20200422, 0, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
-        (0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
-          789, 1, 2, 3, 456, 1, 20200423, 1, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
+      INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig');
+    ''')
+    self.cur.execute(f'''
+      INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa');
     ''')
     self.cur.execute(f'''
       INSERT INTO
-        `covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
-          `time_value`, `geo_value`, `value_updated_timestamp`,
-          `value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
-          `direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
-          `missing_stderr`,`missing_sample_size`)
+        `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, `time_type`,
+          `time_value`, `value_updated_timestamp`,
+          `value`, `stderr`, `sample_size`,
+          `issue`, `lag`, `missing_value`,
+          `missing_stderr`,`missing_sample_size`)
       VALUES
-        (100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
-          456, 4, 5, 6, 789, -1, 20200422, 0, 1, True, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
+        (15, 42, 96, 'day', 20200422,
+          123, 1, 2, 3, 20200422, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
+        (16, 42, 97, 'day', 20200422,
+          789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
     ''')
-
     self.cnx.commit()
 
     # make sure the live utility is serving something sensible
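The dummy-data change above illustrates the v4 split: the old wide covidcast table becomes signal_dim (source/signal), geo_dim (geo_type/geo_value), and the fact tables signal_latest/signal_history. A hedged sketch of the corresponding read path, reconstructing a wide row via the dimension keys; the table, column, and key names come from the inserts above, but the join itself is an inference (consistent with the signal_latest_v/signal_history_v views used later in this commit), not something this diff states:

# reconstruct an old-style wide row from the v4 star schema (sketch)
cur.execute('''
    SELECT sd.`source`, sd.`signal`, gd.`geo_type`, gd.`geo_value`,
           sl.`time_type`, sl.`time_value`, sl.`value`, sl.`stderr`,
           sl.`sample_size`, sl.`issue`, sl.`lag`
    FROM `signal_latest` sl
    JOIN `signal_dim` sd USING (`signal_key_id`)
    JOIN `geo_dim` gd USING (`geo_key_id`)
''')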

integrations/acquisition/covidcast/test_csv_uploading.py

Lines changed: 26 additions & 6 deletions

@@ -15,6 +15,7 @@
 from delphi_utils import Nans
 from delphi.epidata.client.delphi_epidata import Epidata
 from delphi.epidata.acquisition.covidcast.csv_to_database import main
+from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main
 import delphi.operations.secrets as secrets
 
 # py3tester coverage target (equivalent to `import *`)
@@ -32,9 +33,18 @@ def setUp(self):
         user='user',
         password='pass',
         host='delphi_database_epidata',
-        database='epidata')
+        database='covid')
     cur = cnx.cursor()
-    cur.execute('truncate table covidcast')
+
+    # clear all tables
+    cur.execute("truncate table signal_load")
+    cur.execute("truncate table signal_history")
+    cur.execute("truncate table signal_latest")
+    cur.execute("truncate table geo_dim")
+    cur.execute("truncate table signal_dim")
+    # reset the `covidcast_meta_cache` table (it should always have one row)
+    cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
+
     cnx.commit()
     cur.close()
 
@@ -68,11 +78,12 @@ def apply_lag(expected_epidata):
     return expected_epidata
 
   def verify_timestamps_and_defaults(self):
-    self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
-    for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
+    self.cur.execute('''
+      select value_updated_timestamp from signal_history
+      UNION ALL
+      select value_updated_timestamp from signal_latest''')
+    for (value_updated_timestamp,) in self.cur:
       self.assertGreater(value_updated_timestamp, 0)
-      self.assertEqual(direction_updated_timestamp, 0)
-      self.assertIsNone(direction)
 
   def test_uploading(self):
     """Scan, parse, upload, archive, serve, and fetch a covidcast signal."""
@@ -112,6 +123,7 @@ def test_uploading(self):
 
     # upload CSVs
     main(args)
+    dbjobs_main()
     response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
     expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
@@ -140,6 +152,7 @@ def test_uploading(self):
 
     # upload CSVs
     main(args)
+    dbjobs_main()
     response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
     expected_values = pd.concat([values, pd.DataFrame({
@@ -174,6 +187,7 @@ def test_uploading(self):
 
     # upload CSVs
     main(args)
+    dbjobs_main()
     response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
     expected_response = {'result': -2, 'message': 'no results'}
@@ -199,6 +213,7 @@ def test_uploading(self):
 
     # upload CSVs
     main(args)
+    dbjobs_main()
     response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
     expected_values_df = pd.concat([values, pd.DataFrame({
@@ -232,6 +247,7 @@ def test_uploading(self):
 
     # upload CSVs
     main(args)
+    dbjobs_main()
     response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
     expected_values = pd.concat([values, pd.DataFrame({
@@ -267,6 +283,7 @@ def test_uploading(self):
 
     # upload CSVs
     main(args)
+    dbjobs_main()
     response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
     expected_values = pd.concat([values, pd.DataFrame({
@@ -298,6 +315,7 @@ def test_uploading(self):
 
     # upload CSVs
     main(args)
+    dbjobs_main()
     response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
     expected_response = {'result': -2, 'message': 'no results'}
@@ -314,6 +332,7 @@ def test_uploading(self):
     f.write('this,header,is,wrong\n')
 
     main(args)
+    dbjobs_main()
 
     path = data_dir + '/archive/failed/src-name/20200420_state_test.csv'
     self.assertIsNotNone(os.stat(path))
@@ -327,6 +346,7 @@ def test_uploading(self):
     f.write('file name is wrong\n')
 
     main(args)
+    dbjobs_main()
 
     path = data_dir + '/archive/failed/unknown/hello.csv'
     self.assertIsNotNone(os.stat(path))
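Every main(args) call above is now paired with dbjobs_main(): CSV acquisition first loads rows, and a separate dbjobs pass then populates the history and latest tables. A minimal sketch of the same two-phase flow through the Database helper; insert_or_update_bulk() and run_dbjobs() appear in the new test file below, but reading phase 1 as staging into signal_load is an interpretation of this diff, not something it states:

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets

# use the local test instance of the database, as the tests do
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

nmv = Nans.NOT_MISSING.value
db = Database()
db.connect()
row = CovidcastRow('src', 'sig', 'day', 'state', 20200419, 'pa',
                   1.5, 0.5, 100, nmv, nmv, nmv, 20200419, 0)
db.insert_or_update_bulk([row])  # phase 1: load rows (presumably via signal_load)
db.run_dbjobs()                  # phase 2: distribute to signal_history / signal_latest
db._connection.commit()          # needed when other connections must see the data
db.disconnect(False)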
Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
+import unittest
+
+from delphi_utils import Nans
+from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
+import delphi.operations.secrets as secrets
+
+# all the Nans we use here are just one value, so this is a shortcut to it:
+nmv = Nans.NOT_MISSING.value
+
+class TestTest(unittest.TestCase):
+
+  def setUp(self):
+    # use the local test instance of the database
+    secrets.db.host = 'delphi_database_epidata'
+    secrets.db.epi = ('user', 'pass')
+
+    self._db = Database()
+    self._db.connect()
+
+    # empty all of the data tables
+    for table in "signal_load signal_latest signal_history geo_dim signal_dim".split():
+      self._db._cursor.execute(f"TRUNCATE TABLE {table}")
+
+  def tearDown(self):
+    # close and destroy connection to the database
+    self._db.disconnect(False)
+    del self._db
+
+  def _make_dummy_row(self):
+    return CovidcastRow('src', 'sig', 'day', 'state', 2022_02_22, 'pa', 2, 22, 222, nmv, nmv, nmv, 2022_02_22, 0)
+    # cols:                                           ^ timeval         v  se  ssz                ^ issue     ^ lag
+
+  def _insert_rows(self, rows):
+    # inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
+    self._db.insert_or_update_bulk(rows)
+    self._db.run_dbjobs()
+    ###db._connection.commit() # NOTE: this isn't needed here, but would be if using external access (like through client lib)
+
+  def _find_matches_for_row(self, row):
+    # finds (if existing) row from both history and latest views that matches long-key of provided CovidcastRow
+    cols = "source signal time_type time_value geo_type geo_value issue".split()
+    results = {}
+    cur = self._db._cursor
+    for table in ['signal_latest_v', 'signal_history_v']:
+      q = f"SELECT * FROM {table} WHERE "
+      # NOTE: repr() puts str values in single quotes but simply 'string-ifies' numerics;
+      #       getattr() accesses members by string of their name
+      q += " AND ".join([f" `{c}` = {repr(getattr(row,c))} " for c in cols])
+      q += " LIMIT 1;"
+      cur.execute(q)
+      res = cur.fetchone()
+      if res:
+        results[table] = dict(zip(cur.column_names, res))
+      else:
+        results[table] = None
+    return results
+
+  def test_id_sync(self):
+    # the history and latest tables have a non-AUTOINCREMENT primary key id that is fed by the
+    # AUTOINCREMENT pk id from the load table. this test is intended to make sure that they
+    # appropriately stay in sync with each other
+
+    pk_column = 'signal_data_id'
+    history_view = 'signal_history_v'
+    latest_view = 'signal_latest_v'
+
+    # add a data point
+    base_row = self._make_dummy_row()
+    self._insert_rows([base_row])
+    # ensure the primary keys match in the latest and history tables
+    matches = self._find_matches_for_row(base_row)
+    self.assertEqual(matches[latest_view][pk_column],
+                     matches[history_view][pk_column])
+    # save old pk value
+    old_pk_id = matches[latest_view][pk_column]
+
+    # add a reissue for said data point
+    next_row = self._make_dummy_row()
+    next_row.issue += 1
+    self._insert_rows([next_row])
+    # ensure the new keys also match
+    matches = self._find_matches_for_row(next_row)
+    self.assertEqual(matches[latest_view][pk_column],
+                     matches[history_view][pk_column])
+    # check new ids were used
+    new_pk_id = matches[latest_view][pk_column]
+    self.assertNotEqual(old_pk_id, new_pk_id)
+
+    # verify old issue is no longer in latest table
+    self.assertIsNone(self._find_matches_for_row(base_row)[latest_view])
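For readers decoding _make_dummy_row above: the positional arguments of CovidcastRow appear to map as follows. The field names are inferred from the test's alignment comment and the signal_latest column list earlier in this commit, not confirmed by the diff itself:

# assumed argument mapping for CovidcastRow (hypothetical keyword annotation)
row = CovidcastRow(
    'src',          # source
    'sig',          # signal
    'day',          # time_type
    'state',        # geo_type
    2022_02_22,     # time_value -- int literal with underscores, i.e. 20220222
    'pa',           # geo_value
    2,              # value
    22,             # stderr ("se")
    222,            # sample_size ("ssz")
    nmv, nmv, nmv,  # missing_value, missing_stderr, missing_sample_size
    2022_02_22,     # issue
    0)              # lag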
