
Integration Test Case for is_latest_issue using DB #925


Open

wants to merge 31 commits into base: dev
Commits
31 commits
bd7f285
added integrations test for database for is_latest_issue
xavier-xia-99 Apr 25, 2022
d789236
syntax error
xavier-xia-99 Jun 6, 2022
c3f0205
Update integrations/acquisition/covidcast/test_is_latest_issue.py
xavier-xia-99 Jun 8, 2022
1f10a04
Update integrations/acquisition/covidcast/test_is_latest_issue.py
xavier-xia-99 Jun 8, 2022
b241980
Update requirements.txt
xavier-xia-99 Jun 8, 2022
0a4e09e
added line to ignore output.txt
xavier-xia-99 Jun 8, 2022
f0f3abe
tidied up code and provided common SQL queries as variables
xavier-xia-99 Jun 8, 2022
a998d38
removed unnecessary imports
xavier-xia-99 Jun 8, 2022
035baf7
added line to ignore output.txt
xavier-xia-99 Jun 8, 2022
6d8d25e
removed unnecessary row for signal_latest, checked there is only one …
xavier-xia-99 Jun 11, 2022
4090d4d
removed unnecessary self.
xavier-xia-99 Jun 11, 2022
2e0546f
changed (county,ca) to (county,numeric) pair
xavier-xia-99 Jun 11, 2022
39ab346
Delete output.txt
xavier-xia-99 Jun 12, 2022
db706ac
showTable function WIP
xavier-xia-99 Jun 12, 2022
b9d712f
added Epidata for BASE_URL
xavier-xia-99 Jun 15, 2022
d0b8d69
tidied up comments and test cases
xavier-xia-99 Jun 15, 2022
11a5316
changed all SQL queries to f-strings as better practice
xavier-xia-99 Jun 15, 2022
64ea75d
reverted test_covidcast_meta_caching
xavier-xia-99 Jun 15, 2022
9cd42c3
all done;
xavier-xia-99 Jun 15, 2022
00e396b
tidied up imports
xavier-xia-99 Jun 17, 2022
dee268d
removed dim tests
xavier-xia-99 Jul 12, 2022
157530f
added dim tests in new file
xavier-xia-99 Jul 12, 2022
18a2758
used set to remove ordering of elements in return list
xavier-xia-99 Jul 25, 2022
cb78c73
used set to remove ordering of elements in return list
xavier-xia-99 Jul 26, 2022
6682b58
edited dim_tables for better readability
xavier-xia-99 Jul 26, 2022
f06c327
syntax and functionality checked
xavier-xia-99 Jul 26, 2022
d250f98
file edits
xavier-xia-99 Aug 8, 2022
56852f4
removed unnecessary test objects
xavier-xia-99 Aug 9, 2022
7427906
refined line comments
xavier-xia-99 Aug 9, 2022
c120316
refined comments and added tests for geo_type, geo_value pairs
xavier-xia-99 Aug 9, 2022
1f6322c
clean up redundant comments
xavier-xia-99 Aug 9, 2022
1 change: 1 addition & 0 deletions .gitignore
@@ -11,3 +11,4 @@ __pycache__/
/node_modules
.mypy_cache
/missing_db_signals.csv
/dev/local/output.txt
integrations/acquisition/covidcast/test_covidcast_meta_caching.py
@@ -94,8 +94,10 @@ def test_caching(self):
    # make sure the live utility is serving something sensible
    cvc_database = live.Database()
    cvc_database.connect()
    epidata1 = cvc_database.compute_covidcast_meta()
    cvc_database.disconnect(False)

    # Testing Set
    self.assertEqual(len(epidata1),1)
    self.assertEqual(epidata1, [
      {
@@ -117,9 +119,6 @@ def test_caching(self):
      }
    ])
    epidata1={'result':1, 'message':'success', 'epidata':epidata1}

    # make sure the API covidcast_meta is still blank, since it only serves
    # the cached version and we haven't cached anything yet
    epidata2 = Epidata.covidcast_meta()
    self.assertEqual(epidata2['result'], -2, json.dumps(epidata2))

194 changes: 194 additions & 0 deletions integrations/acquisition/covidcast/test_dim_tables.py
@@ -0,0 +1,194 @@
"""Integration tests for covidcast's dimension tables."""
# standard library
import unittest

# third party
import mysql.connector
# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets

__test_target__ = 'delphi.epidata.acquisition.covidcast.database'

nmv = Nans.NOT_MISSING.value
class CovidcastDimensionTablesTests(unittest.TestCase):
"""Tests covidcast's dimension tables."""

def setUp(self):
"""Perform per-test setup."""
# connect to the `epidata` database
cnx = mysql.connector.connect(
user='user',
password='pass',
host='delphi_database_epidata',
database='covid')
cur = cnx.cursor()

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
cnx.commit()
cur.close()

# make connection and cursor available to the Database object
self._db = Database()
self._db._connection = cnx
self._db._cursor = cnx.cursor()

# use the local instance of the Epidata API
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'

# use the local instance of the epidata database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

#Commonly used SQL commands:
self.viewSignalLatest = f'SELECT * FROM {Database.latest_table}'
self.viewSignalHistory = f'SELECT * FROM {Database.history_table}'
self.viewSignalDim = f'SELECT `source`, `signal` FROM `signal_dim`'
self.viewGeoDim = f'SELECT `geo_type`,`geo_value` FROM `geo_dim`'

def tearDown(self):
"""Perform per-test teardown."""
self._db._cursor.close()
self._db._connection.close()

  # We want to test src_sig to make sure rows are added to *_dim only when needed
  #new src, sig (ensure that it is added into *_dim)
  #old src, sig (ensure that it is NOT added into *_dim)
  #new geo (added)
  #old geo (not added)
  def test_src_sig(self):
    #BASE CASES
    rows = [
      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
        2, 2, 2, nmv, nmv, nmv, 20200414, 0),
      CovidcastRow('src', 'sig', 'day', 'county', 20200414, '11111',
        3, 3, 3, nmv, nmv, nmv, 20200414, 0)
    ]
    self._db.insert_or_update_batch(rows)
    self._db.run_dbjobs()

    #initializing local variables to be used throughout later
    self._db._cursor.execute(self.viewGeoDim)
    record = self._db._cursor.fetchall()
    geoDimRowCount = len(list(record))

    self._db._cursor.execute(self.viewSignalDim)
    record = self._db._cursor.fetchall()
    sigDimRowCount = len(list(record))

    self._db._cursor.execute(self.viewSignalLatest)
    record = self._db._cursor.fetchall()
    sigLatestRowCount = len(list(record))

    #test same src, sig not added
    with self.subTest(name='older src and sig not added into sig_dim'):
      oldSrcSig = [
        CovidcastRow('src', 'sig', 'day', 'state', 20211111, 'pa', #same src, sig but diff timevalue and issue
          99, 99, 99, nmv, nmv, nmv, 20211111, 1),
        CovidcastRow('src', 'sig', 'day', 'county', 20211111, '11111', #same src, sig but diff timevalue and issue
          99, 99, 99, nmv, nmv, nmv, 20211111, 1)
      ]
      self._db.insert_or_update_batch(oldSrcSig)
      self._db.run_dbjobs()

      #testing src, sig
      self._db._cursor.execute(self.viewSignalDim)
      record = self._db._cursor.fetchall()
      res = [('src','sig')] #output
      self.assertEqual(res , list(record))

      #ensure new entries are added in latest
      self._db._cursor.execute(self.viewSignalLatest)
      record = self._db._cursor.fetchall()
      sigLatestRowCount = len(list(record))
      self.assertEqual(4 , sigLatestRowCount) #added diff timevalue and issue, 2 older + 2 newer = 4

      #ensure nothing changed in geoDim
      self._db._cursor.execute(self.viewGeoDim)
      record = self._db._cursor.fetchall()
      self.assertEqual(len(list(record)),geoDimRowCount) #geoDimRowCount remains unchanged

    #testing new src, sig added
    with self.subTest(name='newer src and sig added in sig_dim'):
      newSrcSig = [
        CovidcastRow('new_src', 'sig', 'day', 'state', 20200414, 'pa', # new_src
          2, 2, 2, nmv, nmv, nmv, 20200414, 0),
        CovidcastRow('src', 'new_sig', 'day', 'state', 20200414, 'pa', # new_sig
          2, 2, 2, nmv, nmv, nmv, 20200414, 0)
      ]
      self._db.insert_or_update_batch(newSrcSig)
      self._db.run_dbjobs()

      #testing src, sig
      self._db._cursor.execute(self.viewSignalDim)
      record = self._db._cursor.fetchall()
      sigDimRowCount = len(list(record)) #update sigDimRowCount
      res = set([('new_src', 'sig'), ('src', 'new_sig'), ('src', 'sig')]) # turn into set to ignore ordering
      self.assertEqual(res , set(record))
      self.assertEqual(3, sigDimRowCount) #originally had (src , sig) added 2 new pairs

      #ensure new entries are added in latest
      self._db._cursor.execute(self.viewSignalLatest)
      record = self._db._cursor.fetchall()
      sigLatestRowCount = len(list(record))
      self.assertEqual(6 , sigLatestRowCount) #added diff timevalue and issue, 2 more added ontop of 4 previously

      #ensure nothing changed in geoDim
      self._db._cursor.execute(self.viewGeoDim)
      record = self._db._cursor.fetchall()
      self.assertEqual(len(list(record)),geoDimRowCount)

    #testing repeated geo not added
    with self.subTest(name='old geo not added in geo_dim'):
      repeatedGeoValues = [
        CovidcastRow('src', 'sig', 'day', 'state', 20200415, 'pa', # same geo_type, geo_value ('pa')
          2, 2, 2, nmv, nmv, nmv, 20200415, 0),
        CovidcastRow('src', 'sig', 'day', 'county', 20200415, '11111', # same geo_type, geo_value ('11111')
          3, 3, 3, nmv, nmv, nmv, 20200415, 0),
      ]
      self._db.insert_or_update_batch(repeatedGeoValues)
      self._db.run_dbjobs()

      self._db._cursor.execute(self.viewSignalLatest)
      record = self._db._cursor.fetchall()
      sigLatestRowCount = len(list(record))
      self.assertEqual(8, sigLatestRowCount) #total entries = 2 + 6 in previous subtest

      #ensure nothing changed in geoDim with repeated geo_type, geo_value pairs
      self._db._cursor.execute(self.viewGeoDim)
      record = self._db._cursor.fetchall()
      self.assertEqual(len(list(record)),geoDimRowCount) #geoDimRowCount unchanged

    with self.subTest(name='newer geo added in geo_dim'):
      newGeoValues = [
        CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'nj', # everything same except new state geo_value 'nj'
          2, 2, 2, nmv, nmv, nmv, 20200414, 0),
        CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15217', # everything same except new county geo_value '15217'
          3, 3, 3, nmv, nmv, nmv, 20200414, 0),
        CovidcastRow('src', 'sig', 'day', 'county', 20200414, '15451', # everything same except new county geo_value '15451'
          3, 3, 3, nmv, nmv, nmv, 20200414, 0)
      ]
      self._db.insert_or_update_batch(newGeoValues)
      self._db.run_dbjobs()

      self._db._cursor.execute(self.viewGeoDim)
      record = self._db._cursor.fetchall()
      res = set([('state', 'nj'), ('county', '15217'), ('county', '15451'), ('state', 'pa'), ('county', '11111')]) # turn into set to ignore ordering
      self.assertEqual(res , set(record)) #ensure the values are the same, 3 new ones and 2 older ones
      geoDimRowCount = len(list(record))
      self.assertEqual(5,geoDimRowCount) #2 + 3 new pairs

      self._db._cursor.execute(self.viewSignalLatest)
      record = self._db._cursor.fetchall()
      sigLatestRowCount = len(list(record)) #update sigLatestRowCount
      self.assertEqual(11,sigLatestRowCount) #total entries = 8 previously + 3 new ones

137 changes: 137 additions & 0 deletions integrations/acquisition/covidcast/test_is_latest_issue.py
@@ -0,0 +1,137 @@
"""Integration tests for covidcast's is_latest_issue boolean."""
# standard library
import unittest

# third party
import mysql.connector
# first party
from delphi_utils import Nans
from delphi.epidata.client.delphi_epidata import Epidata
from delphi.epidata.acquisition.covidcast.database import Database, CovidcastRow
import delphi.operations.secrets as secrets


__test_target__ = 'delphi.epidata.acquisition.covidcast.database'

nmv = Nans.NOT_MISSING.value
class CovidcastLatestIssueTests(unittest.TestCase):

"""Tests covidcast is_latest_issue caching."""

def setUp(self):
"""Perform per-test setup."""

# connect to the `epidata` database
cnx = mysql.connector.connect(
user='user',
password='pass',
host='delphi_database_epidata',
database='covid')
cur = cnx.cursor()

# clear all tables
cur.execute("truncate table signal_load")
cur.execute("truncate table signal_history")
cur.execute("truncate table signal_latest")
cur.execute("truncate table geo_dim")
cur.execute("truncate table signal_dim")
# reset the `covidcast_meta_cache` table (it should always have one row)
cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"')
cnx.commit()
cur.close()

# make connection and cursor available to the Database object
self._db = Database()
self._db._connection = cnx
self._db._cursor = cnx.cursor()

# use the local instance of the Epidata API
Epidata.BASE_URL = 'http://delphi_web_epidata/epidata/api.php'

# use the local instance of the epidata database
secrets.db.host = 'delphi_database_epidata'
secrets.db.epi = ('user', 'pass')

#Commonly used SQL commands:
self.viewSignalLatest = f'SELECT * FROM {Database.latest_table}'
self.viewSignalHistory = f'SELECT * FROM {Database.history_table}'

def tearDown(self):
"""Perform per-test teardown."""
self._db._cursor.close()
self._db._connection.close()

  def test_signal_latest(self):

    rows = [
      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
        1.5, 2.5, 3.5, nmv, nmv, nmv, 20200414, 0)
    ]
    self._db.insert_or_update_batch(rows)
    self._db.run_dbjobs()
    self._db._cursor.execute(self.viewSignalHistory)
    totalRows = len(list(self._db._cursor.fetchall()))

    #sanity check for adding dummy data
    sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414'
    self._db._cursor.execute(sql)
    record = self._db._cursor.fetchall()
    self.assertEqual(record[0][0], 20200414)
    self.assertEqual(len(record), 1) #placeholder data only has one issue for 20200414

    #when uploading data patches (data in signal_load has an older issue than data in signal_latest):
    # - an OLDER issue should NOT end up in the latest table
    # - a NEWER issue should end up in the latest table
    #i.e. when signal_load is older than signal_latest, we are patching old data (changing some old entries)
    newRow = [
      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
        4.4, 4.4, 4.4, nmv, nmv, nmv, 20200416, 2)] #new row to be added
    self._db.insert_or_update_batch(newRow)
    self._db.run_dbjobs()

    #check newer issue in signal_latest
    sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414 '
    self._db._cursor.execute(sql)
    record = self._db._cursor.fetchall()
    self.assertEqual(record[0][0], 20200416) #new data added, reflected in latest table
    self.assertEqual(len(record), 1) # record count is still one, since only the latest issue (20200416) is kept

    updateRow = [
      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
        6.5, 2.2, 11.5, nmv, nmv, nmv, 20200414, 2)] #update previous entry
    self._db.insert_or_update_batch(updateRow)
    self._db.run_dbjobs()

    #check newer issue in signal_latest
    sql = f'SELECT `issue` FROM {Database.latest_table} where `time_value` = 20200414 '
    self._db._cursor.execute(sql)
    record2 = self._db._cursor.fetchall()
    self.assertEqual(record, record2) #same as before: the latest issue did not change

    #dynamic check for signal_history's list of issues
    self._db._cursor.execute(f'SELECT `issue` FROM {Database.history_table}')
    record3 = self._db._cursor.fetchall()
    totalRows = len(list(record3))
    self.assertEqual(2, totalRows) #added 1 new row, updated old row. Total = 2
    self.assertEqual(20200416, max(record3)[0]) #max issue is 20200416 (extracted from the first element of the tuple)

    #check that the older issue is not in the latest table (query returns nothing)
    sql = f'SELECT * FROM {Database.latest_table} where `time_value` = 20200414 and `issue` = 20200414 '
    self._db._cursor.execute(sql)
    emptyRecord = list(self._db._cursor.fetchall())
    empty = []
    self.assertEqual(empty, emptyRecord)

  @unittest.skip("Having different (time_value,issue) pairs in one call to db pipeline does not happen in practice")
Collaborator

good catch!

Contributor

Can one of you clarify for me? I thought the whole point of batch issue uploads was that they included multiple issues.

Author

Oh! I was writing tests with multiple issues but it was not performing as intended so I sought George out for some help. Would it be possible to find a time to sit down together to go through this quickly?

Contributor

I found it -- insert_or_update_bulk is called from within a loop, such that it is called separately for each CSV file. Each CSV contains only one time_value and only one issue.

for path, details in path_details:
  logger.info(event='handling',dest=path)
  path_src, filename = os.path.split(path)
  if not details:
    # file path or name was invalid, source is unknown
    archive_as_failed(path_src, filename, 'unknown',logger)
    continue
  (source, signal, time_type, geo_type, time_value, issue, lag) = details
  csv_rows = csv_importer_impl.load_csv(path, geo_type)
  cc_rows = CovidcastRow.fromCsvRows(csv_rows, source, signal, time_type, geo_type, time_value, issue, lag)
  rows_list = list(cc_rows)
  all_rows_valid = rows_list and all(r is not None for r in rows_list)
  if all_rows_valid:
    try:
      modified_row_count = database.insert_or_update_bulk(rows_list)

However, this test also includes run_dbjobs, which is not called as part of the csv_to_database script or as part of insert_or_update_bulk. We cannot therefore expect that run_dbjobs will only deal with one time_value and issue at a time. So it depends on what this test is trying to verify:

  • correct behavior of insert_or_update_bulk alone? --> then it should not call run_dbjobs
  • correct behavior of insert_or_update_bulk and run_dbjobs when run separately with each CSV file? --> then this test doesn't tell us whether run_dbjobs will run correctly as part of the intended load pipeline
  • correct behavior of insert_or_update_bulk when run separately with each CSV file, and of run_dbjobs when run after all available CSVs have been loaded into signal_load? --> then this test should run insert_or_update_bulk once for each time_value x issue configuration, and then run_dbjobs once at the end.
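
For concreteness, here is a rough sketch (not part of this PR) of what the third option could look like inside these integration tests. It reuses the CovidcastRow constructor, the insert_or_update_batch/run_dbjobs calls, and the nmv shorthand already used above; the test name and values are hypothetical:

# hypothetical test illustrating option 3: one insert per (time_value, issue)
# "CSV", then a single run_dbjobs after everything is in signal_load
# (would live inside one of the test classes above)
def test_dbjobs_after_per_csv_inserts(self):
  per_csv_batches = [
    # each batch mimics one CSV: a single time_value and a single issue
    [CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
      1, 1, 1, nmv, nmv, nmv, 20200414, 0)],
    [CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa',
      2, 2, 2, nmv, nmv, nmv, 20200415, 1)],
  ]
  for batch in per_csv_batches:
    self._db.insert_or_update_batch(batch)  # one call per "CSV"
  self._db.run_dbjobs()  # promote signal_load into history/latest once, at the end

  # with issues arriving in increasing order, signal_latest should hold exactly
  # one row for this (source, signal, geo, time_value) key
  self._db._cursor.execute(self.viewSignalLatest)
  self.assertEqual(len(self._db._cursor.fetchall()), 1)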

Collaborator

It is a flaw (for some definition of 'flaw') that we cannot properly process multiple issues per datapoint in a single run of insert_or_update[+dbjobs] if the issues are not provided in increasing order (or really, if the 'latest' issue is not last in the series). In practice, insert_or_update_bulk() is only called by csv_to_database:upload_archive() and each call only specifies a single issue (in fact, the only part of the composite key that differs is the geo_value). This is not going to cause us problems for the foreseeable future, but it is still worth discussing to determine if/how we want to account for possibilities later (adding/changing logic in insert_or_update (in the SQL or in the python), carrying some of those key-field arguments further down the call stack to prevent unintended consequences, refactoring the larger ingestion pipeline, or otherwise).

This comment was marked as outdated.

Contributor

Results of zoom discussion:
Options:

  • one CSV per insert, many CSVs per dbjobs
  • Decision: one CSV per insert+dbjobs. Have insert call dbjobs itself so csv_to_database doesn't have to bother about it.
    • if someone chooses to run dbjobs on an untrusted signal_load, it's their responsibility to ensure that issues are ordered correctly.
    • could add a guard to dbjobs to check if signal_load is sorted correctly, but that check would take a long time so let's not. Comment it obsessively at the head of dbjobs though.

Options for fixing at dbjobs time: none of them are good so let's not.

  • get max before loading into latest
  • sort while loading into latest

Batch issue uploads do traverse issues in the correct order, but since we're having insert call dbjobs we don't need to care.
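
A minimal sketch of that decision (illustrative only, not code from this PR): the effect is that each CSV's rows are inserted and promoted in one step, so callers never invoke dbjobs separately. Only the insert_or_update_batch and run_dbjobs methods already exercised in these tests are assumed:

# hypothetical helper showing "one CSV per insert+dbjobs"; the real change
# would fold the run_dbjobs() call into the insert method itself
def insert_and_promote(database, rows_list):
  modified_row_count = database.insert_or_update_batch(rows_list)
  database.run_dbjobs()  # promote signal_load into history/latest right away
  return modified_row_count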

  def test_diff_timevalue_issue(self):
    rows = [
      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', #newer issue (20200417) listed first; it should win in the latest table
        17, 17, 17, nmv, nmv, nmv, 20200417, 3),
      CovidcastRow('src', 'sig', 'day', 'state', 20200414, 'pa', #older issue (20200416) for the same time_value, listed second
        2, 2, 3, nmv, nmv, nmv, 20200416, 2)
    ]
    self._db.insert_or_update_batch(rows)
    self._db.run_dbjobs()
    self._db._cursor.execute(f'SELECT `issue` FROM {Database.latest_table} ')
    record = self._db._cursor.fetchall()
    self.assertEqual(record[0][0], 20200417) #20200416 != 20200417
Contributor

Suggested change:
-    self.assertEqual(record[0][0], 20200417) #20200416 != 20200417
+    # Make sure the 4/17 issue is listed even though 4/16 was imported after it
+    self.assertEqual(record[0][0], 20200417)