3 changes: 3 additions & 0 deletions .gitignore
@@ -1,4 +1,7 @@
__pycache__/
*.pyc
*~
\#*#
.DS_Store
/.vscode
/delphi-epidata
3 changes: 2 additions & 1 deletion integrations/acquisition/covidcast/delete_batch.csv
@@ -1,3 +1,4 @@
geo_id,value,stderr,sample_size,issue,time_value,geo_type,signal,source
d_nonlatest,0,0,0,1,0,geo,sig,src
d_latest, 0,0,0,3,0,geo,sig,src
d_latest, 0,0,0,3,0,geo,sig,src
d_justone, 0,0,0,1,0,geo,sig,src
44 changes: 21 additions & 23 deletions integrations/acquisition/covidcast/test_covidcast_meta_caching.py
@@ -36,13 +36,17 @@ def setUp(self):
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
database='covid')
cur = cnx.cursor()

# clear the `covidcast` table
cur.execute('truncate table covidcast')
# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = ""')
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
cnx.commit()
cur.close()

@@ -67,30 +71,24 @@ def test_caching(self):

# insert dummy data
self.cur.execute(f'''
INSERT INTO
`covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
`time_value`, `geo_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
`direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
`missing_stderr`,`missing_sample_size`)
VALUES
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
123, 1, 2, 3, 456, 1, 20200422, 0, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
(0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
789, 1, 2, 3, 456, 1, 20200423, 1, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig');
''')
self.cur.execute(f'''
INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa');
''')
self.cur.execute(f'''
INSERT INTO
`covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
`time_value`, `geo_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
`direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
`missing_stderr`,`missing_sample_size`)
`signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, `time_type`,
`time_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`,
`issue`, `lag`, `missing_value`,
`missing_stderr`,`missing_sample_size`)
VALUES
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
456, 4, 5, 6, 789, -1, 20200422, 0, 1, True, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
(15, 42, 96, 'day', 20200422,
123, 1, 2, 3, 20200422, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
(16, 42, 97, 'day', 20200422,
789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
''')

self.cnx.commit()

# make sure the live utility is serving something sensible
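For context on the change above: the monolithic `covidcast` table is replaced by a `signal_latest` fact table that references its source/signal and location through surrogate keys (`signal_key_id`, `geo_key_id`) into the `signal_dim` and `geo_dim` dimension tables. The sketch below only illustrates that layout (it is not the deployed `signal_latest_v` view definition); column names follow the INSERT statements in this test's setup.

# hypothetical query illustrating how a wide covidcast-style row is recovered
# by joining the dimension tables back onto the fact table
RECONSTRUCT_WIDE_ROWS = '''
SELECT sd.`source`, sd.`signal`, sl.`time_type`, gd.`geo_type`,
       sl.`time_value`, gd.`geo_value`, sl.`value`, sl.`stderr`,
       sl.`sample_size`, sl.`issue`, sl.`lag`
FROM `signal_latest` sl
JOIN `signal_dim` sd USING (`signal_key_id`)
JOIN `geo_dim` gd USING (`geo_key_id`)
WHERE sd.`source` = 'src' AND sd.`signal` = 'sig'
'''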
32 changes: 26 additions & 6 deletions integrations/acquisition/covidcast/test_csv_uploading.py
@@ -15,6 +15,7 @@
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.csv_to_database import main
from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
@@ -32,9 +33,18 @@ def setUp(self):
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
database='covid')
cur = cnx.cursor()
cur.execute('truncate table covidcast')

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')

cnx.commit()
cur.close()

@@ -68,11 +78,12 @@ def apply_lag(expected_epidata):
return expected_epidata

def verify_timestamps_and_defaults(self):
self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
self.cur.execute('''
select value_updated_timestamp from signal_history
UNION ALL
select value_updated_timestamp from signal_latest''')
for (value_updated_timestamp,) in self.cur:
self.assertGreater(value_updated_timestamp, 0)
self.assertEqual(direction_updated_timestamp, 0)
self.assertIsNone(direction)

def test_uploading(self):
"""Scan, parse, upload, archive, serve, and fetch a covidcast signal."""
@@ -112,6 +123,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
@@ -140,6 +152,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -174,6 +187,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -199,6 +213,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values_df = pd.concat([values, pd.DataFrame({
@@ -232,6 +247,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -267,6 +283,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -298,6 +315,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -314,6 +332,7 @@ def test_uploading(self):
f.write('this,header,is,wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/src-name/20200420_state_test.csv'
self.assertIsNotNone(os.stat(path))
@@ -327,6 +346,7 @@ def test_uploading(self):
f.write('file name is wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/unknown/hello.csv'
self.assertIsNotNone(os.stat(path))
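The recurring `main(args)` / `dbjobs_main()` pairing added throughout this test reflects the new two-phase acquisition: the CSV uploader stages parsed rows (into the `signal_load` table that setUp truncates), and the dbjobs runner then promotes them into `signal_history` and `signal_latest`, so the API serves nothing until both steps have run. A minimal sketch of that pairing, assuming only the two entry points already imported at the top of this file (`args` is whatever argument object `csv_to_database.main` expects; the `ingest` helper is hypothetical):

from delphi.epidata.acquisition.covidcast.csv_to_database import main
from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main

def ingest(args):
    main(args)       # scan and parse CSVs, staging rows for the database
    dbjobs_main()    # promote staged rows into signal_history / signal_latest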
91 changes: 91 additions & 0 deletions integrations/acquisition/covidcast/test_db.py
@@ -0,0 +1,91 @@
import unittest

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets

# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

class TestTest(unittest.TestCase):

def setUp(self):
# use the local test instance of the database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

self._db = Database()
self._db.connect()

# empty all of the data tables
for table in "signal_load signal_latest signal_history geo_dim signal_dim".split():
self._db._cursor.execute(f"TRUNCATE TABLE {table}")

def tearDown(self):
# close and destroy connection to the database
self._db.disconnect(False)
del self._db

def _make_dummy_row(self):
return CovidcastRow('src', 'sig', 'day', 'state', 2022_02_22, 'pa', 2, 22, 222, nmv,nmv,nmv, 2022_02_22, 0)
# cols: ^ timeval v se ssz ^issue ^lag

def _insert_rows(self, rows):
# inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
self._db.insert_or_update_bulk(rows)
self._db.run_dbjobs()
###db._connection.commit() # NOTE: this isn't needed here, but would be if using external access (e.g. through the client lib)

def _find_matches_for_row(self, row):
# finds the rows (if any) in the history and latest views that match the long key of the provided CovidcastRow
# TODO: consider making `issue` an optional match... this will break the at-most-1-row-returned assumption for signal_history
cols = "source signal time_type time_value geo_type geo_value issue".split()
results = {}
cur = self._db._cursor
for table in ['signal_latest_v', 'signal_history_v']:
q = f"SELECT * FROM {table} WHERE "
# NOTE: repr() puts str values in single quotes but simply 'string-ifies' numerics;
# getattr() accesses members by string of their name
q += " AND ".join([f" `{c}` = {repr(getattr(row,c))} " for c in cols])
q += " LIMIT 1;"
cur.execute(q)
res = cur.fetchone()
if res:
results[table] = dict(zip(cur.column_names, res))
else:
results[table] = None
return results

def test_id_sync(self):
# the history and latest tables have a non-AUTOINCREMENT primary key id that is fed by the
# AUTOINCREMENT pk id from the load table. this test is intended to make sure that they
# appropriately stay in sync with each other

pk_column = 'signal_data_id'
histor_view = 'signal_history_v'
latest_view = 'signal_latest_v'

# add a data point
base_row = self._make_dummy_row()
self._insert_rows([base_row])
# ensure the primary keys match in the latest and history tables
matches = self._find_matches_for_row(base_row)
self.assertEqual(matches[latest_view][pk_column],
matches[histor_view][pk_column])
# save old pk value
old_pk_id = matches[latest_view][pk_column]

# add a reissue for said data point
next_row = self._make_dummy_row()
next_row.issue += 1
self._insert_rows([next_row])
# ensure the new keys also match
matches = self._find_matches_for_row(next_row)
self.assertEqual(matches[latest_view][pk_column],
matches[histor_view][pk_column])
# check new ids were used
new_pk_id = matches[latest_view][pk_column]
self.assertNotEqual(old_pk_id, new_pk_id)

# verify old issue is no longer in latest table
self.assertIsNone(self._find_matches_for_row(base_row)[latest_view])
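For readers skimming the new test, here is a condensed, linear sketch of the insert-then-reissue flow that `test_id_sync` exercises, using the same `Database`/`CovidcastRow` API as above (values are arbitrary):

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow

nmv = Nans.NOT_MISSING.value
db = Database()
db.connect()

# first issue of a data point: staged via the load table, then promoted by dbjobs
first = CovidcastRow('src', 'sig', 'day', 'state', 20220222, 'pa',
                     2, 22, 222, nmv, nmv, nmv, 20220222, 0)
db.insert_or_update_bulk([first])
db.run_dbjobs()

# a reissue of the same (source, signal, time, geo) key with a later `issue`
reissue = CovidcastRow('src', 'sig', 'day', 'state', 20220222, 'pa',
                       2, 22, 222, nmv, nmv, nmv, 20220223, 1)
db.insert_or_update_bulk([reissue])
db.run_dbjobs()

# signal_latest_v now exposes only the reissue, signal_history_v keeps both issues,
# and for each issue the history and latest rows share the same signal_data_id.

db.disconnect(False)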