Commit cc2da33

Merge pull request #947 from cmu-delphi/ds/jit-update-meta
[Draft] just-in-time (JIT) meta computations
2 parents 3ed17a8 + ecd49bc commit cc2da33

58 files changed (+2264 / -2008 lines)

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion

@@ -62,7 +62,7 @@ jobs:
       - name: Start services
         run: |
           docker network create --driver bridge delphi-net
-          docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata delphi_database_epidata
+          docker run --rm -d -p 13306:3306 --network delphi-net --name delphi_database_epidata --cap-add=sys_nice delphi_database_epidata
           docker run --rm -d -p 10080:80 --env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --network delphi-net --name delphi_web_epidata delphi_web_epidata
           docker ps

Lines changed: 20 additions & 2 deletions

@@ -1,5 +1,14 @@
-# start with the `delphi_database` image
-FROM delphi_database
+# start with a standard percona mysql image
+FROM percona:ps-8
+
+# percona exits with the mysql user but we need root for additional setup
+USER root
+
+# use delphi's timezome
+RUN ln -s -f /usr/share/zoneinfo/America/New_York /etc/localtime
+
+# specify a development-only password for the database user "root"
+ENV MYSQL_ROOT_PASSWORD pass
 
 # create the `epidata` database
 ENV MYSQL_DATABASE epidata
@@ -8,8 +17,17 @@ ENV MYSQL_DATABASE epidata
 ENV MYSQL_USER user
 ENV MYSQL_PASSWORD pass
 
+# provide DDL which will configure dev environment at container startup
+COPY repos/delphi/delphi-epidata/dev/docker/database/epidata/_init.sql /docker-entrypoint-initdb.d/
+
 # provide DDL which will create empty tables at container startup
 COPY repos/delphi/delphi-epidata/src/ddl/*.sql /docker-entrypoint-initdb.d/
 
+# provide additional configuration needed for percona
+COPY repos/delphi/delphi-epidata/dev/docker/database/mysql.d/*.cnf /etc/my.cnf.d/
+
 # grant access to SQL scripts
 RUN chmod o+r /docker-entrypoint-initdb.d/*.sql
+
+# restore mysql user for percona
+USER mysql

dev/docker/database/epidata/_init.sql

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+CREATE DATABASE covid;
+GRANT ALL ON covid.* TO 'user';
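
A quick way to see what this bootstrap script provides: the dev container maps the server to host port 13306 (see the docker run commands and the Makefile above), and the database Dockerfile defines the development credentials user/pass, so the new `covid` database should be reachable alongside `epidata`. The snippet below is an illustrative check, not part of the commit; it assumes the mysql-connector-python package purely for demonstration.

import mysql.connector

# connect as the development user defined in the database Dockerfile (user/pass)
cnx = mysql.connector.connect(user="user", password="pass",
                              host="127.0.0.1", port=13306, database="covid")
cur = cnx.cursor()
cur.execute("SHOW DATABASES")
print([name for (name,) in cur])  # expect 'covid' (and typically 'epidata') in the list
cur.close()
cnx.close()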

dev/docker/database/mysql.d/my.cnf

Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+[mysqld]
+default_authentication_plugin=mysql_native_password
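
This override keeps accounts on mysql_native_password, which matters because the CI workflow points the web container at the database with the older mysql+mysqldb driver (see the SQLALCHEMY_DATABASE_URI above), while MySQL 8 / Percona 8 otherwise default to caching_sha2_password. An illustrative check, not part of the commit, again assuming mysql-connector-python:

import mysql.connector

cnx = mysql.connector.connect(user="user", password="pass", host="127.0.0.1", port=13306)
cur = cnx.cursor()
cur.execute("SHOW VARIABLES LIKE 'default_authentication_plugin'")
print(cur.fetchone())  # expect ('default_authentication_plugin', 'mysql_native_password')
cur.close()
cnx.close()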

dev/local/Makefile

Lines changed: 9 additions & 8 deletions

@@ -10,19 +10,19 @@
 # Creates all prereq images (delphi_database, delphi_python) only if they don't
 # exist. If you need to rebuild a prereq, you're probably doing something
 # complicated, and can figure out the rebuild command on your own.
-#
-#
+#
+#
 # Commands:
-#
+#
 #   web: Stops currently-running delphi_web_epidata instances, if any.
 #        Rebuilds delphi_web_epidata image.
 #        Runs image in the background and pipes stdout to a log file.
-#
+#
 #   db: Stops currently-running delphi_database_epidata instances, if any.
 #       Rebuilds delphi_database_epidata image.
 #       Runs image in the background and pipes stdout to a log file.
 #       Blocks until database is ready to receive connections.
-#
+#
 #   python: Rebuilds delphi_web_python image. You shouldn't need to do this
 #           often; only if you are installing a new environment, or have
 #           made changes to delphi-epidata/dev/docker/python/Dockerfile.
@@ -35,7 +35,7 @@
 #
 # clean: Cleans up dangling Docker images.
 #
-#
+#
 # Optional arguments:
 #   pdb=1 Drops you into debug mode upon test failure, if running tests.
 #   test= Only runs tests in the directories provided here, e.g.
@@ -105,11 +105,12 @@ db:
 	@# Run the database
 	@docker run --rm -p 127.0.0.1:13306:3306 \
 		--network delphi-net --name delphi_database_epidata \
+		--cap-add=sys_nice \
 		delphi_database_epidata >$(LOG_DB) 2>&1 &
 
 	@# Block until DB is ready
 	@while true; do \
-		sed -n '/Temporary server stopped/,/mysqld: ready for connections/p' $(LOG_DB) | grep "ready for connections" && break; \
+		sed -n '/mysqld: ready for connections/p' $(LOG_DB) | grep "ready for connections" && break; \
 		tail -1 $(LOG_DB); \
 		sleep 1; \
 	done
@@ -127,7 +128,7 @@ py:
 all: web db py
 
 .PHONY=test
-test:
+test:
 	@docker run -i --rm --network delphi-net \
 		--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata,target=/usr/src/app/repos/delphi/delphi-epidata,readonly \
 		--mount type=bind,source=$(CWD)repos/delphi/delphi-epidata/src,target=/usr/src/app/delphi/epidata,readonly \

docs/Gemfile.lock

Lines changed: 1 addition & 1 deletion

@@ -246,7 +246,7 @@ GEM
     thread_safe (0.3.6)
     typhoeus (1.4.0)
       ethon (>= 0.9.0)
-    tzinfo (1.2.9)
+    tzinfo (1.2.10)
       thread_safe (~> 0.1)
     tzinfo-data (1.2021.1)
       tzinfo (>= 1.0.0)

integrations/acquisition/covidcast/test_covidcast_meta_caching.py

Lines changed: 14 additions & 9 deletions

@@ -10,9 +10,9 @@
 
 # first party
 from delphi_utils import Nans
-from delphi.epidata.client.delphi_epidata import Epidata
 import delphi.operations.secrets as secrets
-import delphi.epidata.acquisition.covidcast.database as live
+from delphi.epidata.client.delphi_epidata import Epidata
+from delphi.epidata.acquisition.covidcast.database_meta import DatabaseMeta
 from delphi.epidata.acquisition.covidcast.covidcast_meta_cache_updater import main
 
 # py3tester coverage target (equivalent to `import *`)
@@ -40,9 +40,9 @@ def setUp(self):
     cur = cnx.cursor()
 
     # clear all tables
-    cur.execute("truncate table signal_load")
-    cur.execute("truncate table signal_history")
-    cur.execute("truncate table signal_latest")
+    cur.execute("truncate table epimetric_load")
+    cur.execute("truncate table epimetric_full")
+    cur.execute("truncate table epimetric_latest")
     cur.execute("truncate table geo_dim")
     cur.execute("truncate table signal_dim")
     # reset the `covidcast_meta_cache` table (it should always have one row)
@@ -71,14 +71,19 @@ def test_caching(self):
 
     # insert dummy data
     self.cur.execute(f'''
-      INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig');
+      INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`)
+      VALUES
+        (42, 'src', 'sig');
     ''')
     self.cur.execute(f'''
-      INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa');
+      INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`)
+      VALUES
+        (96, 'state', 'pa'),
+        (97, 'state', 'wa');
     ''')
     self.cur.execute(f'''
       INSERT INTO
-        `signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, `time_type`,
+        `epimetric_latest` (`epimetric_id`, `signal_key_id`, `geo_key_id`, `time_type`,
          `time_value`, `value_updated_timestamp`,
          `value`, `stderr`, `sample_size`,
          `issue`, `lag`, `missing_value`,
@@ -92,7 +97,7 @@ def test_caching(self):
     self.cnx.commit()
 
     # make sure the live utility is serving something sensible
-    cvc_database = live.Database()
+    cvc_database = DatabaseMeta()
     cvc_database.connect()
     epidata1 = cvc_database.compute_covidcast_meta()
     cvc_database.disconnect(False)
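
The essence of this change is that metadata is now computed just in time through the new DatabaseMeta class rather than the old acquisition.covidcast.database module. A condensed sketch of that call sequence, lifted from the test above rather than from any documented API (variable names here are ours):

from delphi.epidata.acquisition.covidcast.database_meta import DatabaseMeta

db = DatabaseMeta()
db.connect()
try:
    # compute covidcast metadata on the fly from the epimetric_* tables
    meta_rows = db.compute_covidcast_meta()
finally:
    # disconnect(False) as in the test above
    db.disconnect(False)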

integrations/acquisition/covidcast/test_csv_uploading.py

Lines changed: 7 additions & 54 deletions

@@ -15,7 +15,6 @@
 from delphi_utils import Nans
 from delphi.epidata.client.delphi_epidata import Epidata
 from delphi.epidata.acquisition.covidcast.csv_to_database import main
-from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main
 import delphi.operations.secrets as secrets
 
 # py3tester coverage target (equivalent to `import *`)
@@ -37,9 +36,9 @@ def setUp(self):
     cur = cnx.cursor()
 
     # clear all tables
-    cur.execute("truncate table signal_load")
-    cur.execute("truncate table signal_history")
-    cur.execute("truncate table signal_latest")
+    cur.execute("truncate table epimetric_load")
+    cur.execute("truncate table epimetric_full")
+    cur.execute("truncate table epimetric_latest")
     cur.execute("truncate table geo_dim")
     cur.execute("truncate table signal_dim")
     # reset the `covidcast_meta_cache` table (it should always have one row)
@@ -79,9 +78,9 @@ def apply_lag(expected_epidata):
 
   def verify_timestamps_and_defaults(self):
     self.cur.execute('''
-      select value_updated_timestamp from signal_history
+      select value_updated_timestamp from epimetric_full
      UNION ALL
-      select value_updated_timestamp from signal_latest''')
+      select value_updated_timestamp from epimetric_latest''')
     for (value_updated_timestamp,) in self.cur:
       self.assertGreater(value_updated_timestamp, 0)
 
@@ -102,8 +101,6 @@ def test_uploading(self):
       log_file=log_file_directory +
       "output.log",
       data_dir=data_dir,
-      is_wip_override=False,
-      not_wip_override=False,
       specific_issue_date=False)
     uploader_column_rename = {"geo_id": "geo_value", "val": "value", "se": "stderr", "missing_val": "missing_value", "missing_se": "missing_stderr"}
 
@@ -123,7 +120,6 @@ def test_uploading(self):
 
       # upload CSVs
       main(args)
-      dbjobs_main()
       response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
       expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
@@ -152,7 +148,6 @@ def test_uploading(self):
 
       # upload CSVs
       main(args)
-      dbjobs_main()
       response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
       expected_values = pd.concat([values, pd.DataFrame({
@@ -187,7 +182,6 @@ def test_uploading(self):
 
       # upload CSVs
       main(args)
-      dbjobs_main()
       response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
       expected_response = {'result': -2, 'message': 'no results'}
@@ -213,7 +207,6 @@ def test_uploading(self):
 
       # upload CSVs
       main(args)
-      dbjobs_main()
       response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
       expected_values_df = pd.concat([values, pd.DataFrame({
@@ -232,42 +225,6 @@ def test_uploading(self):
       self.setUp()
 
 
-    with self.subTest("Valid wip"):
-      values = pd.DataFrame({
-        "geo_id": ["me", "nd", "wa"],
-        "val": [10.0, 20.0, 30.0],
-        "se": [0.01, 0.02, 0.03],
-        "sample_size": [100.0, 200.0, 300.0],
-        "missing_val": [Nans.NOT_MISSING] * 3,
-        "missing_se": [Nans.NOT_MISSING] * 3,
-        "missing_sample_size": [Nans.NOT_MISSING] * 3
-      })
-      signal_name = "wip_prototype"
-      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
-
-      # upload CSVs
-      main(args)
-      dbjobs_main()
-      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
-
-      expected_values = pd.concat([values, pd.DataFrame({
-        "time_value": [20200419] * 3,
-        "signal": [signal_name] * 3,
-        "direction": [None] * 3
-      })], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
-      expected_response = {'result': 1, 'epidata': self.apply_lag(expected_values), 'message': 'success'}
-
-      self.assertEqual(response, expected_response)
-      self.verify_timestamps_and_defaults()
-
-      # Verify that files were archived
-      path = data_dir + f'/archive/successful/src-name/20200419_state_wip_prototype.csv.gz'
-      self.assertIsNotNone(os.stat(path))
-
-      self.tearDown()
-      self.setUp()
-
-
     with self.subTest("Valid signal with name length 32<x<64"):
       values = pd.DataFrame({
         "geo_id": ["pa"],
@@ -278,12 +235,11 @@ def test_uploading(self):
         "missing_se": [Nans.NOT_MISSING],
         "missing_sample_size": [Nans.NOT_MISSING]
      })
-      signal_name = "wip_really_long_name_that_will_be_accepted"
+      signal_name = "really_long_name_that_will_be_accepted"
      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
 
      # upload CSVs
      main(args)
-      dbjobs_main()
      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
      expected_values = pd.concat([values, pd.DataFrame({
@@ -310,12 +266,11 @@ def test_uploading(self):
        "missing_se": [Nans.NOT_MISSING],
        "missing_sample_size": [Nans.NOT_MISSING]
      })
-      signal_name = "wip_really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet"
+      signal_name = "really_long_name_that_will_get_truncated_lorem_ipsum_dolor_sit_amet"
      values.to_csv(source_receiving_dir + f'/20200419_state_{signal_name}.csv', index=False)
 
      # upload CSVs
      main(args)
-      dbjobs_main()
      response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')
 
      expected_response = {'result': -2, 'message': 'no results'}
@@ -332,7 +287,6 @@ def test_uploading(self):
        f.write('this,header,is,wrong\n')
 
      main(args)
-      dbjobs_main()
 
      path = data_dir + '/archive/failed/src-name/20200420_state_test.csv'
      self.assertIsNotNone(os.stat(path))
@@ -346,7 +300,6 @@ def test_uploading(self):
        f.write('file name is wrong\n')
 
      main(args)
-      dbjobs_main()
 
      path = data_dir + '/archive/failed/unknown/hello.csv'
      self.assertIsNotNone(os.stat(path))
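
Taken together with the dbjobs_runner import dropped at the top of the file, these hunks show the new single-step ingestion path: one csv_to_database.main(...) call now loads and publishes the received CSVs, after which the API can be queried directly. A minimal sketch of that flow, with illustrative paths and an argparse-style namespace standing in for the test's argument object (the real caller may require additional fields):

from argparse import Namespace

from delphi.epidata.acquisition.covidcast.csv_to_database import main
from delphi.epidata.client.delphi_epidata import Epidata

# hypothetical locations; the test builds these from its own temp directories
args = Namespace(
    data_dir="/common/covidcast",
    log_file="/var/log/csv_upload.log",
    specific_issue_date=False,
)

main(args)  # upload CSVs; no separate dbjobs_main() pass is needed anymore

response = Epidata.covidcast('src-name', 'sig', 'day', 'state', 20200419, '*')
print(response['result'], response.get('message'))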
