-
Notifications
You must be signed in to change notification settings - Fork 118
/
test_institutions_table.py
159 lines (136 loc) · 5.61 KB
/
test_institutions_table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# -*- coding: UTF-8 -*-
import csv
import os
import sys
import time
import threading
import pytest
import requests
from .test_apc_csv import RowObject, DATA_FILES
sys.path.append(os.path.join(sys.path[0], "python"))
import openapc_toolkit as oat
# Path to the institutions table (relative, so it is resolved against the
# current working directory)
INSTITUTIONS_FILE_PATH = "data/institutions.csv"
# List of all rows in the institutions table, encapsulated as RowObject
INSTITUTIONS_DATA = []
# List of all institution identifiers in the APC data set (as strings)
APC_INSTITUTIONS = []
# Holds all currently active URLRequestThreads
THREAD_POOL = []
# Maximum number of parallel threads
THREAD_POOL_SIZE = 10
# Holds all URLRequestThreads which are no longer alive (their run() method
# has finished); test_info_urls is parametrized over this list
FINISHED_THREADS = []
# Prefix for row-related error messages (placeholders: file name, line number)
MSG_HEAD = "{}, line {}: "
# Number of expected columns in the institutions file
EXP_ROW_LENGTH = 12
class URLRequestThread(threading.Thread):
    """
    Make a threaded request to the info_url and report non-200 status codes

    Once the thread has finished, status_code is None if the request was
    answered with HTTP status 200. Otherwise it holds the deviating status
    code or, if the request itself failed (connection error, timeout,
    invalid URL...), a short string naming the exception that was raised.

    Args:
        row_object: A test_apc_csv.RowObject which encapsulates information
                    on a single row from the institutions table.
    """

    def __init__(self, row_object):
        # assigning a name is not strictly necessary, but can be useful for debugging purposes
        super().__init__(name=row_object.row[1] + "_thread")
        self.row_object = row_object
        self.url = row_object.row[10]
        self.status_code = None

    def run(self):
        """Request the URL and record anything other than a plain 200 response."""
        try:
            response = requests.get(self.url, timeout=10)
        except requests.RequestException as error:
            # An uncaught exception would only terminate this thread and leave
            # status_code at None, which is indistinguishable from a successful
            # request. Record the failure so it gets reported like a bad status.
            self.status_code = "<request failed: {}>".format(type(error).__name__)
            return
        # Are there other codes besides 200 which indicate a success? Wait and see.
        if response.status_code != 200:
            self.status_code = response.status_code
def _cleanup_thread_pool():
    """
    Sort the threads in THREAD_POOL by liveness.

    Every thread whose run() method has returned (is_alive() is False)
    is appended to FINISHED_THREADS; the remaining ones form the new
    THREAD_POOL, in their original order.
    """
    global THREAD_POOL, FINISHED_THREADS
    alive = []
    for worker in THREAD_POOL:
        # Pick the destination list first, then append exactly once
        destination = alive if worker.is_alive() else FINISHED_THREADS
        destination.append(worker)
    THREAD_POOL = alive
def run_url_threads():
    """
    Create parallel URLRequestThreads for all info_urls and start them.

    Fill up the THREAD_POOL with threads, then clean them up in regular
    intervals and add new ones whenever there's room.

    Unfortunately we cannot make calls to pytest.fail() directly in a
    thread's run() method as this would interfere with the built-in
    exception handling of the Thread class. We have to do the
    thread runs before the other tests and store the results (finished
    threads) in the FINISHED_THREADS list. We can then parametrize
    over this list later (test_info_urls) to generate readable error
    reports.

    Rows which are too short to contain an info_url column are skipped
    here, so that a malformed row does not abort the whole module with an
    IndexError before test_data_format gets the chance to report it.
    """
    global THREAD_POOL
    url_column = 10  # index of the info_url column
    for row_object in INSTITUTIONS_DATA:
        row = row_object.row
        if len(row) <= url_column or not oat.has_value(row[url_column]):
            continue
        thread = URLRequestThread(row_object)
        # Throttle: wait until at least one running thread has finished
        while len(THREAD_POOL) >= THREAD_POOL_SIZE:
            _cleanup_thread_pool()
            time.sleep(0.2)
        THREAD_POOL.append(thread)
        thread.start()
    # Wait until all threads have finished, then move them to FINISHED_THREADS
    for thread in THREAD_POOL:
        thread.join()
    _cleanup_thread_pool()
