44 changes: 21 additions & 23 deletions integrations/acquisition/covidcast/test_covidcast_meta_caching.py
@@ -36,13 +36,17 @@ def setUp(self):
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
database='covid')
cur = cnx.cursor()

# clear the `covidcast` table
cur.execute('truncate table covidcast')
# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = ""')
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
cnx.commit()
cur.close()

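This table-clearing block reappears verbatim in the test_csv_uploading.py setUp further down. A hypothetical shared helper, sketched below, would capture the pattern; the name clear_covidcast_tables and the free-standing form are illustrative only and not part of this PR.

def clear_covidcast_tables(cur):
    # clear all of the v4 data tables touched by these tests
    for table in ('signal_load', 'signal_history', 'signal_latest', 'geo_dim', 'signal_dim'):
        cur.execute(f'truncate table {table}')
    # reset the `covidcast_meta_cache` table (it should always have one row)
    cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')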
@@ -67,30 +71,24 @@ def test_caching(self):

# insert dummy data
self.cur.execute(f'''
INSERT INTO
`covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
`time_value`, `geo_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
`direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
`missing_stderr`,`missing_sample_size`)
VALUES
(0, 'src', 'sig', 'day', 'state', 20200422, 'pa',
123, 1, 2, 3, 456, 1, 20200422, 0, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
(0, 'src', 'sig', 'day', 'state', 20200422, 'wa',
789, 1, 2, 3, 456, 1, 20200423, 1, 1, False, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
INSERT INTO `signal_dim` (`signal_key_id`, `source`, `signal`) VALUES (42, 'src', 'sig');
''')
self.cur.execute(f'''
INSERT INTO `geo_dim` (`geo_key_id`, `geo_type`, `geo_value`) VALUES (96, 'state', 'pa'), (97, 'state', 'wa');
''')
self.cur.execute(f'''
INSERT INTO
`covidcast` (`id`, `source`, `signal`, `time_type`, `geo_type`,
`time_value`, `geo_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`, `direction_updated_timestamp`,
`direction`, `issue`, `lag`, `is_latest_issue`, `is_wip`,`missing_value`,
`missing_stderr`,`missing_sample_size`)
`signal_latest` (`signal_data_id`, `signal_key_id`, `geo_key_id`, `time_type`,
`time_value`, `value_updated_timestamp`,
`value`, `stderr`, `sample_size`,
`issue`, `lag`, `missing_value`,
`missing_stderr`,`missing_sample_size`)
VALUES
(100, 'src', 'wip_sig', 'day', 'state', 20200422, 'pa',
456, 4, 5, 6, 789, -1, 20200422, 0, 1, True, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
(15, 42, 96, 'day', 20200422,
123, 1, 2, 3, 20200422, 0, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}),
(16, 42, 97, 'day', 20200422,
789, 1, 2, 3, 20200423, 1, {Nans.NOT_MISSING}, {Nans.NOT_MISSING}, {Nans.NOT_MISSING})
''')

self.cnx.commit()

# make sure the live utility is serving something sensible
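For context on the schema these inserts target: signal_dim and geo_dim hold surrogate keys for the (source, signal) and (geo_type, geo_value) identifiers, and signal_latest holds the fact rows keyed by them. A sketch of how the old flat covidcast shape can be reassembled, assuming only the column names used in the inserts above:

# a sketch only; joins the dimension tables back onto the latest-issue facts
self.cur.execute('''
    SELECT sd.`source`, sd.`signal`, sl.`time_type`, sl.`time_value`,
           gd.`geo_type`, gd.`geo_value`,
           sl.`value`, sl.`stderr`, sl.`sample_size`, sl.`issue`, sl.`lag`
    FROM `signal_latest` sl
    JOIN `signal_dim` sd USING (`signal_key_id`)
    JOIN `geo_dim` gd USING (`geo_key_id`)
''')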
32 changes: 26 additions & 6 deletions integrations/acquisition/covidcast/test_csv_uploading.py
@@ -15,6 +15,7 @@
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.csv_to_database import main
from delphi.epidata.acquisition.covidcast.dbjobs_runner import main as dbjobs_main
import delphi.operations.secrets as secrets

# py3tester coverage target (equivalent to `import *`)
@@ -32,9 +33,18 @@ def setUp(self):
user='user',
password='pass',
host='delphi_database_epidata',
database='epidata')
database='covid')
cur = cnx.cursor()
cur.execute('truncate table covidcast')

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')

cnx.commit()
cur.close()

@@ -68,11 +78,12 @@ def apply_lag(expected_epidata):
return expected_epidata

def verify_timestamps_and_defaults(self):
self.cur.execute('select value_updated_timestamp, direction_updated_timestamp, direction from covidcast')
for value_updated_timestamp, direction_updated_timestamp, direction in self.cur:
self.cur.execute('''
select value_updated_timestamp from signal_history
UNION ALL
select value_updated_timestamp from signal_latest''')
for (value_updated_timestamp,) in self.cur:
self.assertGreater(value_updated_timestamp, 0)
self.assertEqual(direction_updated_timestamp, 0)
self.assertIsNone(direction)

def test_uploading(self):
"""Scan, parse, upload, archive, serve, and fetch a covidcast signal."""
@@ -112,6 +123,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({ "time_value": [20200419] * 3, "signal": [signal_name] * 3, "direction": [None] * 3})], axis=1).rename(columns=uploader_column_rename).to_dict(orient="records")
@@ -140,6 +152,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -174,6 +187,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -199,6 +213,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values_df = pd.concat([values, pd.DataFrame({
@@ -232,6 +247,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -267,6 +283,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_values = pd.concat([values, pd.DataFrame({
@@ -298,6 +315,7 @@ def test_uploading(self):

# upload CSVs
main(args)
dbjobs_main()
response = Epidata.covidcast('src-name', signal_name, 'day', 'state', 20200419, '*')

expected_response = {'result': -2, 'message': 'no results'}
@@ -314,6 +332,7 @@ def test_uploading(self):
f.write('this,header,is,wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/src-name/20200420_state_test.csv'
self.assertIsNotNone(os.stat(path))
@@ -327,6 +346,7 @@ def test_uploading(self):
f.write('file name is wrong\n')

main(args)
dbjobs_main()

path = data_dir + '/archive/failed/unknown/hello.csv'
self.assertIsNotNone(os.stat(path))
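Every upload in this file is now a two-step process: main(args) stages the CSV rows, and dbjobs_main() promotes them from signal_load into the history and latest tables. A hypothetical wrapper showing the repeated pattern; the name upload_and_process is illustrative only, the test calls the two functions directly:

def upload_and_process(args):
    # stage the CSV files via the acquisition importer (writes to signal_load)
    main(args)
    # run the 'dbjobs' step: move staged rows into signal_history and signal_latest
    dbjobs_main()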
91 changes: 91 additions & 0 deletions integrations/acquisition/covidcast/test_db.py
@@ -0,0 +1,91 @@
import unittest

from delphi_utils import Nans
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets

# all the Nans we use here are just one value, so this is a shortcut to it:
nmv = Nans.NOT_MISSING.value

class TestTest(unittest.TestCase):

def setUp(self):
# use the local test instance of the database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

self._db = Database()
self._db.connect()

# empty all of the data tables
for table in "signal_load signal_latest signal_history geo_dim signal_dim".split():
self._db._cursor.execute(f"TRUNCATE TABLE {table}")

def tearDown(self):
# close and destroy connection to the database
self._db.disconnect(False)
del self._db

def _make_dummy_row(self):
return CovidcastRow('src', 'sig', 'day', 'state', 2022_02_22, 'pa', 2, 22, 222, nmv,nmv,nmv, 2022_02_22, 0)
# cols: ^ timeval v se ssz ^issue ^lag

def _insert_rows(self, rows):
# inserts rows into the database using the full acquisition process, including 'dbjobs' load into history & latest tables
self._db.insert_or_update_bulk(rows)
self._db.run_dbjobs()
###db._connection.commit() # NOTE: this isn't needed here, but would be if using external access (like through client lib)

def _find_matches_for_row(self, row):
# finds (if existing) row from both history and latest views that matches long-key of provided CovidcastRow
# TODO: consider making `issue` an optional match... this will break the at-most-1-row-returned assumption for signal_history
cols = "source signal time_type time_value geo_type geo_value issue".split()
results = {}
cur = self._db._cursor
for table in ['signal_latest_v', 'signal_history_v']:
q = f"SELECT * FROM {table} WHERE "
# NOTE: repr() puts str values in single quotes but simply 'string-ifies' numerics;
# getattr() accesses members by string of their name
q += " AND ".join([f" `{c}` = {repr(getattr(row,c))} " for c in cols])
q += " LIMIT 1;"
cur.execute(q)
res = cur.fetchone()
if res:
results[table] = dict(zip(cur.column_names, res))
else:
results[table] = None
return results

def test_id_sync(self):
# the history and latest tables have a non-AUTOINCREMENT primary key id that is fed by the
# AUTOINCREMENT pk id from the load table. this test is intended to make sure that they
# appropriately stay in sync with each other

pk_column = 'signal_data_id'
histor_view = 'signal_history_v'
latest_view = 'signal_latest_v'

# add a data point
base_row = self._make_dummy_row()
self._insert_rows([base_row])
# ensure the primary keys match in the latest and history tables
matches = self._find_matches_for_row(base_row)
self.assertEqual(matches[latest_view][pk_column],
matches[histor_view][pk_column])
# save old pk value
old_pk_id = matches[latest_view][pk_column]

# add a reissue for said data point
next_row = self._make_dummy_row()
next_row.issue += 1
self._insert_rows([next_row])
# ensure the new keys also match
matches = self._find_matches_for_row(next_row)
self.assertEqual(matches[latest_view][pk_column],
matches[histor_view][pk_column])
# check new ids were used
new_pk_id = matches[latest_view][pk_column]
self.assertNotEqual(old_pk_id, new_pk_id)

# verify old issue is no longer in latest table
self.assertIsNone(self._find_matches_for_row(base_row)[latest_view])
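For the dummy row above, the repr()-based query builder in _find_matches_for_row produces a WHERE clause along these lines; a sketch only, whitespace differs from the generated SQL:

# illustrative only: the clause built for the dummy row
row = CovidcastRow('src', 'sig', 'day', 'state', 2022_02_22, 'pa', 2, 22, 222, nmv, nmv, nmv, 2022_02_22, 0)
cols = "source signal time_type time_value geo_type geo_value issue".split()
clause = " AND ".join(f"`{c}` = {repr(getattr(row, c))}" for c in cols)
# clause == "`source` = 'src' AND `signal` = 'sig' AND `time_type` = 'day' AND `time_value` = 20220222
#            AND `geo_type` = 'state' AND `geo_value` = 'pa' AND `issue` = 20220222"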