# Prepare the test data
# Files handed to the csv module are opened with newline="" so that the
# reader does its own line-ending handling (needed for newlines embedded in
# quoted fields).
with open(DATA_FILES["apc"]["file_path"], "r", newline="") as f:
    reader = csv.reader(f)
    next(reader, None)  # skip the header (tolerates an empty file)
    # The set gives constant-time duplicate checks on the (large) APC file,
    # APC_INSTITUTIONS itself keeps the identifiers in order of first occurrence.
    seen_institutions = set(APC_INSTITUTIONS)
    for row in reader:
        if not row:
            # csv.reader yields an empty list for a blank line
            continue
        if row[0] not in seen_institutions:
            seen_institutions.add(row[0])
            APC_INSTITUTIONS.append(row[0])
with open(INSTITUTIONS_FILE_PATH, "r", newline="") as f:
    reader = csv.reader(f)
    next(reader, None)  # skip the header (tolerates an empty file)
    for row in reader:
        # Use RowObject to store contextual information along with CSV rows for better error messages
        row_object = RowObject("institutions.csv", reader.line_num, row, None)
        INSTITUTIONS_DATA.append(row_object)
# The URL checks have to run at import time, since test_info_urls is
# parametrized over their results (FINISHED_THREADS).
run_url_threads()
@pytest.mark.parametrize("row_object", INSTITUTIONS_DATA)
def test_data_format(row_object):
    """Fail if a row of the institutions table does not have EXP_ROW_LENGTH columns."""
    if len(row_object.row) == EXP_ROW_LENGTH:
        return
    template = MSG_HEAD + "Row does not consist of {} columns."
    pytest.fail(template.format(row_object.file_name, row_object.line_number, EXP_ROW_LENGTH))
@pytest.mark.parametrize("row_object", INSTITUTIONS_DATA)
def test_data_dirs(row_object):
    """Fail if the row names a data directory which does not exist below 'data'."""
    data_dir = row_object.row[6]
    # Rows without a data directory value have nothing to check
    if not oat.has_value(data_dir):
        return
    expected_path = os.path.join("data", data_dir)
    if os.path.isdir(expected_path):
        return
    template = MSG_HEAD + "Directory '{}' does not exist."
    pytest.fail(template.format(row_object.file_name, row_object.line_number, data_dir))
@pytest.mark.parametrize("thread", FINISHED_THREADS)
def test_info_urls(thread):
    """Fail if a finished URLRequestThread recorded a status code for its URL."""
    # status_code stays None unless the thread stored a deviating result
    if thread.status_code is None:
        return
    origin = thread.row_object
    template = MSG_HEAD + "HTTP request to '{}' returned status code {}"
    pytest.fail(template.format(origin.file_name, origin.line_number, thread.url, thread.status_code))
@pytest.mark.parametrize("row_object", INSTITUTIONS_DATA)
def test_institution_file_identifiers(row_object):
    """Fail if the row's institution identifier is missing from the APC data set."""
    institution = row_object.row[0]
    if institution in APC_INSTITUTIONS:
        return
    template = MSG_HEAD + "Institution identifier '{}' does not occur in APC data set."
    pytest.fail(template.format(row_object.file_name, row_object.line_number, institution))
@pytest.mark.parametrize("institution", APC_INSTITUTIONS)
def test_apc_file_identifiers(institution):
    """Fail if an APC data set identifier has no row in the institutions table."""
    # There are more efficient ways to do this (f.e. assert set() == set()),
    # but those would not produce easily readable error messages
    is_listed = any(institution == entry.row[0] for entry in INSTITUTIONS_DATA)
    if not is_listed:
        pytest.fail("APC data identifier '{}' does not occur in institution file.".format(institution))